]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9952: More work on the connection and transport code, couple things left to do...
authorShane Bryldt <astaelan@gmail.com>
Fri, 10 Feb 2017 02:17:20 +0000 (02:17 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 22 Mar 2017 21:42:49 +0000 (17:42 -0400)
libs/libblade/src/blade_connection.c
libs/libblade/src/blade_module_wss.c
libs/libblade/src/include/blade_connection.h
libs/libblade/src/include/blade_types.h

index 3ac336697f5d3ea367a250b86a8c65339394ebf3..69f1a39366045ccd65e4ad72b56f4c8f4c75d97d 100644 (file)
@@ -37,16 +37,18 @@ struct blade_connection_s {
        blade_handle_t *handle;
        ks_pool_t *pool;
 
+       void *transport_init_data;
        void *transport_data;
        blade_transport_callbacks_t *transport_callbacks;
 
        ks_bool_t shutdown;
        // @todo add auto generated UUID
+       blade_connection_direction_t direction;
     ks_thread_t *state_thread;
        blade_connection_state_t state;
        
        ks_q_t *sending;
-       ks_q_t *receiving;
+       //ks_q_t *receiving;
 };
 
 void *blade_connection_state_thread(ks_thread_t *thread, void *data);
@@ -54,7 +56,7 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data);
 
 KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
                                                                                                blade_handle_t *bh,
-                                                                                               void *transport_data,
+                                                                                               void *transport_init_data,
                                                                                                blade_transport_callbacks_t *transport_callbacks)
 {
        blade_connection_t *bc = NULL;
@@ -62,7 +64,6 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
 
        ks_assert(bcP);
        ks_assert(bh);
-       ks_assert(transport_data);
        ks_assert(transport_callbacks);
 
        pool = blade_handle_pool_get(bh);
@@ -70,10 +71,10 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
        bc = ks_pool_alloc(pool, sizeof(blade_connection_t));
        bc->handle = bh;
        bc->pool = pool;
-       bc->transport_data = transport_data;
+       bc->transport_init_data = transport_init_data;
        bc->transport_callbacks = transport_callbacks;
        ks_q_create(&bc->sending, pool, 0);
-       ks_q_create(&bc->receiving, pool, 0);
+       //ks_q_create(&bc->receiving, pool, 0);
        *bcP = bc;
 
        return KS_STATUS_SUCCESS;
@@ -91,17 +92,18 @@ KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP)
        blade_connection_shutdown(bc);
 
        ks_q_destroy(&bc->sending);
-       ks_q_destroy(&bc->receiving);
+       //ks_q_destroy(&bc->receiving);
 
        ks_pool_free(bc->pool, bcP);
 
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc)
+KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction)
 {
        ks_assert(bc);
 
+       bc->direction = direction;
        blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NONE);
 
     if (ks_thread_create_ex(&bc->state_thread,
@@ -112,7 +114,6 @@ KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc)
                                                        KS_PRI_NORMAL,
                                                        bc->pool) != KS_STATUS_SUCCESS) {
                // @todo error logging
-               blade_connection_disconnect(bc);
                return KS_STATUS_FAIL;
        }
        
@@ -136,6 +137,13 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc)
        return KS_STATUS_SUCCESS;
 }
 
+KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc)
+{
+       ks_assert(bc);
+
+       return bc->transport_init_data;
+}
+
 KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc)
 {
        ks_assert(bc);
@@ -143,19 +151,72 @@ KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc)
        return bc->transport_data;
 }
 
+KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data)
+{
+       ks_assert(bc);
+
+       bc->transport_data = transport_data;
+}
+
+blade_transport_state_callback_t blade_connection_state_callback_lookup(blade_connection_t *bc, blade_connection_state_t state)
+{
+       blade_transport_state_callback_t callback = NULL;
+       
+       ks_assert(bc);
+
+       switch (state) {
+       case BLADE_CONNECTION_STATE_DISCONNECT:
+               if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_disconnect_inbound;
+               else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_disconnect_outbound;
+               break;
+       case BLADE_CONNECTION_STATE_NEW:
+               if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_new_inbound;
+               else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_new_outbound;
+               break;
+       case BLADE_CONNECTION_STATE_CONNECT:
+               if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_connect_inbound;
+               else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_connect_outbound;
+               break;
+       case BLADE_CONNECTION_STATE_ATTACH:
+               if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_attach_inbound;
+               else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_attach_outbound;
+               break;
+       case BLADE_CONNECTION_STATE_DETACH:
+               if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_detach_inbound;
+               else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_detach_outbound;
+               break;
+       case BLADE_CONNECTION_STATE_READY:
+               if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_ready_inbound;
+               else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_ready_outbound;
+               break;
+       default: break;
+       }
+
+       return callback;
+}
+
 KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state)
 {
+       blade_transport_state_callback_t callback = NULL;
+       blade_connection_state_hook_t hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
        ks_assert(bc);
 
-       bc->transport_callbacks->onstate(bc, state, BLADE_CONNECTION_STATE_CONDITION_PRE);
+       callback = blade_connection_state_callback_lookup(bc, state);
+
+       if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_PRE);
+
        bc->state = state;
+       
+       if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc);
 }
 
 KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc)
 {
        ks_assert(bc);
 
-       blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT);
+       if (bc->state != BLADE_CONNECTION_STATE_DETACH && bc->state != BLADE_CONNECTION_STATE_DISCONNECT)
+               blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DETACH);
 }
 
 KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, blade_identity_t *target, cJSON *json)
@@ -178,26 +239,30 @@ KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, bla
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_connection_receiving_push(blade_connection_t *bc, cJSON *json)
-{
-       ks_assert(bc);
-       ks_assert(json);
+// @todo may not need receiving queue on connection, by the time we are queueing we should have a session to receive into
+//KS_DECLARE(ks_status_t) blade_connection_receiving_push(blade_connection_t *bc, cJSON *json)
+//{
+//     ks_assert(bc);
+//     ks_assert(json);
 
-       return ks_q_push(bc->receiving, json);
-}
+//     return ks_q_push(bc->receiving, json);
+//}
 
-KS_DECLARE(ks_status_t) blade_connection_receiving_pop(blade_connection_t *bc, cJSON **json)
-{
-       ks_assert(bc);
-       ks_assert(json);
+//KS_DECLARE(ks_status_t) blade_connection_receiving_pop(blade_connection_t *bc, cJSON **json)
+//{
+//     ks_assert(bc);
+//     ks_assert(json);
        
-       return ks_q_trypop(bc->receiving, (void **)json);
-}
+//     return ks_q_trypop(bc->receiving, (void **)json);
+//}
 
 void *blade_connection_state_thread(ks_thread_t *thread, void *data)
 {
        blade_connection_t *bc = NULL;
-       blade_connection_state_hook_t hook;
+       blade_connection_state_t state;
+       blade_transport_state_callback_t callback = NULL;
+       blade_connection_state_hook_t hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+       cJSON *json = NULL;
 
        ks_assert(thread);
        ks_assert(data);
@@ -205,20 +270,34 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
        bc = (blade_connection_t *)data;
 
        while (!bc->shutdown) {
-               // @todo need to get messages from the transport into receiving queue, and pop messages from sending queue to write out using transport
-               // sending is relatively easy, but receiving cannot occur universally due to cases like kws_init() blocking and expecting data to be on the wire
-               // and other transports may have similar behaviours, but CONNECTIN, ATTACH, and READY require async message passing into application layer
-               // and sending whenever the response hits the queue
+
+               // @todo pop from connection sending queue and call transport callback to write one message (passing target identity too)
+               // and delete the cJSON object here after returning from callback
+                               
                
-               // @todo it's possible that onstate could handle receiving and sending messages during the appropriate states, but this means some states
-               // like CONNECTIN which may send and receive multiple messages require BYPASSing until the application layer updates the state or disconnects
+               // @todo seems like connection will not need a receiving queue as the session will exist prior to async transmissions
+
+               state = bc->state;
+               hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+               callback = blade_connection_state_callback_lookup(bc, state);
+
+               // @todo should this just go in the ready state callback? it's generalized here, so the callback for READY doesn't really
+               // need to do anything
+               if (state == BLADE_CONNECTION_STATE_READY && bc->transport_callbacks->onreceive(bc, &json) == KS_STATUS_SUCCESS && json) {
+                       // @todo push json to session receiving queue
+                       
+               }
                
-               hook = bc->transport_callbacks->onstate(bc, bc->state, BLADE_CONNECTION_STATE_CONDITION_POST);
-               if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT)
-                       blade_connection_disconnect(bc);
+               if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST);
+
+               if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT && (state == BLADE_CONNECTION_STATE_DETACH || state == BLADE_CONNECTION_STATE_DISCONNECT))
+                       hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
+               if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT)     blade_connection_disconnect(bc);
                else if (hook == BLADE_CONNECTION_STATE_HOOK_SUCCESS) {
-                       // @todo pop from sending queue, and pass to transport callback to send out
-                       switch (bc->state) {
+                       switch (state) {
+                       case BLADE_CONNECTION_STATE_DISCONNECT:
+                               return NULL;
                        case BLADE_CONNECTION_STATE_NEW:
                                blade_connection_state_set(bc, BLADE_CONNECTION_STATE_CONNECT);
                                break;
@@ -226,10 +305,14 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
                                blade_connection_state_set(bc, BLADE_CONNECTION_STATE_ATTACH);
                                break;
                        case BLADE_CONNECTION_STATE_ATTACH:
+                               // @todo receive message with nullable session id for reconnect and some sort of secure token for a reconnect challenge?
+                               // determine how much of session management is handled here... do we process these session negotiation messages without
+                               // passing it up to the application layer? or does the application layer give back a session and build the response?
                                blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY);
                                break;
                        case BLADE_CONNECTION_STATE_DETACH:
-                               blade_connection_disconnect(bc);
+                               // @todo detach from session if this connection is attached
+                               blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT);
                                break;
                        default: break;
                        }
index 3efe4307af6f826fecf285e3e00393daabcbd244..75f2993a51284f213a1ef3633b602710943db8f9 100644 (file)
@@ -37,6 +37,7 @@
 
 typedef struct blade_module_wss_s blade_module_wss_t;
 typedef struct blade_transport_wss_s blade_transport_wss_t;
+typedef struct blade_transport_wss_init_s blade_transport_wss_init_t;
 
 struct blade_module_wss_s {
        blade_handle_t *handle;
@@ -70,42 +71,81 @@ struct blade_transport_wss_s {
        kws_t *kws;
 };
 
+struct blade_transport_wss_init_s {
+       blade_module_wss_t *module;
+       ks_pool_t *pool;
+
+       ks_socket_t sock;
+};
+
 
 
 ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t *bh);
 ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP);
 
-ks_status_t blade_module_wss_onload(blade_module_t **bmP, blade_handle_t *bh);
-ks_status_t blade_module_wss_onunload(blade_module_t *bm);
-ks_status_t blade_module_wss_onstartup(blade_module_t *bm, config_setting_t *config);
-ks_status_t blade_module_wss_onshutdown(blade_module_t *bm);
+ks_status_t blade_module_wss_on_load(blade_module_t **bmP, blade_handle_t *bh);
+ks_status_t blade_module_wss_on_unload(blade_module_t *bm);
+ks_status_t blade_module_wss_on_startup(blade_module_t *bm, config_setting_t *config);
+ks_status_t blade_module_wss_on_shutdown(blade_module_t *bm);
 
 ks_status_t blade_module_wss_listen(blade_module_wss_t *bm, ks_sockaddr_t *addr);
 void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data);
 
 
+
 ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_module_wss_t *bm_wss, ks_socket_t sock);
 ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP);
 
-ks_status_t blade_transport_wss_onconnect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target);
-blade_connection_rank_t blade_transport_wss_onrank(blade_connection_t *bc, blade_identity_t *target);
-blade_connection_state_hook_t blade_transport_wss_onstate(blade_connection_t *bc, blade_connection_state_t state, blade_connection_state_condition_t condition);
+ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target);
+blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target);
+
+ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t *target, cJSON *json);
+ks_status_t blade_transport_wss_on_receive(blade_connection_t *bc, cJSON **json);
+
+blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_new_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_new_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_connect_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_connect_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connection_t *bc, blade_connection_state_condition_t condition);
+
+
+
+ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock);
+ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP);
 
 
 
 static blade_module_callbacks_t g_module_wss_callbacks =
 {
-       blade_module_wss_onload,
-       blade_module_wss_onunload,
-       blade_module_wss_onstartup,
-       blade_module_wss_onshutdown,
+       blade_module_wss_on_load,
+       blade_module_wss_on_unload,
+       blade_module_wss_on_startup,
+       blade_module_wss_on_shutdown,
 };
 
 static blade_transport_callbacks_t g_transport_wss_callbacks =
 {
-       blade_transport_wss_onconnect,
-       blade_transport_wss_onrank,
-       blade_transport_wss_onstate,
+       blade_transport_wss_on_connect,
+       blade_transport_wss_on_rank,
+       blade_transport_wss_on_send,
+       blade_transport_wss_on_receive,
+       
+       blade_transport_wss_on_state_disconnect,
+       blade_transport_wss_on_state_disconnect,
+       blade_transport_wss_on_state_new_inbound,
+       blade_transport_wss_on_state_new_outbound,
+       blade_transport_wss_on_state_connect_inbound,
+       blade_transport_wss_on_state_connect_outbound,
+       blade_transport_wss_on_state_attach_inbound,
+       blade_transport_wss_on_state_attach_outbound,
+       blade_transport_wss_on_state_detach,
+       blade_transport_wss_on_state_detach,
+       blade_transport_wss_on_state_ready,
+       blade_transport_wss_on_state_ready,
 };
 
 
@@ -144,7 +184,7 @@ ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP)
 
        bm_wss = *bm_wssP;
 
-       blade_module_wss_onshutdown(bm_wss->module);
+       blade_module_wss_on_shutdown(bm_wss->module);
        
        blade_module_destroy(&bm_wss->module);
 
@@ -156,7 +196,7 @@ ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP)
        return KS_STATUS_SUCCESS;
 }
 
-ks_status_t blade_module_wss_onload(blade_module_t **bmP, blade_handle_t *bh)
+ks_status_t blade_module_wss_on_load(blade_module_t **bmP, blade_handle_t *bh)
 {
        blade_module_wss_t *bm_wss = NULL;
 
@@ -171,7 +211,7 @@ ks_status_t blade_module_wss_onload(blade_module_t **bmP, blade_handle_t *bh)
        return KS_STATUS_SUCCESS;
 }
 
-ks_status_t blade_module_wss_onunload(blade_module_t *bm)
+ks_status_t blade_module_wss_on_unload(blade_module_t *bm)
 {
        blade_module_wss_t *bm_wss = NULL;
 
@@ -293,7 +333,7 @@ ks_status_t blade_module_wss_config(blade_module_wss_t *bm_wss, config_setting_t
        return KS_STATUS_SUCCESS;
 }
 
-ks_status_t blade_module_wss_onstartup(blade_module_t *bm, config_setting_t *config)
+ks_status_t blade_module_wss_on_startup(blade_module_t *bm, config_setting_t *config)
 {
        blade_module_wss_t *bm_wss = NULL;
        
@@ -331,7 +371,7 @@ ks_status_t blade_module_wss_onstartup(blade_module_t *bm, config_setting_t *con
        return KS_STATUS_SUCCESS;
 }
 
-ks_status_t blade_module_wss_onshutdown(blade_module_t *bm)
+ks_status_t blade_module_wss_on_shutdown(blade_module_t *bm)
 {
        blade_module_wss_t *bm_wss = NULL;
        blade_transport_wss_t *bt_wss = NULL;
@@ -423,6 +463,7 @@ ks_status_t blade_module_wss_listen(blade_module_wss_t *bm_wss, ks_sockaddr_t *a
 void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
 {
        blade_module_wss_t *bm_wss = NULL;
+       blade_transport_wss_init_t *bt_wss_init = NULL;
        blade_transport_wss_t *bt_wss = NULL;
        blade_connection_t *bc = NULL;
 
@@ -448,27 +489,32 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
                                        continue;
                                }
 
-                               blade_transport_wss_create(&bt_wss, bm_wss, sock);
-                               ks_assert(bt_wss);
+                               blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock);
+                               ks_assert(bt_wss_init);
                                
-                blade_connection_create(&bc, bm_wss->handle, bt_wss, bm_wss->transport_callbacks);
+                blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks);
                                ks_assert(bc);
 
-                               blade_connection_startup(bc);
-
+                               if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_INBOUND) != KS_STATUS_SUCCESS) {
+                                       blade_connection_destroy(&bc);
+                                       blade_transport_wss_init_destroy(&bt_wss_init);
+                                       ks_socket_close(&sock);
+                                       continue;
+                               }
                                list_append(&bm_wss->connected, bc);
-
                                blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW);
                        }
                }
 
                while (ks_q_trypop(bm_wss->disconnected, (void **)&bc) == KS_STATUS_SUCCESS) {
+                       bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
                        bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
 
                        list_delete(&bm_wss->connected, bc);
 
+                       if (bt_wss_init) blade_transport_wss_init_destroy(&bt_wss_init);
                        blade_connection_destroy(&bc);
-                       blade_transport_wss_destroy(&bt_wss);
+                       if (bt_wss) blade_transport_wss_destroy(&bt_wss);
                }
        }
 
@@ -512,7 +558,7 @@ ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP)
        return KS_STATUS_SUCCESS;
 }
 
-ks_status_t blade_transport_wss_onconnect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target)
+ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target)
 {
        ks_assert(bcP);
        ks_assert(bm);
@@ -525,14 +571,44 @@ ks_status_t blade_transport_wss_onconnect(blade_connection_t **bcP, blade_module
        return KS_STATUS_SUCCESS;
 }
 
-blade_connection_rank_t blade_transport_wss_onrank(blade_connection_t *bc, blade_identity_t *target)
+blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target)
 {
        ks_assert(bc);
        ks_assert(target);
        
+       return BLADE_CONNECTION_RANK_POOR;
+}
+
+ks_status_t blade_transport_wss_write(blade_transport_wss_t *bt_wss, cJSON *json)
+{
+       char *json_str = cJSON_PrintUnformatted(json);
+       ks_size_t json_str_len = 0;
+       if (!json_str) {
+               // @todo error logging
+               return KS_STATUS_FAIL;
+       }
+       json_str_len = strlen(json_str) + 1; // @todo determine if WSOC_TEXT null terminates when read_frame is called, or if it's safe to include like this
+       kws_write_frame(bt_wss->kws, WSOC_TEXT, json_str, json_str_len);
+
+       free(json_str);
+
        return KS_STATUS_SUCCESS;
 }
 
+ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t *target, cJSON *json)
+{
+       blade_transport_wss_t *bt_wss = NULL;
+
+       ks_assert(bc);
+       ks_assert(json);
+
+       ks_log(KS_LOG_DEBUG, "Send Callback\n");
+
+       bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
+
+       return blade_transport_wss_write(bt_wss, json);
+}
+
 ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json)
 {
        // @todo get exact timeout from service config?
@@ -559,12 +635,6 @@ ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json
                        return KS_STATUS_FAIL;
                }
 
-               //if (blade_handle_message_claim(blade_service_handle(peer->service), &message, frame_data, frame_data_len) != KS_STATUS_SUCCESS || !message) {
-               // @todo error logging
-               //      return KS_STATUS_FAIL;
-               //}
-                               
-               // @todo convert frame_data to cJSON safely, make sure data is null-terminated at frame_data_len
                if (!(*json = cJSON_Parse((char *)frame_data))) {
                        return KS_STATUS_FAIL;
                }
@@ -572,77 +642,170 @@ ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json
        return KS_STATUS_SUCCESS;
 }
 
-ks_status_t blade_transport_wss_write(blade_transport_wss_t *bt_wss, cJSON *json)
+ks_status_t blade_transport_wss_on_receive(blade_connection_t *bc, cJSON **json)
 {
-       //blade_message_get(message, &target, &json);
-       char *json_str = cJSON_PrintUnformatted(json);
-       ks_size_t json_str_len = 0;
-       if (!json_str) {
-               // @todo error logging
-               return KS_STATUS_FAIL;
-       }
-       json_str_len = strlen(json_str) + 1;
-       kws_write_frame(bt_wss->kws, WSOC_TEXT, json_str, json_str_len);
+       blade_transport_wss_t *bt_wss = NULL;
 
-       return KS_STATUS_SUCCESS;
+       ks_assert(bc);
+       ks_assert(json);
+
+       ks_log(KS_LOG_DEBUG, "Receive Callback\n");
+
+       bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
+
+       return blade_transport_wss_read(bt_wss, json);
 }
 
-blade_connection_state_hook_t blade_transport_wss_onstate(blade_connection_t *bc, blade_connection_state_t state, blade_connection_state_condition_t condition)
+blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition)
 {
        blade_transport_wss_t *bt_wss = NULL;
-       //cJSON *json = NULL;
 
        ks_assert(bc);
 
+       ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+       if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
        bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
 
-       switch (state) {
-       case BLADE_CONNECTION_STATE_DISCONNECT:
-               {
-                       if (condition == BLADE_CONNECTION_STATE_CONDITION_POST) {
-                               ks_q_push(bt_wss->module->disconnected, bc);
-                               blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NONE);
-                       }
-                       break;
-               }
-       case BLADE_CONNECTION_STATE_NEW:
-               {
-                       if (condition == BLADE_CONNECTION_STATE_CONDITION_POST) {
-                               // @todo: SSL init stuffs based on data from peer->service->config_websockets_ssl to pass into kws_init
-                               if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, NULL, KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) {
-                                       // @todo error logging
-                                       return BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
-                               }
-                       }
-                       break;
-               }
-       case BLADE_CONNECTION_STATE_CONNECT:
-               {
-                       // @todo abstract read message and write message, so these can be called from connection and processed from there
-                       
-                       //if (blade_transport_wss_read(bt_wss, &json) != KS_STATUS_SUCCESS) return BLADE_CONNECTION_STATEHOOK_DISCONNECT;
-
-                       //if (json) {
-                               // @todo processing connectin messages for identity registration
-                       //      cJSON_Delete(json);
-                               //blade_connection_receiving_push(conn, json);
-                       //}
-
-                       // @todo wrap identity + json into an envelope for queueing through the connection
-                       //while (blade_connection_sending_pop(bc, (void **)&json) == KS_STATUS_SUCCESS && json) {
-                       //      ks_status_t ret = blade_transport_wss_write(bt_wss, json);
-                       //      cJSON_Delete(json);
-                       //      if (ret != KS_STATUS_SUCCESS) return BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
-                       //}
-                       return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
-                       //break;
-               }
-       default: break;
+       ks_q_push(bt_wss->module->disconnected, bc);
+
+       return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_new_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+       blade_transport_wss_t *bt_wss = NULL;
+       blade_transport_wss_init_t *bt_wss_init = NULL;
+
+       ks_assert(bc);
+
+       ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+       if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
+       bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
+
+       blade_transport_wss_create(&bt_wss, bt_wss_init->module, bt_wss_init->sock);
+       ks_assert(bt_wss);
+
+       blade_connection_transport_set(bc, bt_wss);
+
+       return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_new_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+       ks_assert(bc);
+
+       ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+       if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
+       return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_connect_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+       blade_transport_wss_t *bt_wss = NULL;
+
+       ks_assert(bc);
+
+       ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+       if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
+       bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
+
+       // @todo: SSL init stuffs based on data from config to pass into kws_init
+       if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, NULL, KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) {
+               // @todo error logging
+               return BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
        }
-       
+
        return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 }
 
+blade_connection_state_hook_t blade_transport_wss_on_state_connect_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+       ks_assert(bc);
+
+       ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+       return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+       ks_assert(bc);
+
+       ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+       // @todo Establish sessid and discover existing session or create and register new session through BLADE commands
+       // Set session state to CONNECT if its new or RECONNECT if existing
+       // start session and its thread if its new
+                                                                                                                                                                                               
+       return BLADE_CONNECTION_STATE_HOOK_BYPASS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+       ks_assert(bc);
+
+       ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+       return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+       ks_assert(bc);
+
+       ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+       return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+       ks_assert(bc);
+
+       ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+       return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock)
+{
+       blade_transport_wss_init_t *bt_wssi = NULL;
+
+       ks_assert(bt_wssiP);
+       ks_assert(bm_wss);
+       ks_assert(sock != KS_SOCK_INVALID);
+
+    bt_wssi = ks_pool_alloc(bm_wss->pool, sizeof(blade_transport_wss_init_t));
+       bt_wssi->module = bm_wss;
+       bt_wssi->pool = bm_wss->pool;
+       bt_wssi->sock = sock;
+
+       *bt_wssiP = bt_wssi;
+       
+       return KS_STATUS_SUCCESS;
+}
+
+ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP)
+{
+       blade_transport_wss_init_t *bt_wssi = NULL;
+       
+       ks_assert(bt_wssiP);
+       ks_assert(*bt_wssiP);
+
+       bt_wssi = *bt_wssiP;
+
+       ks_pool_free(bt_wssi->pool, bt_wssiP);
+       
+       return KS_STATUS_SUCCESS;
+}
+
 /* For Emacs:
  * Local Variables:
  * mode:c
index d48ec8c221b8debb23cd213038e42c24c7b15687..70ee105b7df49c1da47280e2d7ac46a5193a8a15 100644 (file)
@@ -41,9 +41,11 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
                                                                                                void *transport_data,
                                                                                                blade_transport_callbacks_t *transport_callbacks);
 KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP);
-KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc);
+KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction);
 KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc);
+KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc);
 KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc);
+KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data);
 KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state);
 KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc);
 KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target);
index cc2c05419709bd818b75e82f556d2b7d9bc2fbaa..c2c961266ba3a127c2a337bdd8a781d810f4f83e 100644 (file)
@@ -62,8 +62,8 @@ typedef enum {
 } blade_connection_state_t;
 
 typedef enum {
-       BLADE_CONNECTION_DIRECTION_IN,
-       BLADE_CONNECTION_DIRECTION_OUT,
+       BLADE_CONNECTION_DIRECTION_INBOUND,
+       BLADE_CONNECTION_DIRECTION_OUTBOUND,
 } blade_connection_direction_t;
 
 typedef enum {
@@ -99,14 +99,28 @@ struct blade_module_callbacks_s {
 
 typedef ks_status_t (*blade_transport_connect_callback_t)(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target);
 typedef blade_connection_rank_t (*blade_transport_rank_callback_t)(blade_connection_t *bc, blade_identity_t *target);
-typedef blade_connection_state_hook_t (*blade_transport_state_callback_t)(blade_connection_t *bc,
-                                                                                                                                                 blade_connection_state_t state,
-                                                                                                                                                 blade_connection_state_condition_t condition);
+typedef ks_status_t (*blade_transport_send_callback_t)(blade_connection_t *bc, blade_identity_t *target, cJSON *json);
+typedef ks_status_t (*blade_transport_receive_callback_t)(blade_connection_t *bc, cJSON **json);
+typedef blade_connection_state_hook_t (*blade_transport_state_callback_t)(blade_connection_t *bc, blade_connection_state_condition_t condition);
 
 struct blade_transport_callbacks_s {
        blade_transport_connect_callback_t onconnect;
        blade_transport_rank_callback_t onrank;
-       blade_transport_state_callback_t onstate;
+       blade_transport_send_callback_t onsend;
+       blade_transport_receive_callback_t onreceive;
+
+       blade_transport_state_callback_t onstate_disconnect_inbound;
+       blade_transport_state_callback_t onstate_disconnect_outbound;
+       blade_transport_state_callback_t onstate_new_inbound;
+       blade_transport_state_callback_t onstate_new_outbound;
+       blade_transport_state_callback_t onstate_connect_inbound;
+       blade_transport_state_callback_t onstate_connect_outbound;
+       blade_transport_state_callback_t onstate_attach_inbound;
+       blade_transport_state_callback_t onstate_attach_outbound;
+       blade_transport_state_callback_t onstate_detach_inbound;
+       blade_transport_state_callback_t onstate_detach_outbound;
+       blade_transport_state_callback_t onstate_ready_inbound;
+       blade_transport_state_callback_t onstate_ready_outbound;
 };