From: Shane Bryldt Date: Thu, 23 Feb 2017 23:01:22 +0000 (+0000) Subject: FS-9952: Preliminary session negotiations done, added a bunch of logging, fixed up... X-Git-Tag: v1.8.0~696^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=14a99987bb0b9f41e363e7716facf4a278e1e330;p=thirdparty%2Ffreeswitch.git FS-9952: Preliminary session negotiations done, added a bunch of logging, fixed up cleanup code, needs more testing and more error handling --- diff --git a/libs/libblade/src/blade_connection.c b/libs/libblade/src/blade_connection.c index eadbea4adb..7d5cfadfe0 100644 --- a/libs/libblade/src/blade_connection.c +++ b/libs/libblade/src/blade_connection.c @@ -90,6 +90,8 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, *bcP = bc; + ks_log(KS_LOG_DEBUG, "Created\n"); + return KS_STATUS_SUCCESS; } @@ -112,6 +114,8 @@ KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP) ks_pool_free(bc->pool, bcP); + ks_log(KS_LOG_DEBUG, "Destroyed\n"); + return KS_STATUS_SUCCESS; } @@ -133,6 +137,8 @@ KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_c return KS_STATUS_FAIL; } + ks_log(KS_LOG_DEBUG, "Started\n"); + return KS_STATUS_SUCCESS; } @@ -153,6 +159,8 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc) while (ks_q_trypop(bc->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json); + ks_log(KS_LOG_DEBUG, "Stopped\n"); + return KS_STATUS_SUCCESS; } @@ -163,6 +171,13 @@ KS_DECLARE(blade_handle_t *) blade_connection_handle_get(blade_connection_t *bc) return bc->handle; } +KS_DECLARE(ks_pool_t *) blade_connection_pool_get(blade_connection_t *bc) +{ + ks_assert(bc); + + return bc->pool; +} + KS_DECLARE(const char *) blade_connection_id_get(blade_connection_t *bc) { ks_assert(bc); @@ -285,8 +300,10 @@ KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc) { ks_assert(bc); - if (bc->state != BLADE_CONNECTION_STATE_DETACH && bc->state != BLADE_CONNECTION_STATE_DISCONNECT) + if (bc->state != BLADE_CONNECTION_STATE_DETACH && bc->state != BLADE_CONNECTION_STATE_DISCONNECT) { + ks_log(KS_LOG_DEBUG, "Connection (%s) disconnecting\n", bc->id); blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DETACH); + } } KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json) @@ -342,6 +359,9 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data) hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS; callback = blade_connection_state_callback_lookup(bc, state); + if (state == BLADE_CONNECTION_STATE_DISCONNECT) { + blade_handle_connections_remove(bc); + } // @todo only READY state? if (state != BLADE_CONNECTION_STATE_DETACH && state != BLADE_CONNECTION_STATE_DISCONNECT) { while (blade_connection_sending_pop(bc, &json) == KS_STATUS_SUCCESS && json) { @@ -363,7 +383,9 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data) break; } if (!(done = (json == NULL))) { - // @todo push json to session receiving queue + blade_session_t *bs = blade_handle_sessions_get(bc->handle, bc->session); + ks_assert(bs); + blade_session_receiving_push(bs, json); cJSON_Delete(json); json = NULL; } @@ -379,7 +401,8 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data) else if (hook == BLADE_CONNECTION_STATE_HOOK_SUCCESS) { switch (state) { case BLADE_CONNECTION_STATE_DISCONNECT: - return NULL; + blade_connection_destroy(&bc); + break; case BLADE_CONNECTION_STATE_NEW: blade_connection_state_set(bc, BLADE_CONNECTION_STATE_CONNECT); break; @@ -388,24 +411,38 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data) break; case BLADE_CONNECTION_STATE_ATTACH: { + // @todo this is adding a second lock, since we keep it locked in the callback to allow finishing, we don't want get locking here... + // or just try unlocking twice to confirm... blade_session_t *bs = blade_handle_sessions_get(bc->handle, bc->session); ks_assert(bs); // should not happen because bs should still be locked blade_session_connections_add(bs, bc->id); blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY); - blade_session_state_set(bs, BLADE_SESSION_STATE_READY); + blade_session_state_set(bs, BLADE_SESSION_STATE_READY); // @todo only set this if it's not already in the READY state from prior connection + blade_session_read_unlock(bs); // unlock the session we locked obtaining it above blade_session_read_unlock(bs); // unlock the session we expect to be locked during the callback to ensure we can finish attaching break; } case BLADE_CONNECTION_STATE_DETACH: - // @todo detach from session if this connection is attached - blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT); - break; + { + if (bc->session) { + blade_session_t *bs = blade_handle_sessions_get(bc->handle, bc->session); + ks_assert(bs); + + blade_session_connections_remove(bs, bc->id); + blade_session_read_unlock(bs); + // keep bc->session for later in case something triggers a reconnect later and needs the old session id for a hint + } + blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT); + break; + } default: break; } } + + if (state == BLADE_CONNECTION_STATE_DISCONNECT) break; } return NULL; diff --git a/libs/libblade/src/blade_identity.c b/libs/libblade/src/blade_identity.c index 040bc9727d..5c2cecafad 100644 --- a/libs/libblade/src/blade_identity.c +++ b/libs/libblade/src/blade_identity.c @@ -127,19 +127,6 @@ KS_DECLARE(ks_status_t) blade_identity_parse(blade_identity_t *bi, const char *u } } - // @todo remove this, temporary for testing - ks_log(KS_LOG_DEBUG, " name: %s\n", bi->name); - ks_log(KS_LOG_DEBUG, " domain: %s\n", bi->domain); - ks_log(KS_LOG_DEBUG, " resource: %s\n", bi->resource); - for (ks_hash_iterator_t *it = ks_hash_first(bi->parameters, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - const char *key = NULL; - const char *val = NULL; - - ks_hash_this(it, (const void **)&key, NULL, (void **)&val); - - ks_log(KS_LOG_DEBUG, " key: %s = %s\n", key, val); - } - return KS_STATUS_SUCCESS; } diff --git a/libs/libblade/src/blade_module_wss.c b/libs/libblade/src/blade_module_wss.c index 429fdba344..31d2235e30 100644 --- a/libs/libblade/src/blade_module_wss.c +++ b/libs/libblade/src/blade_module_wss.c @@ -60,8 +60,7 @@ struct blade_module_wss_s { struct pollfd *listeners_poll; int32_t listeners_count; - list_t connected; - ks_q_t *disconnected; + list_t connected; // @todo consider keeping this only as the list of connection id's, since the handle retains the pointer lookup }; struct blade_transport_wss_s { @@ -77,6 +76,7 @@ struct blade_transport_wss_init_s { ks_pool_t *pool; ks_socket_t sock; + const char *session_id; }; @@ -98,7 +98,7 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data); ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_module_wss_t *bm_wss, ks_socket_t sock); ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP); -ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target); +ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id); blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target); ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, cJSON *json); @@ -116,7 +116,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connectio -ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock); +ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id); ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP); @@ -172,11 +172,11 @@ ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t bm_wss->transport_callbacks = &g_transport_wss_callbacks; list_init(&bm_wss->connected); - ks_q_create(&bm_wss->disconnected, bm_wss->pool, 0); - ks_assert(bm_wss->disconnected); *bm_wssP = bm_wss; + ks_log(KS_LOG_DEBUG, "Created\n"); + return KS_STATUS_SUCCESS; } @@ -194,10 +194,11 @@ ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP) blade_module_destroy(&bm_wss->module); list_destroy(&bm_wss->connected); - ks_q_destroy(&bm_wss->disconnected); ks_pool_free(bm_wss->pool, bm_wssP); + ks_log(KS_LOG_DEBUG, "Destroyed\n"); + return KS_STATUS_SUCCESS; } @@ -213,6 +214,8 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_load(blade_module_t **bmP, blade_han *bmP = bm_wss->module; + ks_log(KS_LOG_DEBUG, "Loaded\n"); + return KS_STATUS_SUCCESS; } @@ -226,10 +229,12 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_unload(blade_module_t *bm) blade_module_wss_destroy(&bm_wss); + ks_log(KS_LOG_DEBUG, "Unloaded\n"); + return KS_STATUS_SUCCESS; } -ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock) +ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id) { blade_transport_wss_init_t *bt_wssi = NULL; @@ -241,9 +246,12 @@ ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssi bt_wssi->module = bm_wss; bt_wssi->pool = bm_wss->pool; bt_wssi->sock = sock; + if (session_id) bt_wssi->session_id = ks_pstrdup(bt_wssi->pool, session_id); *bt_wssiP = bt_wssi; + ks_log(KS_LOG_DEBUG, "Created\n"); + return KS_STATUS_SUCCESS; } @@ -256,8 +264,12 @@ ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wss bt_wssi = *bt_wssiP; + if (bt_wssi->session_id) ks_pool_free(bt_wssi->pool, &bt_wssi->session_id); + ks_pool_free(bt_wssi->pool, bt_wssiP); + ks_log(KS_LOG_DEBUG, "Destroyed\n"); + return KS_STATUS_SUCCESS; } @@ -365,6 +377,8 @@ ks_status_t blade_module_wss_config(blade_module_wss_t *bm_wss, config_setting_t bm_wss->config_wss_endpoints_backlog = config_wss_endpoints_backlog; //bm_wss->config_wss_ssl = config_wss_ssl; + ks_log(KS_LOG_DEBUG, "Configured\n"); + return KS_STATUS_SUCCESS; } @@ -405,13 +419,14 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s blade_handle_transport_register(bm_wss->handle, bm, BLADE_MODULE_WSS_TRANSPORT_NAME, bm_wss->transport_callbacks); + ks_log(KS_LOG_DEBUG, "Started\n"); + return KS_STATUS_SUCCESS; } KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm) { blade_module_wss_t *bm_wss = NULL; - blade_transport_wss_t *bt_wss = NULL; blade_connection_t *bc = NULL; ks_assert(bm); @@ -435,20 +450,18 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm) bm_wss->listeners_count = 0; if (bm_wss->listeners_poll) ks_pool_free(bm_wss->pool, &bm_wss->listeners_poll); - // @todo connections should be gracefully disconnected so that they detach from sessions properly - // which means this should occur before the listeners thread is terminated, which requires that - // the listener sockets be made inactive (or closed) to stop accepting while shutting down - while (ks_q_trypop(bm_wss->disconnected, (void **)&bc) == KS_STATUS_SUCCESS) ; - list_iterator_start(&bm_wss->connected); - while (list_iterator_hasnext(&bm_wss->connected)) { - bc = (blade_connection_t *)list_iterator_next(&bm_wss->connected); - bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); - - blade_connection_destroy(&bc); - blade_transport_wss_destroy(&bt_wss); + if (list_size(&bm_wss->connected) > 0) { + // this approach to shutdown is cleaner, ensures connections will detach from sessions and be destroyed all in the same places + list_iterator_start(&bm_wss->connected); + while (list_iterator_hasnext(&bm_wss->connected)) { + bc = (blade_connection_t *)list_iterator_next(&bm_wss->connected); + blade_connection_disconnect(bc); + } + list_iterator_stop(&bm_wss->connected); + while (list_size(&bm_wss->connected) > 0) ks_sleep_ms(100); } - list_iterator_stop(&bm_wss->connected); - list_clear(&bm_wss->connected); + + ks_log(KS_LOG_DEBUG, "Stopped\n"); return KS_STATUS_SUCCESS; } @@ -492,6 +505,8 @@ ks_status_t blade_module_wss_listen(blade_module_wss_t *bm_wss, ks_sockaddr_t *a bm_wss->listeners_poll[listener_index].fd = listener; bm_wss->listeners_poll[listener_index].events = POLLIN | POLLERR; + ks_log(KS_LOG_DEBUG, "Bound %s on port %d at index %d\n", ks_addr_get_host(addr), ks_addr_get_port(addr), listener_index); + done: if (ret != KS_STATUS_SUCCESS) { if (listener != KS_SOCK_INVALID) { @@ -506,7 +521,6 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) { blade_module_wss_t *bm_wss = NULL; blade_transport_wss_init_t *bt_wss_init = NULL; - blade_transport_wss_t *bt_wss = NULL; blade_connection_t *bc = NULL; ks_assert(thread); @@ -523,18 +537,22 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) if (bm_wss->listeners_poll[index].revents & POLLERR) { // @todo: error handling, just skip the listener for now, it might recover, could skip X times before closing? + ks_log(KS_LOG_DEBUG, "POLLERR on index %d\n", index); continue; } if (!(bm_wss->listeners_poll[index].revents & POLLIN)) continue; if ((sock = accept(bm_wss->listeners_poll[index].fd, NULL, NULL)) == KS_SOCK_INVALID) { // @todo: error handling, just skip the socket for now as most causes are because remote side became unreachable + ks_log(KS_LOG_DEBUG, "Accept failed on index %d\n", index); continue; } - ks_log(KS_LOG_DEBUG, "Socket Accepted\n"); + // @todo getsockname and getpeername (getpeername can be skipped if passing to accept instead) + + ks_log(KS_LOG_DEBUG, "Socket accepted\n", index); - blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock); + blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock, NULL); ks_assert(bt_wss_init); blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks); @@ -543,11 +561,14 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) blade_connection_read_lock(bc, KS_TRUE); if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_INBOUND) != KS_STATUS_SUCCESS) { + ks_log(KS_LOG_DEBUG, "Connection (%s) startup failed\n", blade_connection_id_get(bc)); blade_connection_destroy(&bc); blade_transport_wss_init_destroy(&bt_wss_init); ks_socket_close(&sock); continue; } + ks_log(KS_LOG_DEBUG, "Connection (%s) started\n", blade_connection_id_get(bc)); + blade_handle_connections_add(bc); list_append(&bm_wss->connected, bc); blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW); @@ -555,18 +576,6 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) blade_connection_read_unlock(bc); } } - - while (ks_q_trypop(bm_wss->disconnected, (void **)&bc) == KS_STATUS_SUCCESS) { - bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc); - bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); - - blade_handle_connections_remove(bc); - list_delete(&bm_wss->connected, bc); - - if (bt_wss_init) blade_transport_wss_init_destroy(&bt_wss_init); - blade_connection_destroy(&bc); - if (bt_wss) blade_transport_wss_destroy(&bt_wss); - } } ks_log(KS_LOG_DEBUG, "Stopped\n"); @@ -590,6 +599,8 @@ ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_mo *bt_wssP = bt_wss; + ks_log(KS_LOG_DEBUG, "Created\n"); + return KS_STATUS_SUCCESS; } @@ -607,10 +618,12 @@ ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP) ks_pool_free(bt_wss->pool, bt_wssP); + ks_log(KS_LOG_DEBUG, "Destroyed\n"); + return KS_STATUS_SUCCESS; } -ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target) +ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id) { ks_status_t ret = KS_STATUS_SUCCESS; blade_module_wss_t *bm_wss = NULL; @@ -640,6 +653,7 @@ ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_modul if (!ip) { // @todo: temporary, this should fall back on DNS SRV or whatever else can turn "a@b.com" into an ip (and port?) to connect to // also need to deal with hostname lookup, so identities with wss transport need to have a host parameter that is an IP for the moment + ks_log(KS_LOG_DEBUG, "No host provided\n"); ret = KS_STATUS_FAIL; goto done; } @@ -649,6 +663,7 @@ ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_modul ks_size_t len = strlen(ip); if (len <= 3) { + ks_log(KS_LOG_DEBUG, "Invalid host provided\n"); ret = KS_STATUS_FAIL; goto done; } @@ -660,36 +675,43 @@ ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_modul int p = atoi(portstr); if (p > 0 && p <= UINT16_MAX) port = p; } - + + ks_log(KS_LOG_DEBUG, "Connecting to %s on port %d\n", ip, port); + ks_addr_set(&addr, ip, port, family); if ((sock = ks_socket_connect(SOCK_STREAM, IPPROTO_TCP, &addr)) == KS_SOCK_INVALID) { // @todo: error handling, just fail for now as most causes are because remote side became unreachable + ks_log(KS_LOG_DEBUG, "Connect failed\n"); ret = KS_STATUS_FAIL; goto done; } - ks_log(KS_LOG_DEBUG, "Socket Connected\n"); + ks_log(KS_LOG_DEBUG, "Socket connected\n"); - blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock); + blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock, session_id); ks_assert(bt_wss_init); blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks); ks_assert(bc); if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_OUTBOUND) != KS_STATUS_SUCCESS) { + ks_log(KS_LOG_DEBUG, "Connection (%s) startup failed\n", blade_connection_id_get(bc)); blade_connection_destroy(&bc); blade_transport_wss_init_destroy(&bt_wss_init); ks_socket_close(&sock); ret = KS_STATUS_FAIL; goto done; } + ks_log(KS_LOG_DEBUG, "Connection (%s) started\n", blade_connection_id_get(bc)); // @todo make sure it's sensible to be mixing outbound and inbound connections in the same list, but this allows entering the destruction pipeline // for module shutdown, disconnects and errors without special considerations + blade_handle_connections_add(bc); list_append(&bm_wss->connected, bc); - *bcP = bc; - + blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW); + *bcP = bc; + done: return ret; } @@ -708,16 +730,18 @@ ks_status_t blade_transport_wss_write(blade_transport_wss_t *bt_wss, cJSON *json char *json_str = cJSON_PrintUnformatted(json); ks_size_t json_str_len = 0; if (!json_str) { - // @todo error logging + ks_log(KS_LOG_DEBUG, "Failed to generate json string\n"); ret = KS_STATUS_FAIL; goto done; } - json_str_len = strlen(json_str) + 1; // @todo determine if WSOC_TEXT null terminates when read_frame is called, or if it's safe to include like this + // @todo determine if WSOC_TEXT null terminates when read_frame is called, or if it's safe to include like this + json_str_len = strlen(json_str) + 1; if (kws_write_frame(bt_wss->kws, WSOC_TEXT, json_str, json_str_len) != json_str_len) { - // @todo error logging + ks_log(KS_LOG_DEBUG, "Failed to write frame\n"); ret = KS_STATUS_FAIL; goto done; } + ks_log(KS_LOG_DEBUG, "Frame written %d bytes\n", json_str_len); done: if (json_str) free(json_str); @@ -733,13 +757,10 @@ ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, cJSON *json) ks_assert(bc); ks_assert(json); - ks_log(KS_LOG_DEBUG, "Send Callback\n"); - bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); ret = blade_transport_wss_write(bt_wss, json); - // @todo use reference counting on blade_identity_t and cJSON objects cJSON_Delete(json); return ret; @@ -753,7 +774,7 @@ ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json *json = NULL; if (poll_flags & KS_POLL_ERROR) { - // @todo error logging + ks_log(KS_LOG_DEBUG, "POLLERR\n"); return KS_STATUS_FAIL; } if (poll_flags & KS_POLL_READ) { @@ -768,10 +789,13 @@ ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json // -2 means nonblocking wait // other values are based on WS_XXX reasons // negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and + ks_log(KS_LOG_DEBUG, "Failed to read frame\n"); return KS_STATUS_FAIL; } + ks_log(KS_LOG_DEBUG, "Frame read %d bytes\n", frame_data_len); if (!(*json = cJSON_Parse((char *)frame_data))) { + ks_log(KS_LOG_DEBUG, "Failed to parse frame\n"); return KS_STATUS_FAIL; } } @@ -785,8 +809,6 @@ ks_status_t blade_transport_wss_on_receive(blade_connection_t *bc, cJSON **json) ks_assert(bc); ks_assert(json); - ks_log(KS_LOG_DEBUG, "Receive Callback\n"); - bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); return blade_transport_wss_read(bt_wss, json); @@ -795,6 +817,7 @@ ks_status_t blade_transport_wss_on_receive(blade_connection_t *bc, cJSON **json) blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition) { blade_transport_wss_t *bt_wss = NULL; + blade_transport_wss_init_t *bt_wss_init = NULL; ks_assert(bc); @@ -803,8 +826,12 @@ blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_conn if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS; bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); + bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc); + + list_delete(&bt_wss->module->connected, bc); - ks_q_push(bt_wss->module->disconnected, bc); + if (bt_wss_init) blade_transport_wss_init_destroy(&bt_wss_init); + if (bt_wss) blade_transport_wss_destroy(&bt_wss); return BLADE_CONNECTION_STATE_HOOK_SUCCESS; } @@ -865,7 +892,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_connect_inbound(blade // @todo: SSL init stuffs based on data from config to pass into kws_init if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, NULL, KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) { - // @todo error logging + ks_log(KS_LOG_DEBUG, "Failed websocket init\n"); return BLADE_CONNECTION_STATE_HOOK_DISCONNECT; } @@ -886,7 +913,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_connect_outbound(blad // @todo: SSL init stuffs based on data from config to pass into kws_init if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, "/blade:blade.invalid:blade", KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) { - // @todo error logging + ks_log(KS_LOG_DEBUG, "Failed websocket init\n"); return BLADE_CONNECTION_STATE_HOOK_DISCONNECT; } @@ -897,13 +924,16 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_ { blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS; blade_transport_wss_t *bt_wss = NULL; - cJSON *json = NULL; + cJSON *json_req = NULL; + cJSON *json_res = NULL; cJSON *params = NULL; + cJSON *result = NULL; + //cJSON *error = NULL; blade_session_t *bs = NULL; blade_handle_t *bh = NULL; const char *jsonrpc = NULL; - const char *method = NULL; const char *id = NULL; + const char *method = NULL; const char *sid = NULL; ks_time_t timeout; @@ -914,114 +944,258 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_ ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); + if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS; + bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); // @todo very temporary, really need monotonic clock and get timeout delay and sleep delay from config timeout = ks_time_now() + (5 * KS_USEC_PER_SEC); - while (blade_transport_wss_read(bt_wss, &json) == KS_STATUS_SUCCESS) { - if (json) break; - ks_sleep(250); + while (blade_transport_wss_read(bt_wss, &json_req) == KS_STATUS_SUCCESS) { + if (json_req) break; + ks_sleep_ms(250); if (ks_time_now() >= timeout) break; } - if (!json) { - // @todo error logging + if (!json_req) { + ks_log(KS_LOG_DEBUG, "Failed to receive message before timeout\n"); ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; goto done; } // @todo validation wrapper for request and response/error to confirm jsonrpc and provide enum for output as to which it is - jsonrpc = cJSON_GetObjectCstr(json, "jsonrpc"); // @todo check for definitions of these keys and fixed values + jsonrpc = cJSON_GetObjectCstr(json_req, "jsonrpc"); // @todo check for definitions of these keys and fixed values if (!jsonrpc || strcmp(jsonrpc, "2.0")) { - // @todo error logging + ks_log(KS_LOG_DEBUG, "Received message is not the expected protocol\n"); + // @todo send error response before disconnecting, code = -32600 (invalid request) ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; goto done; } - id = cJSON_GetObjectCstr(json, "id"); // @todo switch to number if we are not using a uuid for message id + id = cJSON_GetObjectCstr(json_req, "id"); // @todo switch to number if we are not using a uuid for message id if (!id) { - // @todo error logging + ks_log(KS_LOG_DEBUG, "Received message is missing 'id'\n"); + // @todo send error response before disconnecting, code = -32600 (invalid request) ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; goto done; } - method = cJSON_GetObjectCstr(json, "method"); + method = cJSON_GetObjectCstr(json_req, "method"); if (!method || strcasecmp(method, "blade.session.attach")) { - // @todo error logging + ks_log(KS_LOG_DEBUG, "Received message is missing 'method' or is an unexpected method\n"); + // @todo send error response before disconnecting, code = -32601 (method not found) ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; goto done; } - params = cJSON_GetObjectItem(json, "params"); + params = cJSON_GetObjectItem(json_req, "params"); if (params) { sid = cJSON_GetObjectCstr(params, "session-id"); if (sid) { - // @todo validate uuid format by parsing, not currently available in uuid functions - ks_log(KS_LOG_DEBUG, "Session Requested: %s\n", sid); + // @todo validate uuid format by parsing, not currently available in uuid functions, send -32602 (invalid params) if invalid + ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", sid); } } if (sid) { bs = blade_handle_sessions_get(bh, sid); // bs comes out read locked if not null to prevent it being cleaned up before we are done if (bs) { - ks_log(KS_LOG_DEBUG, "Session Located: %s\n", blade_session_id_get(bs)); + if (blade_session_terminating(bs)) { + blade_session_read_unlock(bs); + ks_log(KS_LOG_DEBUG, "Session (%s) terminating\n", blade_session_id_get(bs)); + bs = NULL; + } else { + ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs)); + } } } - + if (!bs) { blade_session_create(&bs, bh); ks_assert(bs); - ks_log(KS_LOG_DEBUG, "Session Created: %s\n", blade_session_id_get(bs)); + ks_log(KS_LOG_DEBUG, "Session (%s) created\n", blade_session_id_get(bs)); blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise if (blade_session_startup(bs) != KS_STATUS_SUCCESS) { + ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", blade_session_id_get(bs)); + // @todo send error response before disconnecting, code = -32603 (internal error) blade_session_read_unlock(bs); blade_session_destroy(&bs); ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; goto done; } + ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bs)); blade_handle_sessions_add(bs); } + // @todo wrapper to generate request and response + json_res = cJSON_CreateObject(); + cJSON_AddStringToObject(json_res, "jsonrpc", "2.0"); + cJSON_AddStringToObject(json_res, "id", id); + + result = cJSON_CreateObject(); + cJSON_AddStringToObject(result, "session-id", blade_session_id_get(bs)); + cJSON_AddItemToObject(json_res, "result", result); + + // @todo send response + if (blade_transport_wss_write(bt_wss, json_res) != KS_STATUS_SUCCESS) { + ks_log(KS_LOG_DEBUG, "Failed to write message\n"); + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } + blade_connection_session_set(bc, blade_session_id_get(bs)); done: // @note the state machine expects if we return SUCCESS, that the session assigned to the connection will be read locked to ensure that the state // machine can finish attaching the session, if you BYPASS then you can handle everything here in the callback, but this should be fairly standard // behaviour to simply go as far as assigning a session to the connection and let the system handle the rest - if (json) cJSON_Delete(json); + if (json_req) cJSON_Delete(json_req); + if (json_res) cJSON_Delete(json_res); return ret; } blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition) { + blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS; + blade_handle_t *bh = NULL; + blade_transport_wss_t *bt_wss = NULL; + blade_transport_wss_init_t *bt_wss_init = NULL; + ks_pool_t *pool = NULL; + cJSON *json_req = NULL; + cJSON *json_res = NULL; + uuid_t msgid; + const char *mid = NULL; + ks_time_t timeout; + const char *jsonrpc = NULL; + const char *id = NULL; + cJSON *error = NULL; + cJSON *result = NULL; + const char *sid = NULL; + blade_session_t *bs = NULL; + ks_assert(bc); ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); - // @todo produce jsonrpc compliant message to call method "blade.session.attach" + if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS; - // @todo add params with nested session-id and session-token if attempting to reconnect as a client, this should probably be passed in from - // the blade_handle_connect() call and then through the init parameters for the transport (do not directly use the old session, but copy the id and token) + bh = blade_connection_handle_get(bc); + bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); + bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc); + pool = blade_connection_pool_get(bc); + + + // @todo wrapper to build a request and response/error + json_req = cJSON_CreateObject(); + cJSON_AddStringToObject(json_req, "jsonrpc", "2.0"); + cJSON_AddStringToObject(json_req, "method", "blade.session.attach"); + + ks_uuid(&msgid); + mid = ks_uuid_str(pool, &msgid); + cJSON_AddStringToObject(json_req, "id", mid); + + if (bt_wss_init->session_id) { + cJSON *params = cJSON_CreateObject(); + cJSON_AddStringToObject(params, "session-id", bt_wss_init->session_id); + cJSON_AddItemToObject(json_req, "params", params); + } + + ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", (bt_wss_init->session_id ? bt_wss_init->session_id : "none")); + + if (blade_transport_wss_write(bt_wss, json_req) != KS_STATUS_SUCCESS) { + ks_log(KS_LOG_DEBUG, "Failed to write message\n"); + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } - // @todo block while sending message with blade_transport_wss_write(bt_wss, json) - // @todo block while receiving expected response with blade_transport_wss_read(bt_wss, json) + timeout = ks_time_now() + (5 * KS_USEC_PER_SEC); + while (blade_transport_wss_read(bt_wss, &json_res) == KS_STATUS_SUCCESS) { + if (json_res) break; + ks_sleep_ms(250); + if (ks_time_now() >= timeout) break; + } + + if (!json_res) { + ks_log(KS_LOG_DEBUG, "Failed to receive message before timeout\n"); + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } + + // @todo validation wrapper for request and response/error to confirm jsonrpc and provide enum for output as to which it is + jsonrpc = cJSON_GetObjectCstr(json_res, "jsonrpc"); // @todo check for definitions of these keys and fixed values + if (!jsonrpc || strcmp(jsonrpc, "2.0")) { + ks_log(KS_LOG_DEBUG, "Received message is not the expected protocol\n"); + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } - // @todo check for error field, log and return HOOK_DISCONNECT if any errors occur + id = cJSON_GetObjectCstr(json_res, "id"); // @todo switch to number if we are not using a uuid for message id + if (!id || strcasecmp(mid, id)) { + ks_log(KS_LOG_DEBUG, "Received message is missing 'id'\n"); + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } - // @todo check for result field, and nested session-id and session-token + error = cJSON_GetObjectItem(json_res, "error"); + if (error) { + ks_log(KS_LOG_DEBUG, "Error message ... add the details\n"); + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } - // @todo lookup the old session from the blade_handle_t, if it still exists then use this session + result = cJSON_GetObjectItem(json_res, "result"); + if (!result) { + ks_log(KS_LOG_DEBUG, "Received message is missing 'result'\n"); + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } - // @todo if the old session does not exist, then create a new session and populate with the parameters from the results + sid = cJSON_GetObjectCstr(result, "session-id"); + if (!sid) { + ks_log(KS_LOG_DEBUG, "Received message 'result' is missing 'session-id'\n"); + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } - // @todo once session is established, associate it to the connection, see attach_inbound for notes regarding universal actions after returning SUCCESS + if (sid) { + // @todo validate uuid format by parsing, not currently available in uuid functions + bs = blade_handle_sessions_get(bh, sid); // bs comes out read locked if not null to prevent it being cleaned up before we are done + if (bs) { + ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs)); + } + } + + if (!bs) { + blade_session_create(&bs, bh); // @todo let sid be passed to constructor, NULL to generate + ks_assert(bs); + + blade_session_id_set(bs, sid); + + ks_log(KS_LOG_DEBUG, "Session (%s) created\n", blade_session_id_get(bs)); + + blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise + + if (blade_session_startup(bs) != KS_STATUS_SUCCESS) { + ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", blade_session_id_get(bs)); + blade_session_read_unlock(bs); + blade_session_destroy(&bs); + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } + ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bs)); + blade_handle_sessions_add(bs); + } + + blade_connection_session_set(bc, blade_session_id_get(bs)); - ks_sleep_ms(1000); // @todo temporary testing, remove this and return success once negotiations are done - return BLADE_CONNECTION_STATE_HOOK_BYPASS; + done: + if (mid) ks_pool_free(pool, &mid); + if (json_req) cJSON_Delete(json_req); + if (json_res) cJSON_Delete(json_res); + return ret; } blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connection_t *bc, blade_connection_state_condition_t condition) @@ -1029,8 +1203,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connecti ks_assert(bc); ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); - - ks_sleep(1000); + return BLADE_CONNECTION_STATE_HOOK_SUCCESS; } @@ -1040,7 +1213,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connectio ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); - ks_sleep(1000); + ks_sleep_ms(1000); return BLADE_CONNECTION_STATE_HOOK_SUCCESS; } diff --git a/libs/libblade/src/blade_session.c b/libs/libblade/src/blade_session.c index fbfd227254..06f500a9d7 100644 --- a/libs/libblade/src/blade_session.c +++ b/libs/libblade/src/blade_session.c @@ -81,6 +81,8 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle *bsP = bs; + ks_log(KS_LOG_DEBUG, "Created\n"); + return KS_STATUS_SUCCESS; } @@ -105,6 +107,8 @@ KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP) ks_pool_free(bs->pool, bsP); + ks_log(KS_LOG_DEBUG, "Destroyed\n"); + return KS_STATUS_SUCCESS; } @@ -125,6 +129,8 @@ KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs) return KS_STATUS_FAIL; } + ks_log(KS_LOG_DEBUG, "Started\n"); + return KS_STATUS_SUCCESS; } @@ -152,6 +158,8 @@ KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs) list_iterator_stop(&bs->connections); list_clear(&bs->connections); + ks_log(KS_LOG_DEBUG, "Stopped\n"); + return KS_STATUS_SUCCESS; } @@ -226,8 +234,17 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs) { ks_assert(bs); - if (bs->state != BLADE_SESSION_STATE_HANGUP && bs->state != BLADE_SESSION_STATE_DESTROY) + if (bs->state != BLADE_SESSION_STATE_HANGUP && bs->state != BLADE_SESSION_STATE_DESTROY) { + ks_log(KS_LOG_DEBUG, "Session (%s) hanging up\n", bs->id); blade_session_state_set(bs, BLADE_SESSION_STATE_HANGUP); + } +} + +KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs) +{ + ks_assert(bs); + + return bs->state == BLADE_SESSION_STATE_HANGUP || bs->state == BLADE_SESSION_STATE_DESTROY; } KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id) @@ -242,6 +259,8 @@ KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const list_append(&bs->connections, cid); + ks_log(KS_LOG_DEBUG, "Session (%s) connection added (%s)\n", bs->id, id); + return ret; } @@ -256,6 +275,7 @@ KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, co for (uint32_t i = 0; i < size; ++i) { const char *cid = (const char *)list_get_at(&bs->connections, i); if (!strcasecmp(cid, id)) { + ks_log(KS_LOG_DEBUG, "Session (%s) connection removed (%s)\n", bs->id, id); list_delete_at(&bs->connections, i); ks_pool_free(bs->pool, &cid); break; @@ -278,7 +298,7 @@ ks_status_t blade_session_connections_choose(blade_session_t *bs, cJSON *json, b // later there will need to be a way to pick which connection to use cid = list_get_at(&bs->connections, 0); if (!cid) { - // @todo error logging... this shouldn't happen + // no connections available return KS_STATUS_FAIL; } @@ -310,6 +330,7 @@ KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json) if (blade_session_connections_choose(bs, json, &bc) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; // @todo cache the blade_request_t here if it exists to gaurentee it's cached before a response could be received blade_connection_sending_push(bc, json); + blade_connection_read_unlock(bc); } return KS_STATUS_SUCCESS; @@ -334,7 +355,25 @@ KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **j return ks_q_trypop(bs->sending, (void **)json); } -// @todo receive queue push and pop +KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json) +{ + cJSON *json_copy = NULL; + + ks_assert(bs); + ks_assert(json); + + json_copy = cJSON_Duplicate(json, 1); + return ks_q_push(bs->receiving, json_copy); +} + +KS_DECLARE(ks_status_t) blade_session_receiving_pop(blade_session_t *bs, cJSON **json) +{ + ks_assert(bs); + ks_assert(json); + + return ks_q_trypop(bs->receiving, (void **)json); +} + void *blade_session_state_thread(ks_thread_t *thread, void *data) { @@ -354,26 +393,56 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data) if (!list_empty(&bs->connections)) { while (blade_session_sending_pop(bs, &json) == KS_STATUS_SUCCESS && json) { blade_connection_t *bc = NULL; - if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) blade_connection_sending_push(bc, json); + if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) { + blade_connection_sending_push(bc, json); + blade_connection_read_unlock(bc); + } cJSON_Delete(json); } } switch (state) { case BLADE_SESSION_STATE_DESTROY: + ks_log(KS_LOG_DEBUG, "Session (%s) state destroy\n", bs->id); + blade_handle_sessions_remove(bs); + blade_session_destroy(&bs); return NULL; case BLADE_SESSION_STATE_HANGUP: - // @todo detach from session if this connection is attached - blade_session_state_set(bs, BLADE_SESSION_STATE_DESTROY); - break; + { + ks_log(KS_LOG_DEBUG, "Session (%s) state hangup\n", bs->id); + + list_iterator_start(&bs->connections); + while (list_iterator_hasnext(&bs->connections)) { + const char *cid = (const char *)list_iterator_next(&bs->connections); + blade_connection_t *bc = blade_handle_connections_get(bs->handle, cid); + ks_assert(bc); + + blade_connection_disconnect(bc); + blade_connection_read_unlock(bc); + } + list_iterator_stop(&bs->connections); + + while (!list_empty(&bs->connections)) ks_sleep(100); + + blade_session_state_set(bs, BLADE_SESSION_STATE_DESTROY); + break; + } case BLADE_SESSION_STATE_CONNECT: + ks_log(KS_LOG_DEBUG, "Session (%s) state connect\n", bs->id); + ks_sleep_ms(1000); break; case BLADE_SESSION_STATE_ATTACH: + ks_log(KS_LOG_DEBUG, "Session (%s) state attach\n", bs->id); + ks_sleep_ms(1000); break; case BLADE_SESSION_STATE_DETACH: + ks_log(KS_LOG_DEBUG, "Session (%s) state detach\n", bs->id); + ks_sleep_ms(1000); break; case BLADE_SESSION_STATE_READY: - // @todo pop from session receiving queue and pass to blade_protocol_process() + ks_log(KS_LOG_DEBUG, "Session (%s) state ready\n", bs->id); + // @todo pop from session receiving queue and pass into protocol layer through something like blade_protocol_process() + ks_sleep_ms(1000); break; default: break; } diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index 5977c091d7..1a680a0ff5 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -244,8 +244,17 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh) blade_request_destroy(&value); } - // @todo terminate all sessions, which will disconnect all attached connections - + for (it = ks_hash_first(bh->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + void *key = NULL; + blade_session_t *value = NULL; + + ks_hash_this(it, (const void **)&key, NULL, (void **)&value); + ks_hash_remove(bh->requests, key); + + blade_session_hangup(value); + } + while (ks_hash_count(bh->sessions) > 0) ks_sleep_ms(100); + // @todo call onshutdown and onunload callbacks for modules from DSOs, which will unregister transports and disconnect remaining unattached connections // @todo unload DSOs @@ -312,7 +321,7 @@ KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, co return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target) +KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id) { ks_status_t ret = KS_STATUS_SUCCESS; blade_handle_transport_registration_t *bhtr = NULL; @@ -358,7 +367,7 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio // @todo need to be able to get to the blade_module_t from the callbacks, may require envelope around registration of callbacks to include module // this is required because onconnect transport callback needs to be able to get back to the module data to create the connection being returned - if (bhtr) ret = bhtr->callbacks->onconnect(bcP, bhtr->module, target); + if (bhtr) ret = bhtr->callbacks->onconnect(bcP, bhtr->module, target, session_id); else ret = KS_STATUS_FAIL; return ret; @@ -423,7 +432,7 @@ KS_DECLARE(blade_session_t *) blade_handle_sessions_get(blade_handle_t *bh, cons { blade_session_t *bs = NULL; - ks_assert(bs); + ks_assert(bh); ks_assert(sid); ks_hash_read_lock(bh->sessions); diff --git a/libs/libblade/src/include/blade_connection.h b/libs/libblade/src/include/blade_connection.h index 2242197861..8b8660f9cb 100644 --- a/libs/libblade/src/include/blade_connection.h +++ b/libs/libblade/src/include/blade_connection.h @@ -44,6 +44,7 @@ KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP); KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction); KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc); KS_DECLARE(blade_handle_t *) blade_connection_handle_get(blade_connection_t *bc); +KS_DECLARE(ks_pool_t *) blade_connection_pool_get(blade_connection_t *bc); KS_DECLARE(const char *) blade_connection_id_get(blade_connection_t *bc); KS_DECLARE(ks_status_t) blade_connection_read_lock(blade_connection_t *bc, ks_bool_t block); KS_DECLARE(ks_status_t) blade_connection_read_unlock(blade_connection_t *bc); diff --git a/libs/libblade/src/include/blade_session.h b/libs/libblade/src/include/blade_session.h index c6d376acd9..f4e92775b3 100644 --- a/libs/libblade/src/include/blade_session.h +++ b/libs/libblade/src/include/blade_session.h @@ -49,11 +49,14 @@ KS_DECLARE(ks_status_t) blade_session_write_lock(blade_session_t *bs, ks_bool_t KS_DECLARE(ks_status_t) blade_session_write_unlock(blade_session_t *bs); KS_DECLARE(void) blade_session_state_set(blade_session_t *bs, blade_session_state_t state); KS_DECLARE(void) blade_session_hangup(blade_session_t *bs); +KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs); KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id); KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, const char *id); KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json); KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json); KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json); +KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json); +KS_DECLARE(ks_status_t) blade_session_receiving_pop(blade_session_t *bs, cJSON **json); KS_END_EXTERN_C #endif diff --git a/libs/libblade/src/include/blade_stack.h b/libs/libblade/src/include/blade_stack.h index 7b87fa84f3..dd1affb71d 100644 --- a/libs/libblade/src/include/blade_stack.h +++ b/libs/libblade/src/include/blade_stack.h @@ -50,7 +50,7 @@ KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh); KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_module_t *bm, const char *name, blade_transport_callbacks_t *callbacks); KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, const char *name); -KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target); +KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id); KS_DECLARE(blade_connection_t *) blade_handle_connections_get(blade_handle_t *bh, const char *cid); KS_DECLARE(ks_status_t) blade_handle_connections_add(blade_connection_t *bc); diff --git a/libs/libblade/src/include/blade_types.h b/libs/libblade/src/include/blade_types.h index 2442d88a06..66e8ea0d01 100644 --- a/libs/libblade/src/include/blade_types.h +++ b/libs/libblade/src/include/blade_types.h @@ -113,7 +113,7 @@ struct blade_module_callbacks_s { }; -typedef ks_status_t (*blade_transport_connect_callback_t)(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target); +typedef ks_status_t (*blade_transport_connect_callback_t)(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id); typedef blade_connection_rank_t (*blade_transport_rank_callback_t)(blade_connection_t *bc, blade_identity_t *target); typedef ks_status_t (*blade_transport_send_callback_t)(blade_connection_t *bc, cJSON *json); typedef ks_status_t (*blade_transport_receive_callback_t)(blade_connection_t *bc, cJSON **json); diff --git a/libs/libblade/test/bladec.c b/libs/libblade/test/bladec.c index 510897971d..46bfda9969 100644 --- a/libs/libblade/test/bladec.c +++ b/libs/libblade/test/bladec.c @@ -253,7 +253,7 @@ void command_connect(blade_handle_t *bh, char *args) blade_identity_create(&target, blade_handle_pool_get(bh)); - if (blade_identity_parse(target, args) == KS_STATUS_SUCCESS) blade_handle_connect(bh, &bc, target); + if (blade_identity_parse(target, args) == KS_STATUS_SUCCESS) blade_handle_connect(bh, &bc, target, NULL); blade_identity_destroy(&target); }