]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9952: Preliminary session negotiations done, added a bunch of logging, fixed up...
authorShane Bryldt <astaelan@gmail.com>
Thu, 23 Feb 2017 23:01:22 +0000 (23:01 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 22 Mar 2017 21:42:50 +0000 (17:42 -0400)
libs/libblade/src/blade_connection.c
libs/libblade/src/blade_identity.c
libs/libblade/src/blade_module_wss.c
libs/libblade/src/blade_session.c
libs/libblade/src/blade_stack.c
libs/libblade/src/include/blade_connection.h
libs/libblade/src/include/blade_session.h
libs/libblade/src/include/blade_stack.h
libs/libblade/src/include/blade_types.h
libs/libblade/test/bladec.c

index eadbea4adb1762f983152bbf82874c2820163367..7d5cfadfe08e85885030b4293ba60d7079f910c6 100644 (file)
@@ -90,6 +90,8 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
 
        *bcP = bc;
 
+       ks_log(KS_LOG_DEBUG, "Created\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -112,6 +114,8 @@ KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP)
 
        ks_pool_free(bc->pool, bcP);
 
+       ks_log(KS_LOG_DEBUG, "Destroyed\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -133,6 +137,8 @@ KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_c
                return KS_STATUS_FAIL;
        }
        
+       ks_log(KS_LOG_DEBUG, "Started\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -153,6 +159,8 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc)
 
        while (ks_q_trypop(bc->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
 
+       ks_log(KS_LOG_DEBUG, "Stopped\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -163,6 +171,13 @@ KS_DECLARE(blade_handle_t *) blade_connection_handle_get(blade_connection_t *bc)
        return bc->handle;
 }
 
+KS_DECLARE(ks_pool_t *) blade_connection_pool_get(blade_connection_t *bc)
+{
+       ks_assert(bc);
+
+       return bc->pool;
+}
+
 KS_DECLARE(const char *) blade_connection_id_get(blade_connection_t *bc)
 {
        ks_assert(bc);
@@ -285,8 +300,10 @@ KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc)
 {
        ks_assert(bc);
 
-       if (bc->state != BLADE_CONNECTION_STATE_DETACH && bc->state != BLADE_CONNECTION_STATE_DISCONNECT)
+       if (bc->state != BLADE_CONNECTION_STATE_DETACH && bc->state != BLADE_CONNECTION_STATE_DISCONNECT) {
+               ks_log(KS_LOG_DEBUG, "Connection (%s) disconnecting\n", bc->id);
                blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DETACH);
+       }
 }
 
 KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json)
@@ -342,6 +359,9 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
                hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
                callback = blade_connection_state_callback_lookup(bc, state);
 
+               if (state == BLADE_CONNECTION_STATE_DISCONNECT) {
+                       blade_handle_connections_remove(bc);
+               }
                // @todo only READY state?
                if (state != BLADE_CONNECTION_STATE_DETACH && state != BLADE_CONNECTION_STATE_DISCONNECT) {
                        while (blade_connection_sending_pop(bc, &json) == KS_STATUS_SUCCESS && json) {
@@ -363,7 +383,9 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
                                        break;
                                }
                                if (!(done = (json == NULL))) {
-                                       // @todo push json to session receiving queue
+                                       blade_session_t *bs = blade_handle_sessions_get(bc->handle, bc->session);
+                                       ks_assert(bs);
+                                       blade_session_receiving_push(bs, json);
                                        cJSON_Delete(json);
                                        json = NULL;
                                }
@@ -379,7 +401,8 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
                else if (hook == BLADE_CONNECTION_STATE_HOOK_SUCCESS) {
                        switch (state) {
                        case BLADE_CONNECTION_STATE_DISCONNECT:
-                               return NULL;
+                               blade_connection_destroy(&bc);
+                               break;
                        case BLADE_CONNECTION_STATE_NEW:
                                blade_connection_state_set(bc, BLADE_CONNECTION_STATE_CONNECT);
                                break;
@@ -388,24 +411,38 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
                                break;
                        case BLADE_CONNECTION_STATE_ATTACH:
                                {
+                                       // @todo this is adding a second lock, since we keep it locked in the callback to allow finishing, we don't want get locking here...
+                                       // or just try unlocking twice to confirm...
                                        blade_session_t *bs = blade_handle_sessions_get(bc->handle, bc->session);
                                        ks_assert(bs); // should not happen because bs should still be locked
                                        
                                        blade_session_connections_add(bs, bc->id);
                                        
                                        blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY);
-                                       blade_session_state_set(bs, BLADE_SESSION_STATE_READY);
+                                       blade_session_state_set(bs, BLADE_SESSION_STATE_READY); // @todo only set this if it's not already in the READY state from prior connection
                                        
+                                       blade_session_read_unlock(bs); // unlock the session we locked obtaining it above
                                        blade_session_read_unlock(bs); // unlock the session we expect to be locked during the callback to ensure we can finish attaching
                                        break;
                                }
                        case BLADE_CONNECTION_STATE_DETACH:
-                               // @todo detach from session if this connection is attached
-                               blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT);
-                               break;
+                               {
+                                       if (bc->session) {
+                                               blade_session_t *bs = blade_handle_sessions_get(bc->handle, bc->session);
+                                               ks_assert(bs);
+
+                                               blade_session_connections_remove(bs, bc->id);
+                                               blade_session_read_unlock(bs);
+                                               // keep bc->session for later in case something triggers a reconnect later and needs the old session id for a hint
+                                       }
+                                       blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT);
+                                       break;
+                               }
                        default: break;
                        }
                }
+
+               if (state == BLADE_CONNECTION_STATE_DISCONNECT) break;
        }
 
        return NULL;
index 040bc9727d2d7a1bc087d07b1f4d6463fe9e939d..5c2cecafad0443778b6bcc38dfa99caac5585268 100644 (file)
@@ -127,19 +127,6 @@ KS_DECLARE(ks_status_t) blade_identity_parse(blade_identity_t *bi, const char *u
                }
        }
 
-       // @todo remove this, temporary for testing
-       ks_log(KS_LOG_DEBUG, "       name: %s\n", bi->name);
-       ks_log(KS_LOG_DEBUG, "     domain: %s\n", bi->domain);
-       ks_log(KS_LOG_DEBUG, "   resource: %s\n", bi->resource);
-       for (ks_hash_iterator_t *it = ks_hash_first(bi->parameters, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-               const char *key = NULL;
-               const char *val = NULL;
-
-               ks_hash_this(it, (const void **)&key, NULL, (void **)&val);
-               
-               ks_log(KS_LOG_DEBUG, "        key: %s = %s\n", key, val);
-       }
-               
        return KS_STATUS_SUCCESS;
 }
 
index 429fdba344eef2cc335bb9d901d2e8a6f47a8253..31d2235e30418e43015a5d1a287ed98ebc177362 100644 (file)
@@ -60,8 +60,7 @@ struct blade_module_wss_s {
        struct pollfd *listeners_poll;
        int32_t listeners_count;
 
-       list_t connected;
-       ks_q_t *disconnected;
+       list_t connected; // @todo consider keeping this only as the list of connection id's, since the handle retains the pointer lookup
 };
 
 struct blade_transport_wss_s {
@@ -77,6 +76,7 @@ struct blade_transport_wss_init_s {
        ks_pool_t *pool;
 
        ks_socket_t sock;
+       const char *session_id;
 };
 
 
@@ -98,7 +98,7 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data);
 ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_module_wss_t *bm_wss, ks_socket_t sock);
 ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP);
 
-ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target);
+ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id);
 blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target);
 
 ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, cJSON *json);
@@ -116,7 +116,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connectio
 
 
 
-ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock);
+ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id);
 ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP);
 
 
@@ -172,11 +172,11 @@ ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t
        bm_wss->transport_callbacks = &g_transport_wss_callbacks;
 
        list_init(&bm_wss->connected);
-       ks_q_create(&bm_wss->disconnected, bm_wss->pool, 0);
-       ks_assert(bm_wss->disconnected);
 
        *bm_wssP = bm_wss;
 
+       ks_log(KS_LOG_DEBUG, "Created\n");
+       
        return KS_STATUS_SUCCESS;
 }
 
@@ -194,10 +194,11 @@ ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP)
        blade_module_destroy(&bm_wss->module);
 
        list_destroy(&bm_wss->connected);
-       ks_q_destroy(&bm_wss->disconnected);
 
        ks_pool_free(bm_wss->pool, bm_wssP);
 
+       ks_log(KS_LOG_DEBUG, "Destroyed\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -213,6 +214,8 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_load(blade_module_t **bmP, blade_han
 
        *bmP = bm_wss->module;
        
+       ks_log(KS_LOG_DEBUG, "Loaded\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -226,10 +229,12 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_unload(blade_module_t *bm)
        
        blade_module_wss_destroy(&bm_wss);
        
+       ks_log(KS_LOG_DEBUG, "Unloaded\n");
+
        return KS_STATUS_SUCCESS;
 }
 
-ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock)
+ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id)
 {
        blade_transport_wss_init_t *bt_wssi = NULL;
 
@@ -241,9 +246,12 @@ ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssi
        bt_wssi->module = bm_wss;
        bt_wssi->pool = bm_wss->pool;
        bt_wssi->sock = sock;
+       if (session_id) bt_wssi->session_id = ks_pstrdup(bt_wssi->pool, session_id);
 
        *bt_wssiP = bt_wssi;
        
+       ks_log(KS_LOG_DEBUG, "Created\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -256,8 +264,12 @@ ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wss
 
        bt_wssi = *bt_wssiP;
 
+       if (bt_wssi->session_id) ks_pool_free(bt_wssi->pool, &bt_wssi->session_id);
+
        ks_pool_free(bt_wssi->pool, bt_wssiP);
        
+       ks_log(KS_LOG_DEBUG, "Destroyed\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -365,6 +377,8 @@ ks_status_t blade_module_wss_config(blade_module_wss_t *bm_wss, config_setting_t
        bm_wss->config_wss_endpoints_backlog = config_wss_endpoints_backlog;
        //bm_wss->config_wss_ssl = config_wss_ssl;
 
+       ks_log(KS_LOG_DEBUG, "Configured\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -405,13 +419,14 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s
        
        blade_handle_transport_register(bm_wss->handle, bm, BLADE_MODULE_WSS_TRANSPORT_NAME, bm_wss->transport_callbacks);
        
+       ks_log(KS_LOG_DEBUG, "Started\n");
+
        return KS_STATUS_SUCCESS;
 }
 
 KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm)
 {
        blade_module_wss_t *bm_wss = NULL;
-       blade_transport_wss_t *bt_wss = NULL;
        blade_connection_t *bc = NULL;
        
        ks_assert(bm);
@@ -435,20 +450,18 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm)
        bm_wss->listeners_count = 0;
        if (bm_wss->listeners_poll) ks_pool_free(bm_wss->pool, &bm_wss->listeners_poll);
 
-       // @todo connections should be gracefully disconnected so that they detach from sessions properly
-       // which means this should occur before the listeners thread is terminated, which requires that
-       // the listener sockets be made inactive (or closed) to stop accepting while shutting down
-       while (ks_q_trypop(bm_wss->disconnected, (void **)&bc) == KS_STATUS_SUCCESS) ;
-       list_iterator_start(&bm_wss->connected);
-       while (list_iterator_hasnext(&bm_wss->connected)) {
-               bc = (blade_connection_t *)list_iterator_next(&bm_wss->connected);
-               bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
-
-               blade_connection_destroy(&bc);
-               blade_transport_wss_destroy(&bt_wss);
+       if (list_size(&bm_wss->connected) > 0) {
+               // this approach to shutdown is cleaner, ensures connections will detach from sessions and be destroyed all in the same places
+               list_iterator_start(&bm_wss->connected);
+               while (list_iterator_hasnext(&bm_wss->connected)) {
+                       bc = (blade_connection_t *)list_iterator_next(&bm_wss->connected);
+                       blade_connection_disconnect(bc);
+               }
+               list_iterator_stop(&bm_wss->connected);
+               while (list_size(&bm_wss->connected) > 0) ks_sleep_ms(100);
        }
-       list_iterator_stop(&bm_wss->connected);
-       list_clear(&bm_wss->connected);
+       
+       ks_log(KS_LOG_DEBUG, "Stopped\n");
 
        return KS_STATUS_SUCCESS;
 }
@@ -492,6 +505,8 @@ ks_status_t blade_module_wss_listen(blade_module_wss_t *bm_wss, ks_sockaddr_t *a
        bm_wss->listeners_poll[listener_index].fd = listener;
        bm_wss->listeners_poll[listener_index].events = POLLIN | POLLERR;
 
+       ks_log(KS_LOG_DEBUG, "Bound %s on port %d at index %d\n", ks_addr_get_host(addr), ks_addr_get_port(addr), listener_index);
+
  done:
        if (ret != KS_STATUS_SUCCESS) {
                if (listener != KS_SOCK_INVALID) {
@@ -506,7 +521,6 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
 {
        blade_module_wss_t *bm_wss = NULL;
        blade_transport_wss_init_t *bt_wss_init = NULL;
-       blade_transport_wss_t *bt_wss = NULL;
        blade_connection_t *bc = NULL;
 
        ks_assert(thread);
@@ -523,18 +537,22 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
 
                                if (bm_wss->listeners_poll[index].revents & POLLERR) {
                                        // @todo: error handling, just skip the listener for now, it might recover, could skip X times before closing?
+                                       ks_log(KS_LOG_DEBUG, "POLLERR on index %d\n", index);
                                        continue;
                                }
                                if (!(bm_wss->listeners_poll[index].revents & POLLIN)) continue;
 
                                if ((sock = accept(bm_wss->listeners_poll[index].fd, NULL, NULL)) == KS_SOCK_INVALID) {
                                        // @todo: error handling, just skip the socket for now as most causes are because remote side became unreachable
+                                       ks_log(KS_LOG_DEBUG, "Accept failed on index %d\n", index);
                                        continue;
                                }
 
-                               ks_log(KS_LOG_DEBUG, "Socket Accepted\n");
+                               // @todo getsockname and getpeername (getpeername can be skipped if passing to accept instead)
+                               
+                               ks_log(KS_LOG_DEBUG, "Socket accepted\n", index);
 
-                               blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock);
+                               blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock, NULL);
                                ks_assert(bt_wss_init);
 
                 blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks);
@@ -543,11 +561,14 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
                                blade_connection_read_lock(bc, KS_TRUE);
 
                                if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_INBOUND) != KS_STATUS_SUCCESS) {
+                                       ks_log(KS_LOG_DEBUG, "Connection (%s) startup failed\n", blade_connection_id_get(bc));
                                        blade_connection_destroy(&bc);
                                        blade_transport_wss_init_destroy(&bt_wss_init);
                                        ks_socket_close(&sock);
                                        continue;
                                }
+                               ks_log(KS_LOG_DEBUG, "Connection (%s) started\n", blade_connection_id_get(bc));
+                               
                                blade_handle_connections_add(bc);
                                list_append(&bm_wss->connected, bc);
                                blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW);
@@ -555,18 +576,6 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
                                blade_connection_read_unlock(bc);
                        }
                }
-
-               while (ks_q_trypop(bm_wss->disconnected, (void **)&bc) == KS_STATUS_SUCCESS) {
-                       bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
-                       bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
-
-                       blade_handle_connections_remove(bc);
-                       list_delete(&bm_wss->connected, bc);
-
-                       if (bt_wss_init) blade_transport_wss_init_destroy(&bt_wss_init);
-                       blade_connection_destroy(&bc);
-                       if (bt_wss) blade_transport_wss_destroy(&bt_wss);
-               }
        }
        ks_log(KS_LOG_DEBUG, "Stopped\n");
 
@@ -590,6 +599,8 @@ ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_mo
 
        *bt_wssP = bt_wss;
        
+       ks_log(KS_LOG_DEBUG, "Created\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -607,10 +618,12 @@ ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP)
        
        ks_pool_free(bt_wss->pool, bt_wssP);
        
+       ks_log(KS_LOG_DEBUG, "Destroyed\n");
+
        return KS_STATUS_SUCCESS;
 }
 
-ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target)
+ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_module_wss_t *bm_wss = NULL;
@@ -640,6 +653,7 @@ ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_modul
        if (!ip) {
                // @todo: temporary, this should fall back on DNS SRV or whatever else can turn "a@b.com" into an ip (and port?) to connect to
                // also need to deal with hostname lookup, so identities with wss transport need to have a host parameter that is an IP for the moment
+               ks_log(KS_LOG_DEBUG, "No host provided\n");
                ret = KS_STATUS_FAIL;
                goto done;
        }
@@ -649,6 +663,7 @@ ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_modul
                ks_size_t len = strlen(ip);
 
                if (len <= 3) {
+                       ks_log(KS_LOG_DEBUG, "Invalid host provided\n");
                        ret = KS_STATUS_FAIL;
                        goto done;
                }
@@ -660,36 +675,43 @@ ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_modul
                int p = atoi(portstr);
                if (p > 0 && p <= UINT16_MAX) port = p;
        }
-       
+
+       ks_log(KS_LOG_DEBUG, "Connecting to %s on port %d\n", ip, port);
+
        ks_addr_set(&addr, ip, port, family);
        if ((sock = ks_socket_connect(SOCK_STREAM, IPPROTO_TCP, &addr)) == KS_SOCK_INVALID) {
                // @todo: error handling, just fail for now as most causes are because remote side became unreachable
+               ks_log(KS_LOG_DEBUG, "Connect failed\n");
                ret = KS_STATUS_FAIL;
                goto done;
        }
 
-       ks_log(KS_LOG_DEBUG, "Socket Connected\n");
+       ks_log(KS_LOG_DEBUG, "Socket connected\n");
 
-       blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock);
+       blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock, session_id);
        ks_assert(bt_wss_init);
 
        blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks);
        ks_assert(bc);
 
        if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_OUTBOUND) != KS_STATUS_SUCCESS) {
+               ks_log(KS_LOG_DEBUG, "Connection (%s) startup failed\n", blade_connection_id_get(bc));
                blade_connection_destroy(&bc);
                blade_transport_wss_init_destroy(&bt_wss_init);
                ks_socket_close(&sock);
                ret = KS_STATUS_FAIL;
                goto done;
        }
+       ks_log(KS_LOG_DEBUG, "Connection (%s) started\n", blade_connection_id_get(bc));
        // @todo make sure it's sensible to be mixing outbound and inbound connections in the same list, but this allows entering the destruction pipeline
        // for module shutdown, disconnects and errors without special considerations
+       blade_handle_connections_add(bc);
        list_append(&bm_wss->connected, bc);
-       *bcP = bc;
-       
+
        blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW);
 
+       *bcP = bc;
+
  done:
        return ret;
 }
@@ -708,16 +730,18 @@ ks_status_t blade_transport_wss_write(blade_transport_wss_t *bt_wss, cJSON *json
        char *json_str = cJSON_PrintUnformatted(json);
        ks_size_t json_str_len = 0;
        if (!json_str) {
-               // @todo error logging
+               ks_log(KS_LOG_DEBUG, "Failed to generate json string\n");
                ret = KS_STATUS_FAIL;
                goto done;
        }
-       json_str_len = strlen(json_str) + 1; // @todo determine if WSOC_TEXT null terminates when read_frame is called, or if it's safe to include like this
+       // @todo determine if WSOC_TEXT null terminates when read_frame is called, or if it's safe to include like this
+       json_str_len = strlen(json_str) + 1;
        if (kws_write_frame(bt_wss->kws, WSOC_TEXT, json_str, json_str_len) != json_str_len) {
-               // @todo error logging
+               ks_log(KS_LOG_DEBUG, "Failed to write frame\n");
                ret = KS_STATUS_FAIL;
                goto done;
        }
+       ks_log(KS_LOG_DEBUG, "Frame written %d bytes\n", json_str_len);
 
  done:
        if (json_str) free(json_str);
@@ -733,13 +757,10 @@ ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, cJSON *json)
        ks_assert(bc);
        ks_assert(json);
 
-       ks_log(KS_LOG_DEBUG, "Send Callback\n");
-
        bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
 
        ret = blade_transport_wss_write(bt_wss, json);
 
-       // @todo use reference counting on blade_identity_t and cJSON objects
        cJSON_Delete(json);
 
        return ret;
@@ -753,7 +774,7 @@ ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json
        *json = NULL;
 
        if (poll_flags & KS_POLL_ERROR) {
-               // @todo error logging
+               ks_log(KS_LOG_DEBUG, "POLLERR\n");
                return KS_STATUS_FAIL;
        }
        if (poll_flags & KS_POLL_READ) {
@@ -768,10 +789,13 @@ ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json
                        // -2 means nonblocking wait
                        // other values are based on WS_XXX reasons
                        // negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and
+                       ks_log(KS_LOG_DEBUG, "Failed to read frame\n");
                        return KS_STATUS_FAIL;
                }
+               ks_log(KS_LOG_DEBUG, "Frame read %d bytes\n", frame_data_len);
 
                if (!(*json = cJSON_Parse((char *)frame_data))) {
+                       ks_log(KS_LOG_DEBUG, "Failed to parse frame\n");
                        return KS_STATUS_FAIL;
                }
        }
@@ -785,8 +809,6 @@ ks_status_t blade_transport_wss_on_receive(blade_connection_t *bc, cJSON **json)
        ks_assert(bc);
        ks_assert(json);
 
-       ks_log(KS_LOG_DEBUG, "Receive Callback\n");
-
        bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
 
        return blade_transport_wss_read(bt_wss, json);
@@ -795,6 +817,7 @@ ks_status_t blade_transport_wss_on_receive(blade_connection_t *bc, cJSON **json)
 blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition)
 {
        blade_transport_wss_t *bt_wss = NULL;
+       blade_transport_wss_init_t *bt_wss_init = NULL;
 
        ks_assert(bc);
 
@@ -803,8 +826,12 @@ blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_conn
        if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 
        bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
+       bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
+
+       list_delete(&bt_wss->module->connected, bc);
 
-       ks_q_push(bt_wss->module->disconnected, bc);
+       if (bt_wss_init) blade_transport_wss_init_destroy(&bt_wss_init);
+       if (bt_wss) blade_transport_wss_destroy(&bt_wss);
 
        return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 }
@@ -865,7 +892,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_connect_inbound(blade
 
        // @todo: SSL init stuffs based on data from config to pass into kws_init
        if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, NULL, KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) {
-               // @todo error logging
+               ks_log(KS_LOG_DEBUG, "Failed websocket init\n");
                return BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
        }
 
@@ -886,7 +913,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_connect_outbound(blad
 
        // @todo: SSL init stuffs based on data from config to pass into kws_init
        if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, "/blade:blade.invalid:blade", KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) {
-               // @todo error logging
+               ks_log(KS_LOG_DEBUG, "Failed websocket init\n");
                return BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
        }
 
@@ -897,13 +924,16 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_
 {
        blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
        blade_transport_wss_t *bt_wss = NULL;
-       cJSON *json = NULL;
+       cJSON *json_req = NULL;
+       cJSON *json_res = NULL;
        cJSON *params = NULL;
+       cJSON *result = NULL;
+       //cJSON *error = NULL;
        blade_session_t *bs = NULL;
        blade_handle_t *bh = NULL;
        const char *jsonrpc = NULL;
-       const char *method = NULL;
        const char *id = NULL;
+       const char *method = NULL;
        const char *sid = NULL;
        ks_time_t timeout;
 
@@ -914,114 +944,258 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_
 
        ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
 
+       if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
        bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
 
        // @todo very temporary, really need monotonic clock and get timeout delay and sleep delay from config
        timeout = ks_time_now() + (5 * KS_USEC_PER_SEC);
-       while (blade_transport_wss_read(bt_wss, &json) == KS_STATUS_SUCCESS) {
-               if (json) break;
-               ks_sleep(250);
+       while (blade_transport_wss_read(bt_wss, &json_req) == KS_STATUS_SUCCESS) {
+               if (json_req) break;
+               ks_sleep_ms(250);
                if (ks_time_now() >= timeout) break;
        }
 
-       if (!json) {
-               // @todo error logging
+       if (!json_req) {
+               ks_log(KS_LOG_DEBUG, "Failed to receive message before timeout\n");
                ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
                goto done;
        }
 
        // @todo validation wrapper for request and response/error to confirm jsonrpc and provide enum for output as to which it is
-       jsonrpc = cJSON_GetObjectCstr(json, "jsonrpc"); // @todo check for definitions of these keys and fixed values
+       jsonrpc = cJSON_GetObjectCstr(json_req, "jsonrpc"); // @todo check for definitions of these keys and fixed values
        if (!jsonrpc || strcmp(jsonrpc, "2.0")) {
-               // @todo error logging
+               ks_log(KS_LOG_DEBUG, "Received message is not the expected protocol\n");
+               // @todo send error response before disconnecting, code = -32600 (invalid request)
                ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
                goto done;
        }
 
-       id = cJSON_GetObjectCstr(json, "id"); // @todo switch to number if we are not using a uuid for message id
+       id = cJSON_GetObjectCstr(json_req, "id"); // @todo switch to number if we are not using a uuid for message id
        if (!id) {
-               // @todo error logging
+               ks_log(KS_LOG_DEBUG, "Received message is missing 'id'\n");
+               // @todo send error response before disconnecting, code = -32600 (invalid request)
                ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
                goto done;
        }
 
-       method = cJSON_GetObjectCstr(json, "method");
+       method = cJSON_GetObjectCstr(json_req, "method");
        if (!method || strcasecmp(method, "blade.session.attach")) {
-               // @todo error logging
+               ks_log(KS_LOG_DEBUG, "Received message is missing 'method' or is an unexpected method\n");
+               // @todo send error response before disconnecting, code = -32601 (method not found)
                ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
                goto done;
        }
 
-       params = cJSON_GetObjectItem(json, "params");
+       params = cJSON_GetObjectItem(json_req, "params");
        if (params) {
                sid = cJSON_GetObjectCstr(params, "session-id");
                if (sid) {
-                       // @todo validate uuid format by parsing, not currently available in uuid functions
-                       ks_log(KS_LOG_DEBUG, "Session Requested: %s\n", sid);
+                       // @todo validate uuid format by parsing, not currently available in uuid functions, send -32602 (invalid params) if invalid
+                       ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", sid);
                }
        }
 
        if (sid) {
                bs = blade_handle_sessions_get(bh, sid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
                if (bs) {
-                       ks_log(KS_LOG_DEBUG, "Session Located: %s\n", blade_session_id_get(bs));
+                       if (blade_session_terminating(bs)) {
+                               blade_session_read_unlock(bs);
+                               ks_log(KS_LOG_DEBUG, "Session (%s) terminating\n", blade_session_id_get(bs));
+                               bs = NULL;
+                       } else {
+                               ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs));
+                       }
                }
        }
-       
+
        if (!bs) {
                blade_session_create(&bs, bh);
                ks_assert(bs);
 
-               ks_log(KS_LOG_DEBUG, "Session Created: %s\n", blade_session_id_get(bs));
+               ks_log(KS_LOG_DEBUG, "Session (%s) created\n", blade_session_id_get(bs));
 
                blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise
 
                if (blade_session_startup(bs) != KS_STATUS_SUCCESS) {
+                       ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", blade_session_id_get(bs));
+                       // @todo send error response before disconnecting, code = -32603 (internal error)
                        blade_session_read_unlock(bs);
                        blade_session_destroy(&bs);
                        ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
                        goto done;
                }
+               ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bs));
                blade_handle_sessions_add(bs);
        }
 
+       // @todo wrapper to generate request and response
+       json_res = cJSON_CreateObject();
+       cJSON_AddStringToObject(json_res, "jsonrpc", "2.0");
+       cJSON_AddStringToObject(json_res, "id", id);
+       
+       result = cJSON_CreateObject();
+       cJSON_AddStringToObject(result, "session-id", blade_session_id_get(bs));
+       cJSON_AddItemToObject(json_res, "result", result);
+
+       // @todo send response
+       if (blade_transport_wss_write(bt_wss, json_res) != KS_STATUS_SUCCESS) {
+               ks_log(KS_LOG_DEBUG, "Failed to write message\n");
+               ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+               goto done;
+       }
+
        blade_connection_session_set(bc, blade_session_id_get(bs));
        
  done:
        // @note the state machine expects if we return SUCCESS, that the session assigned to the connection will be read locked to ensure that the state
        // machine can finish attaching the session, if you BYPASS then you can handle everything here in the callback, but this should be fairly standard
        // behaviour to simply go as far as assigning a session to the connection and let the system handle the rest
-       if (json) cJSON_Delete(json);
+       if (json_req) cJSON_Delete(json_req);
+       if (json_res) cJSON_Delete(json_res);
        return ret;
 }
 
 blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
 {
+       blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+       blade_handle_t *bh = NULL;
+       blade_transport_wss_t *bt_wss = NULL;
+       blade_transport_wss_init_t *bt_wss_init = NULL;
+       ks_pool_t *pool = NULL;
+       cJSON *json_req = NULL;
+       cJSON *json_res = NULL;
+       uuid_t msgid;
+       const char *mid = NULL;
+       ks_time_t timeout;
+       const char *jsonrpc = NULL;
+       const char *id = NULL;
+       cJSON *error = NULL;
+       cJSON *result = NULL;
+       const char *sid = NULL;
+       blade_session_t *bs = NULL;
+
        ks_assert(bc);
 
        ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
 
-       // @todo produce jsonrpc compliant message to call method "blade.session.attach"
+       if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 
-       // @todo add params with nested session-id and session-token if attempting to reconnect as a client, this should probably be passed in from
-       // the blade_handle_connect() call and then through the init parameters for the transport (do not directly use the old session, but copy the id and token)
+       bh = blade_connection_handle_get(bc);
+       bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
+       bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
+       pool = blade_connection_pool_get(bc);
+
+
+       // @todo wrapper to build a request and response/error
+       json_req = cJSON_CreateObject();
+       cJSON_AddStringToObject(json_req, "jsonrpc", "2.0");
+       cJSON_AddStringToObject(json_req, "method", "blade.session.attach");
+
+       ks_uuid(&msgid);
+       mid = ks_uuid_str(pool, &msgid);
+       cJSON_AddStringToObject(json_req, "id", mid);
+
+       if (bt_wss_init->session_id) {
+               cJSON *params = cJSON_CreateObject();
+               cJSON_AddStringToObject(params, "session-id", bt_wss_init->session_id);
+               cJSON_AddItemToObject(json_req, "params", params);
+       }
+
+       ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", (bt_wss_init->session_id ? bt_wss_init->session_id : "none"));
+       
+       if (blade_transport_wss_write(bt_wss, json_req) != KS_STATUS_SUCCESS) {
+               ks_log(KS_LOG_DEBUG, "Failed to write message\n");
+               ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+               goto done;
+       }
 
-       // @todo block while sending message with blade_transport_wss_write(bt_wss, json)
 
-       // @todo block while receiving expected response with blade_transport_wss_read(bt_wss, json)
+       timeout = ks_time_now() + (5 * KS_USEC_PER_SEC);
+       while (blade_transport_wss_read(bt_wss, &json_res) == KS_STATUS_SUCCESS) {
+               if (json_res) break;
+               ks_sleep_ms(250);
+               if (ks_time_now() >= timeout) break;
+       }
+
+       if (!json_res) {
+               ks_log(KS_LOG_DEBUG, "Failed to receive message before timeout\n");
+               ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+               goto done;
+       }
+       
+       // @todo validation wrapper for request and response/error to confirm jsonrpc and provide enum for output as to which it is
+       jsonrpc = cJSON_GetObjectCstr(json_res, "jsonrpc"); // @todo check for definitions of these keys and fixed values
+       if (!jsonrpc || strcmp(jsonrpc, "2.0")) {
+               ks_log(KS_LOG_DEBUG, "Received message is not the expected protocol\n");
+               ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+               goto done;
+       }
 
-       // @todo check for error field, log and return HOOK_DISCONNECT if any errors occur
+       id = cJSON_GetObjectCstr(json_res, "id"); // @todo switch to number if we are not using a uuid for message id
+       if (!id || strcasecmp(mid, id)) {
+               ks_log(KS_LOG_DEBUG, "Received message is missing 'id'\n");
+               ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+               goto done;
+       }
 
-       // @todo check for result field, and nested session-id and session-token
+       error = cJSON_GetObjectItem(json_res, "error");
+       if (error) {
+               ks_log(KS_LOG_DEBUG, "Error message ... add the details\n");
+               ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+               goto done;
+       }
 
-       // @todo lookup the old session from the blade_handle_t, if it still exists then use this session
+       result = cJSON_GetObjectItem(json_res, "result");
+       if (!result) {
+               ks_log(KS_LOG_DEBUG, "Received message is missing 'result'\n");
+               ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+               goto done;
+       }
 
-       // @todo if the old session does not exist, then create a new session and populate with the parameters from the results
+       sid = cJSON_GetObjectCstr(result, "session-id");
+       if (!sid) {
+               ks_log(KS_LOG_DEBUG, "Received message 'result' is missing 'session-id'\n");
+               ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+               goto done;
+       }
        
-       // @todo once session is established, associate it to the connection, see attach_inbound for notes regarding universal actions after returning SUCCESS
+       if (sid) {
+               // @todo validate uuid format by parsing, not currently available in uuid functions
+               bs = blade_handle_sessions_get(bh, sid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
+               if (bs) {
+                       ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs));
+               }
+       }
+
+       if (!bs) {
+               blade_session_create(&bs, bh); // @todo let sid be passed to constructor, NULL to generate
+               ks_assert(bs);
+
+               blade_session_id_set(bs, sid);
+
+               ks_log(KS_LOG_DEBUG, "Session (%s) created\n", blade_session_id_get(bs));
+
+               blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise
+
+               if (blade_session_startup(bs) != KS_STATUS_SUCCESS) {
+                       ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", blade_session_id_get(bs));
+                       blade_session_read_unlock(bs);
+                       blade_session_destroy(&bs);
+                       ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+                       goto done;
+               }
+               ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bs));
+               blade_handle_sessions_add(bs);
+       }
+
+       blade_connection_session_set(bc, blade_session_id_get(bs));
 
-       ks_sleep_ms(1000); // @todo temporary testing, remove this and return success once negotiations are done
-       return BLADE_CONNECTION_STATE_HOOK_BYPASS;
+ done:
+       if (mid) ks_pool_free(pool, &mid);
+       if (json_req) cJSON_Delete(json_req);
+       if (json_res) cJSON_Delete(json_res);
+       return ret;
 }
 
 blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connection_t *bc, blade_connection_state_condition_t condition)
@@ -1029,8 +1203,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connecti
        ks_assert(bc);
 
        ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
-
-       ks_sleep(1000);
+       
        return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 }
 
@@ -1040,7 +1213,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connectio
 
        ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
 
-       ks_sleep(1000);
+       ks_sleep_ms(1000);
        return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 }
 
index fbfd227254abd49befa83d9f93195eaa9ff72a0d..06f500a9d78091ec66845513f37032d10f601e94 100644 (file)
@@ -81,6 +81,8 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle
 
        *bsP = bs;
 
+       ks_log(KS_LOG_DEBUG, "Created\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -105,6 +107,8 @@ KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP)
 
        ks_pool_free(bs->pool, bsP);
 
+       ks_log(KS_LOG_DEBUG, "Destroyed\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -125,6 +129,8 @@ KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs)
                return KS_STATUS_FAIL;
        }
        
+       ks_log(KS_LOG_DEBUG, "Started\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -152,6 +158,8 @@ KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs)
        list_iterator_stop(&bs->connections);
        list_clear(&bs->connections);
        
+       ks_log(KS_LOG_DEBUG, "Stopped\n");
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -226,8 +234,17 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs)
 {
        ks_assert(bs);
 
-       if (bs->state != BLADE_SESSION_STATE_HANGUP && bs->state != BLADE_SESSION_STATE_DESTROY)
+       if (bs->state != BLADE_SESSION_STATE_HANGUP && bs->state != BLADE_SESSION_STATE_DESTROY) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) hanging up\n", bs->id);
                blade_session_state_set(bs, BLADE_SESSION_STATE_HANGUP);
+       }
+}
+
+KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs)
+{
+       ks_assert(bs);
+
+       return bs->state == BLADE_SESSION_STATE_HANGUP || bs->state == BLADE_SESSION_STATE_DESTROY;
 }
 
 KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id)
@@ -242,6 +259,8 @@ KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const
        
        list_append(&bs->connections, cid);
 
+       ks_log(KS_LOG_DEBUG, "Session (%s) connection added (%s)\n", bs->id, id);
+
        return ret;
 }
 
@@ -256,6 +275,7 @@ KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, co
        for (uint32_t i = 0; i < size; ++i) {
                const char *cid = (const char *)list_get_at(&bs->connections, i);
                if (!strcasecmp(cid, id)) {
+                       ks_log(KS_LOG_DEBUG, "Session (%s) connection removed (%s)\n", bs->id, id);
                        list_delete_at(&bs->connections, i);
                        ks_pool_free(bs->pool, &cid);
                        break;
@@ -278,7 +298,7 @@ ks_status_t blade_session_connections_choose(blade_session_t *bs, cJSON *json, b
        // later there will need to be a way to pick which connection to use
        cid = list_get_at(&bs->connections, 0);
        if (!cid) {
-               // @todo error logging... this shouldn't happen
+               // no connections available
                return KS_STATUS_FAIL;
        }
                
@@ -310,6 +330,7 @@ KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json)
                if (blade_session_connections_choose(bs, json, &bc) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
                // @todo cache the blade_request_t here if it exists to gaurentee it's cached before a response could be received
                blade_connection_sending_push(bc, json);
+               blade_connection_read_unlock(bc);
        }
        
        return KS_STATUS_SUCCESS;
@@ -334,7 +355,25 @@ KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **j
        return ks_q_trypop(bs->sending, (void **)json);
 }
 
-// @todo receive queue push and pop
+KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json)
+{
+    cJSON *json_copy = NULL;
+
+    ks_assert(bs);
+    ks_assert(json);
+
+    json_copy = cJSON_Duplicate(json, 1);
+    return ks_q_push(bs->receiving, json_copy);
+}
+
+KS_DECLARE(ks_status_t) blade_session_receiving_pop(blade_session_t *bs, cJSON **json)
+{
+       ks_assert(bs);
+       ks_assert(json);
+
+       return ks_q_trypop(bs->receiving, (void **)json);
+}
+
 
 void *blade_session_state_thread(ks_thread_t *thread, void *data)
 {
@@ -354,26 +393,56 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
                if (!list_empty(&bs->connections)) {
                        while (blade_session_sending_pop(bs, &json) == KS_STATUS_SUCCESS && json) {
                                blade_connection_t *bc = NULL;
-                               if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) blade_connection_sending_push(bc, json);
+                               if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) {
+                                       blade_connection_sending_push(bc, json);
+                                       blade_connection_read_unlock(bc);
+                               }
                                cJSON_Delete(json);
                        }
                }
 
                switch (state) {
                case BLADE_SESSION_STATE_DESTROY:
+                       ks_log(KS_LOG_DEBUG, "Session (%s) state destroy\n", bs->id);
+                       blade_handle_sessions_remove(bs);
+                       blade_session_destroy(&bs);
                        return NULL;
                case BLADE_SESSION_STATE_HANGUP:
-                       // @todo detach from session if this connection is attached
-                       blade_session_state_set(bs, BLADE_SESSION_STATE_DESTROY);
-                       break;
+                       {
+                               ks_log(KS_LOG_DEBUG, "Session (%s) state hangup\n", bs->id);
+                               
+                               list_iterator_start(&bs->connections);
+                               while (list_iterator_hasnext(&bs->connections)) {
+                                       const char *cid = (const char *)list_iterator_next(&bs->connections);
+                                       blade_connection_t *bc = blade_handle_connections_get(bs->handle, cid);
+                                       ks_assert(bc);
+
+                                       blade_connection_disconnect(bc);
+                                       blade_connection_read_unlock(bc);
+                               }
+                               list_iterator_stop(&bs->connections);
+
+                               while (!list_empty(&bs->connections)) ks_sleep(100);
+
+                               blade_session_state_set(bs, BLADE_SESSION_STATE_DESTROY);
+                               break;
+                       }
                case BLADE_SESSION_STATE_CONNECT:
+                       ks_log(KS_LOG_DEBUG, "Session (%s) state connect\n", bs->id);
+                       ks_sleep_ms(1000);
                        break;
                case BLADE_SESSION_STATE_ATTACH:
+                       ks_log(KS_LOG_DEBUG, "Session (%s) state attach\n", bs->id);
+                       ks_sleep_ms(1000);
                        break;
                case BLADE_SESSION_STATE_DETACH:
+                       ks_log(KS_LOG_DEBUG, "Session (%s) state detach\n", bs->id);
+                       ks_sleep_ms(1000);
                        break;
                case BLADE_SESSION_STATE_READY:
-                       // @todo pop from session receiving queue and pass to blade_protocol_process()
+                       ks_log(KS_LOG_DEBUG, "Session (%s) state ready\n", bs->id);
+                       // @todo pop from session receiving queue and pass into protocol layer through something like blade_protocol_process()
+                       ks_sleep_ms(1000);
                        break;
                default: break;
                }
index 5977c091d7c32558802bae982b70b2ac1eccbf3f..1a680a0ff512e6e4cd61994b2ecd7858781c52a0 100644 (file)
@@ -244,8 +244,17 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
                blade_request_destroy(&value);
        }
        
-       // @todo terminate all sessions, which will disconnect all attached connections
-       
+       for (it = ks_hash_first(bh->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+               void *key = NULL;
+               blade_session_t *value = NULL;
+               
+               ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
+               ks_hash_remove(bh->requests, key);
+               
+               blade_session_hangup(value);
+       }
+       while (ks_hash_count(bh->sessions) > 0) ks_sleep_ms(100);
+
        // @todo call onshutdown and onunload callbacks for modules from DSOs, which will unregister transports and disconnect remaining unattached connections
 
        // @todo unload DSOs
@@ -312,7 +321,7 @@ KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, co
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target)
+KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_handle_transport_registration_t *bhtr = NULL;
@@ -358,7 +367,7 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio
 
        // @todo need to be able to get to the blade_module_t from the callbacks, may require envelope around registration of callbacks to include module
        // this is required because onconnect transport callback needs to be able to get back to the module data to create the connection being returned
-       if (bhtr) ret = bhtr->callbacks->onconnect(bcP, bhtr->module, target);
+       if (bhtr) ret = bhtr->callbacks->onconnect(bcP, bhtr->module, target, session_id);
        else ret = KS_STATUS_FAIL;
 
        return ret;
@@ -423,7 +432,7 @@ KS_DECLARE(blade_session_t *) blade_handle_sessions_get(blade_handle_t *bh, cons
 {
        blade_session_t *bs = NULL;
 
-       ks_assert(bs);
+       ks_assert(bh);
        ks_assert(sid);
 
        ks_hash_read_lock(bh->sessions);
index 2242197861e4feb5c9fe8577c65409b6f92962db..8b8660f9cbeadbaf22a82f8d859a65aae6949db1 100644 (file)
@@ -44,6 +44,7 @@ KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP);
 KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction);
 KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc);
 KS_DECLARE(blade_handle_t *) blade_connection_handle_get(blade_connection_t *bc);
+KS_DECLARE(ks_pool_t *) blade_connection_pool_get(blade_connection_t *bc);
 KS_DECLARE(const char *) blade_connection_id_get(blade_connection_t *bc);
 KS_DECLARE(ks_status_t) blade_connection_read_lock(blade_connection_t *bc, ks_bool_t block);
 KS_DECLARE(ks_status_t) blade_connection_read_unlock(blade_connection_t *bc);
index c6d376acd96c9ee9384331fcb05a7b304600faf7..f4e92775b36f4ea7adc44ebbb4e3d77d50571a6e 100644 (file)
@@ -49,11 +49,14 @@ KS_DECLARE(ks_status_t) blade_session_write_lock(blade_session_t *bs, ks_bool_t
 KS_DECLARE(ks_status_t) blade_session_write_unlock(blade_session_t *bs);
 KS_DECLARE(void) blade_session_state_set(blade_session_t *bs, blade_session_state_t state);
 KS_DECLARE(void) blade_session_hangup(blade_session_t *bs);
+KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs);
 KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id);
 KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, const char *id);
 KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json);
 KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json);
 KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json);
+KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json);
+KS_DECLARE(ks_status_t) blade_session_receiving_pop(blade_session_t *bs, cJSON **json);
 KS_END_EXTERN_C
 
 #endif
index 7b87fa84f3b8b964ad8389c18ba670a402f31c3c..dd1affb71d28c9f7e37d23e2d7ab104594ef10c1 100644 (file)
@@ -50,7 +50,7 @@ KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh);
 
 KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_module_t *bm, const char *name, blade_transport_callbacks_t *callbacks);
 KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, const char *name);
-KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target);
+KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id);
 
 KS_DECLARE(blade_connection_t *) blade_handle_connections_get(blade_handle_t *bh, const char *cid);
 KS_DECLARE(ks_status_t) blade_handle_connections_add(blade_connection_t *bc);
index 2442d88a062e1a6f9396a27048b54e9a1e10b54a..66e8ea0d01771476a800a2dd225b7e2353eba99e 100644 (file)
@@ -113,7 +113,7 @@ struct blade_module_callbacks_s {
 };
 
 
-typedef ks_status_t (*blade_transport_connect_callback_t)(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target);
+typedef ks_status_t (*blade_transport_connect_callback_t)(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id);
 typedef blade_connection_rank_t (*blade_transport_rank_callback_t)(blade_connection_t *bc, blade_identity_t *target);
 typedef ks_status_t (*blade_transport_send_callback_t)(blade_connection_t *bc, cJSON *json);
 typedef ks_status_t (*blade_transport_receive_callback_t)(blade_connection_t *bc, cJSON **json);
index 510897971d040c1ae967573ece52b316f06043dc..46bfda99691d88a6b6839f405cc900c5322fced9 100644 (file)
@@ -253,7 +253,7 @@ void command_connect(blade_handle_t *bh, char *args)
 
        blade_identity_create(&target, blade_handle_pool_get(bh));
        
-       if (blade_identity_parse(target, args) == KS_STATUS_SUCCESS) blade_handle_connect(bh, &bc, target);
+       if (blade_identity_parse(target, args) == KS_STATUS_SUCCESS) blade_handle_connect(bh, &bc, target, NULL);
 
        blade_identity_destroy(&target);
 }