]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-10167: Switched connection lifecycle to an isolated pool similar to sessions,...
authorShane Bryldt <astaelan@gmail.com>
Tue, 18 Apr 2017 19:41:00 +0000 (13:41 -0600)
committerShane Bryldt <astaelan@gmail.com>
Tue, 18 Apr 2017 19:41:00 +0000 (13:41 -0600)
libs/libblade/src/blade_connection.c
libs/libblade/src/blade_module_chat.c
libs/libblade/src/blade_module_wss.c
libs/libblade/src/blade_protocol.c
libs/libblade/src/blade_session.c
libs/libblade/src/blade_stack.c
libs/libblade/src/include/blade_connection.h
libs/libblade/src/include/blade_protocol.h
libs/libks/src/ks_time.c

index 2b7995d1956f887056542b72974ce135ecac3981..61c4b481f3e2718b33cb483f52aab42f1dea0dd2 100644 (file)
@@ -37,11 +37,9 @@ struct blade_connection_s {
        blade_handle_t *handle;
        ks_pool_t *pool;
 
-       void *transport_init_data;
        void *transport_data;
        blade_transport_callbacks_t *transport_callbacks;
 
-       ks_bool_t shutdown;
        blade_connection_direction_t direction;
     ks_thread_t *state_thread;
        blade_connection_state_t state;
@@ -63,10 +61,30 @@ ks_status_t blade_connection_state_on_detach(blade_connection_t *bc);
 ks_status_t blade_connection_state_on_ready(blade_connection_t *bc);
 
 
-KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
-                                                                                               blade_handle_t *bh,
-                                                                                               void *transport_init_data,
-                                                                                               blade_transport_callbacks_t *transport_callbacks)
+static void blade_connection_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
+{
+       blade_connection_t *bc = (blade_connection_t *)ptr;
+
+       ks_assert(bc);
+
+       switch (action) {
+       case KS_MPCL_ANNOUNCE:
+               break;
+       case KS_MPCL_TEARDOWN:
+               blade_connection_shutdown(bc);
+               break;
+       case KS_MPCL_DESTROY:
+               // @todo remove this, it's just for posterity in debugging
+               bc->sending = NULL;
+               bc->lock = NULL;
+
+               //ks_pool_free(bc->pool, &bc->id);
+               bc->id = NULL;
+               break;
+       }
+}
+
+KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, blade_handle_t *bh)
 {
        blade_connection_t *bc = NULL;
        ks_pool_t *pool = NULL;
@@ -74,15 +92,13 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
 
        ks_assert(bcP);
        ks_assert(bh);
-       ks_assert(transport_callbacks);
 
-       pool = blade_handle_pool_get(bh);
+       ks_pool_open(&pool);
+       ks_assert(pool);
 
        bc = ks_pool_alloc(pool, sizeof(blade_connection_t));
        bc->handle = bh;
        bc->pool = pool;
-       bc->transport_init_data = transport_init_data;
-       bc->transport_callbacks = transport_callbacks;
 
        ks_uuid(&id);
        bc->id = ks_uuid_str(pool, &id);
@@ -94,51 +110,48 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
        ks_q_create(&bc->sending, pool, 0);
        ks_assert(bc->sending);
 
-       *bcP = bc;
+       ks_assert(ks_pool_set_cleanup(pool, bc, NULL, blade_connection_cleanup) == KS_STATUS_SUCCESS);
 
        ks_log(KS_LOG_DEBUG, "Created\n");
 
+       *bcP = bc;
+
        return KS_STATUS_SUCCESS;
 }
 
 KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP)
 {
        blade_connection_t *bc = NULL;
+       ks_pool_t *pool = NULL;
 
        ks_assert(bcP);
        ks_assert(*bcP);
 
        bc = *bcP;
 
-       blade_connection_shutdown(bc);
-
-       ks_q_destroy(&bc->sending);
-
-       ks_rwl_destroy(&bc->lock);
-
-       ks_pool_free(bc->pool, &bc->id);
-
-       ks_pool_free(bc->pool, bcP);
+       pool = bc->pool;
+       //ks_pool_free(bc->pool, bcP);
+       ks_pool_close(&pool);
 
        ks_log(KS_LOG_DEBUG, "Destroyed\n");
 
+       *bcP = NULL;
+
        return KS_STATUS_SUCCESS;
 }
 
 KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction)
 {
+       blade_handle_t *bh = NULL;
+
        ks_assert(bc);
 
+       bh = blade_connection_handle_get(bc);
+
        bc->direction = direction;
        blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NONE);
 
-    if (ks_thread_create_ex(&bc->state_thread,
-                                                       blade_connection_state_thread,
-                                                       bc,
-                                                       KS_THREAD_FLAG_DEFAULT,
-                                                       KS_THREAD_DEFAULT_STACK,
-                                                       KS_PRI_NORMAL,
-                                                       bc->pool) != KS_STATUS_SUCCESS) {
+       if (ks_thread_pool_add_job(blade_handle_tpool_get(bh), blade_connection_state_thread, bc) != KS_STATUS_SUCCESS) {
                // @todo error logging
                return KS_STATUS_FAIL;
        }
@@ -154,14 +167,7 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc)
 
        ks_assert(bc);
 
-       if (bc->state_thread) {
-               bc->shutdown = KS_TRUE;
-               ks_thread_join(bc->state_thread);
-               ks_pool_free(bc->pool, &bc->state_thread);
-               bc->shutdown = KS_FALSE;
-       }
-
-       if (bc->session) ks_pool_free(bc->pool, &bc->session);
+       blade_handle_connections_remove(bc);
 
        while (ks_q_trypop(bc->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
 
@@ -228,13 +234,6 @@ KS_DECLARE(ks_status_t) blade_connection_write_unlock(blade_connection_t *bc)
 }
 
 
-KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc)
-{
-       ks_assert(bc);
-
-       return bc->transport_init_data;
-}
-
 KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc)
 {
        ks_assert(bc);
@@ -242,11 +241,14 @@ KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc)
        return bc->transport_data;
 }
 
-KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data)
+KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data, blade_transport_callbacks_t *transport_callbacks)
 {
        ks_assert(bc);
+       ks_assert(transport_data);
+       ks_assert(transport_callbacks);
 
        bc->transport_data = transport_data;
+       bc->transport_callbacks = transport_callbacks;
 }
 
 blade_transport_state_callback_t blade_connection_state_callback_lookup(blade_connection_t *bc, blade_connection_state_t state)
@@ -356,18 +358,20 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
 {
        blade_connection_t *bc = NULL;
        blade_connection_state_t state;
+       ks_bool_t shutdown = KS_FALSE;
 
        ks_assert(thread);
        ks_assert(data);
 
        bc = (blade_connection_t *)data;
 
-       while (!bc->shutdown) {
+       while (!shutdown) {
                state = bc->state;
 
                switch (state) {
                case BLADE_CONNECTION_STATE_DISCONNECT:
                        blade_connection_state_on_disconnect(bc);
+                       shutdown = KS_TRUE;
                        break;
                case BLADE_CONNECTION_STATE_NEW:
                        blade_connection_state_on_new(bc);
@@ -386,10 +390,10 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
                        break;
                default: break;
                }
-
-               if (state == BLADE_CONNECTION_STATE_DISCONNECT) break;
        }
 
+       blade_connection_destroy(&bc);
+
        return NULL;
 }
 
@@ -536,7 +540,6 @@ ks_status_t blade_connection_state_on_ready(blade_connection_t *bc)
        if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST);
 
        if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT)     blade_connection_disconnect(bc);
-       else ks_sleep_ms(1);
 
        return KS_STATUS_SUCCESS;
 }
index fd3934b274592c13352a8bdc1a8357d80b9c5147..9b085b1f7567deb57b6d089cfed81c51c0fd12fb 100644 (file)
@@ -291,7 +291,7 @@ ks_bool_t blade_chat_join_request_handler(blade_module_t *bm, blade_request_t *b
        props_participant = cJSON_GetObjectItem(props, "blade.chat.participant");
        if (props_participant && props_participant->type == cJSON_True) {
                ks_log(KS_LOG_DEBUG, "Session (%s) attempted to join chat but is already a participant\n", blade_session_id_get(bs));
-               blade_rpc_error_create(breq->pool, &res, NULL, breq->message_id, -10000, "Already a participant of chat");
+               blade_rpc_error_create(&res, NULL, breq->message_id, -10000, "Already a participant of chat");
        } else {
                ks_log(KS_LOG_DEBUG, "Session (%s) joined chat\n", blade_session_id_get(bs));
 
@@ -300,7 +300,7 @@ ks_bool_t blade_chat_join_request_handler(blade_module_t *bm, blade_request_t *b
 
                ks_list_append(bm_chat->participants, blade_session_id_get(bs)); // @todo make copy of session id instead and cleanup when removed
 
-               blade_rpc_response_create(breq->pool, &res, NULL, breq->message_id);
+               blade_rpc_response_create(&res, NULL, breq->message_id);
 
                // @todo create an event to send to participants when a session joins and leaves, send after main response though
        }
@@ -343,7 +343,7 @@ ks_bool_t blade_chat_leave_request_handler(blade_module_t *bm, blade_request_t *
        props_participant = cJSON_GetObjectItem(props, "blade.chat.participant");
        if (!props_participant || props_participant->type == cJSON_False) {
                ks_log(KS_LOG_DEBUG, "Session (%s) attempted to leave chat but is not a participant\n", blade_session_id_get(bs));
-               blade_rpc_error_create(breq->pool, &res, NULL, breq->message_id, -10000, "Not a participant of chat");
+               blade_rpc_error_create(&res, NULL, breq->message_id, -10000, "Not a participant of chat");
        } else {
                ks_log(KS_LOG_DEBUG, "Session (%s) left chat\n", blade_session_id_get(bs));
 
@@ -351,7 +351,7 @@ ks_bool_t blade_chat_leave_request_handler(blade_module_t *bm, blade_request_t *
 
                ks_list_delete(bm_chat->participants, blade_session_id_get(bs)); // @todo make copy of session id instead and search manually, also free the id
 
-               blade_rpc_response_create(breq->pool, &res, NULL, breq->message_id);
+               blade_rpc_response_create(&res, NULL, breq->message_id);
 
                // @todo create an event to send to participants when a session joins and leaves, send after main response though
        }
@@ -388,17 +388,17 @@ ks_bool_t blade_chat_send_request_handler(blade_module_t *bm, blade_request_t *b
        params = cJSON_GetObjectItem(breq->message, "params"); // @todo cache this in blade_request_t for quicker/easier access
        if (!params) {
                ks_log(KS_LOG_DEBUG, "Session (%s) attempted to send chat message with no 'params' object\n", blade_session_id_get(bs));
-               blade_rpc_error_create(breq->pool, &res, NULL, breq->message_id, -32602, "Missing params object");
+               blade_rpc_error_create(&res, NULL, breq->message_id, -32602, "Missing params object");
        } else if (!(message = cJSON_GetObjectCstr(params, "message"))) {
                ks_log(KS_LOG_DEBUG, "Session (%s) attempted to send chat message with no 'message'\n", blade_session_id_get(bs));
-               blade_rpc_error_create(breq->pool, &res, NULL, breq->message_id, -32602, "Missing params message string");
+               blade_rpc_error_create(&res, NULL, breq->message_id, -32602, "Missing params message string");
        }
 
        bs = blade_handle_sessions_get(breq->handle, breq->session_id);
        ks_assert(bs);
 
        if (!res) {
-               blade_rpc_response_create(breq->pool, &res, NULL, breq->message_id);
+               blade_rpc_response_create(&res, NULL, breq->message_id);
                sendevent = KS_TRUE;
        }
        blade_session_send(bs, res, NULL);
@@ -408,7 +408,7 @@ ks_bool_t blade_chat_send_request_handler(blade_module_t *bm, blade_request_t *b
        cJSON_Delete(res);
 
        if (sendevent) {
-               blade_rpc_event_create(breq->pool, &event, &res, "blade.chat.message");
+               blade_rpc_event_create(&event, &res, "blade.chat.message");
                ks_assert(event);
                cJSON_AddStringToObject(res, "from", breq->session_id); // @todo should really be the identity, but we don't have that in place yet
                cJSON_AddStringToObject(res, "message", message);
index 9f70f2b94626a15573b899e3ca8d531c8c3f74f0..b1dcb4242717aacb231aaf2f470fc427647592df 100644 (file)
@@ -38,7 +38,6 @@
 
 typedef struct blade_module_wss_s blade_module_wss_t;
 typedef struct blade_transport_wss_s blade_transport_wss_t;
-typedef struct blade_transport_wss_init_s blade_transport_wss_init_t;
 
 struct blade_module_wss_s {
        blade_handle_t *handle;
@@ -59,26 +58,17 @@ struct blade_module_wss_s {
        ks_thread_t *listeners_thread;
        struct pollfd *listeners_poll;
        int32_t listeners_count;
-
-       ks_list_t *connected; // @todo consider keeping this only as the list of connection id's, since the handle retains the pointer lookup
 };
 
 struct blade_transport_wss_s {
        blade_module_wss_t *module;
        ks_pool_t *pool;
 
+       const char *session_id;
        ks_socket_t sock;
        kws_t *kws;
 };
 
-struct blade_transport_wss_init_s {
-       blade_module_wss_t *module;
-       ks_pool_t *pool;
-
-       ks_socket_t sock;
-       const char *session_id;
-};
-
 
 
 ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t *bh);
@@ -95,8 +85,8 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data);
 
 
 
-ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_module_wss_t *bm_wss, ks_socket_t sock);
-ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP);
+ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, ks_pool_t *pool, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id);
+//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP);
 
 ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id);
 blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target);
@@ -118,14 +108,6 @@ blade_connection_state_hook_t blade_transport_wss_on_state_ready_outbound(blade_
 
 
 
-ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id);
-ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP);
-
-
-ks_bool_t blade_test_echo_request_handler(blade_module_t *bm, blade_request_t *breq);
-ks_bool_t blade_test_echo_response_handler(blade_response_t *bres);
-
-
 static blade_module_callbacks_t g_module_wss_callbacks =
 {
        blade_module_wss_on_load,
@@ -176,13 +158,10 @@ ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t
        bm_wss->module_callbacks = &g_module_wss_callbacks;
        bm_wss->transport_callbacks = &g_transport_wss_callbacks;
 
-       ks_list_create(&bm_wss->connected, pool);
-       ks_assert(bm_wss->connected);
+       ks_log(KS_LOG_DEBUG, "Created\n");
 
        *bm_wssP = bm_wss;
 
-       ks_log(KS_LOG_DEBUG, "Created\n");
-
        return KS_STATUS_SUCCESS;
 }
 
@@ -199,8 +178,6 @@ ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP)
 
        blade_module_destroy(&bm_wss->module);
 
-       ks_list_destroy(&bm_wss->connected);
-
        ks_pool_free(bm_wss->pool, bm_wssP);
 
        ks_log(KS_LOG_DEBUG, "Destroyed\n");
@@ -240,45 +217,6 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_unload(blade_module_t *bm)
        return KS_STATUS_SUCCESS;
 }
 
-ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id)
-{
-       blade_transport_wss_init_t *bt_wssi = NULL;
-
-       ks_assert(bt_wssiP);
-       ks_assert(bm_wss);
-       ks_assert(sock != KS_SOCK_INVALID);
-
-    bt_wssi = ks_pool_alloc(bm_wss->pool, sizeof(blade_transport_wss_init_t));
-       bt_wssi->module = bm_wss;
-       bt_wssi->pool = bm_wss->pool;
-       bt_wssi->sock = sock;
-       if (session_id) bt_wssi->session_id = ks_pstrdup(bt_wssi->pool, session_id);
-
-       *bt_wssiP = bt_wssi;
-
-       ks_log(KS_LOG_DEBUG, "Created\n");
-
-       return KS_STATUS_SUCCESS;
-}
-
-ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP)
-{
-       blade_transport_wss_init_t *bt_wssi = NULL;
-
-       ks_assert(bt_wssiP);
-       ks_assert(*bt_wssiP);
-
-       bt_wssi = *bt_wssiP;
-
-       if (bt_wssi->session_id) ks_pool_free(bt_wssi->pool, &bt_wssi->session_id);
-
-       ks_pool_free(bt_wssi->pool, bt_wssiP);
-
-       ks_log(KS_LOG_DEBUG, "Destroyed\n");
-
-       return KS_STATUS_SUCCESS;
-}
-
 ks_status_t blade_module_wss_config(blade_module_wss_t *bm_wss, config_setting_t *config)
 {
        config_setting_t *wss = NULL;
@@ -391,8 +329,6 @@ ks_status_t blade_module_wss_config(blade_module_wss_t *bm_wss, config_setting_t
 KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_setting_t *config)
 {
        blade_module_wss_t *bm_wss = NULL;
-       blade_space_t *space = NULL;
-       blade_method_t *method = NULL;
 
        ks_assert(bm);
        ks_assert(config);
@@ -417,7 +353,8 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s
                }
        }
 
-       if (ks_thread_create_ex(&bm_wss->listeners_thread,
+       if (bm_wss->listeners_count > 0 &&
+               ks_thread_create_ex(&bm_wss->listeners_thread,
                                                        blade_module_wss_listeners_thread,
                                                        bm_wss,
                                                        KS_THREAD_FLAG_DEFAULT,
@@ -427,17 +364,6 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s
 
        blade_handle_transport_register(bm_wss->handle, bm, BLADE_MODULE_WSS_TRANSPORT_NAME, bm_wss->transport_callbacks);
 
-
-       blade_space_create(&space, bm_wss->handle, bm, "blade.test");
-       ks_assert(space);
-
-       blade_method_create(&method, space, "echo", blade_test_echo_request_handler);
-       ks_assert(method);
-
-       blade_space_methods_add(space, method);
-
-       blade_handle_space_register(space);
-
        ks_log(KS_LOG_DEBUG, "Started\n");
 
        return KS_STATUS_SUCCESS;
@@ -446,7 +372,6 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s
 KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm)
 {
        blade_module_wss_t *bm_wss = NULL;
-       blade_connection_t *bc = NULL;
        ks_bool_t stopped = KS_FALSE;
 
        ks_assert(bm);
@@ -471,17 +396,6 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm)
        bm_wss->listeners_count = 0;
        if (bm_wss->listeners_poll) ks_pool_free(bm_wss->pool, &bm_wss->listeners_poll);
 
-       if (ks_list_size(bm_wss->connected) > 0) {
-               // this approach to shutdown is cleaner, ensures connections will detach from sessions and be destroyed all in the same places
-               ks_list_iterator_start(bm_wss->connected);
-               while (ks_list_iterator_hasnext(bm_wss->connected)) {
-                       bc = (blade_connection_t *)ks_list_iterator_next(bm_wss->connected);
-                       blade_connection_disconnect(bc);
-               }
-               ks_list_iterator_stop(bm_wss->connected);
-               while (ks_list_size(bm_wss->connected) > 0) ks_sleep_ms(100);
-       }
-
        if (stopped) ks_log(KS_LOG_DEBUG, "Stopped\n");
 
        return KS_STATUS_SUCCESS;
@@ -541,7 +455,7 @@ ks_status_t blade_module_wss_listen(blade_module_wss_t *bm_wss, ks_sockaddr_t *a
 void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
 {
        blade_module_wss_t *bm_wss = NULL;
-       blade_transport_wss_init_t *bt_wss_init = NULL;
+       blade_transport_wss_t *bt_wss = NULL;
        blade_connection_t *bc = NULL;
 
        ks_assert(thread);
@@ -551,6 +465,10 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
 
        ks_log(KS_LOG_DEBUG, "Started\n");
        while (!bm_wss->shutdown) {
+               //if (bm_wss->listeners_count == 0) {
+               //      ks_sleep_ms(500);
+               //      continue;
+               //}
                // @todo take exact timeout from a setting in config_wss_endpoints
                if (ks_poll(bm_wss->listeners_poll, bm_wss->listeners_count, 100) > 0) {
                        for (int32_t index = 0; index < bm_wss->listeners_count; ++index) {
@@ -573,28 +491,31 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
 
                                ks_log(KS_LOG_DEBUG, "Socket accepted\n", index);
 
-                               blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock, NULL);
-                               ks_assert(bt_wss_init);
-
-                blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks);
+                               // @todo make new function to wrap the following code all the way through assigning initial state to reuse in outbound connects
+                blade_connection_create(&bc, bm_wss->handle);
                                ks_assert(bc);
 
+                               blade_transport_wss_create(&bt_wss, blade_connection_pool_get(bc), bm_wss, sock, NULL);
+                               ks_assert(bt_wss);
+
+                               blade_connection_transport_set(bc, bt_wss, bm_wss->transport_callbacks);
+
                                blade_connection_read_lock(bc, KS_TRUE);
 
                                if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_INBOUND) != KS_STATUS_SUCCESS) {
                                        ks_log(KS_LOG_DEBUG, "Connection (%s) startup failed\n", blade_connection_id_get(bc));
+                                       blade_connection_read_unlock(bc);
                                        blade_connection_destroy(&bc);
-                                       blade_transport_wss_init_destroy(&bt_wss_init);
-                                       ks_socket_close(&sock);
                                        continue;
                                }
                                ks_log(KS_LOG_DEBUG, "Connection (%s) started\n", blade_connection_id_get(bc));
 
                                blade_handle_connections_add(bc);
-                               ks_list_append(bm_wss->connected, bc);
+
                                blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW);
 
                                blade_connection_read_unlock(bc);
+                               // @todo end of reusable function, lock ensures it cannot be destroyed until this code finishes
                        }
                }
        }
@@ -603,9 +524,27 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
     return NULL;
 }
 
+static void blade_transport_wss_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
+{
+       blade_transport_wss_t *bt_wss = (blade_transport_wss_t *)ptr;
 
+       ks_assert(bt_wss);
 
-ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_module_wss_t *bm_wss, ks_socket_t sock)
+       switch (action) {
+       case KS_MPCL_ANNOUNCE:
+               break;
+       case KS_MPCL_TEARDOWN:
+               if (bt_wss->session_id) ks_pool_free(bt_wss->pool, &bt_wss->session_id);
+               if (bt_wss->kws) kws_destroy(&bt_wss->kws);
+               else ks_socket_close(&bt_wss->sock);
+               break;
+       case KS_MPCL_DESTROY:
+               break;
+       }
+}
+
+
+ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, ks_pool_t *pool, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id)
 {
        blade_transport_wss_t *bt_wss = NULL;
 
@@ -613,37 +552,39 @@ ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_mo
        ks_assert(bm_wss);
        ks_assert(sock != KS_SOCK_INVALID);
 
-    bt_wss = ks_pool_alloc(bm_wss->pool, sizeof(blade_transport_wss_t));
+    bt_wss = ks_pool_alloc(pool, sizeof(blade_transport_wss_t));
        bt_wss->module = bm_wss;
-       bt_wss->pool = bm_wss->pool;
+       bt_wss->pool = pool;
        bt_wss->sock = sock;
+       if (session_id) bt_wss->session_id = ks_pstrdup(pool, session_id);
 
-       *bt_wssP = bt_wss;
+       ks_assert(ks_pool_set_cleanup(pool, bt_wss, NULL, blade_transport_wss_cleanup) == KS_STATUS_SUCCESS);
 
        ks_log(KS_LOG_DEBUG, "Created\n");
 
-       return KS_STATUS_SUCCESS;
-}
-
-ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP)
-{
-       blade_transport_wss_t *bt_wss = NULL;
-
-       ks_assert(bt_wssP);
-       ks_assert(*bt_wssP);
-
-       bt_wss = *bt_wssP;
-
-       if (bt_wss->kws) kws_destroy(&bt_wss->kws);
-       else ks_socket_close(&bt_wss->sock);
-
-       ks_pool_free(bt_wss->pool, bt_wssP);
-
-       ks_log(KS_LOG_DEBUG, "Destroyed\n");
+       *bt_wssP = bt_wss;
 
        return KS_STATUS_SUCCESS;
 }
 
+//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP)
+//{
+//     blade_transport_wss_t *bt_wss = NULL;
+//
+//     ks_assert(bt_wssP);
+//     ks_assert(*bt_wssP);
+//
+//     bt_wss = *bt_wssP;
+//
+//     ks_pool_free(bt_wss->pool, bt_wssP);
+//
+//     ks_log(KS_LOG_DEBUG, "Destroyed\n");
+//
+//     *bt_wssP = NULL;
+//
+//     return KS_STATUS_SUCCESS;
+//}
+
 ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
@@ -654,7 +595,7 @@ ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_modul
        const char *ip = NULL;
        const char *portstr = NULL;
        ks_port_t port = 1234;
-       blade_transport_wss_init_t *bt_wss_init = NULL;
+       blade_transport_wss_t *bt_wss = NULL;
        blade_connection_t *bc = NULL;
 
        ks_assert(bcP);
@@ -709,28 +650,35 @@ ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_modul
 
        ks_log(KS_LOG_DEBUG, "Socket connected\n");
 
-       blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock, session_id);
-       ks_assert(bt_wss_init);
-
-       blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks);
+       // @todo see above listener code, make reusable function for the following code
+       blade_connection_create(&bc, bm_wss->handle);
        ks_assert(bc);
 
+       blade_transport_wss_create(&bt_wss, blade_connection_pool_get(bc), bm_wss, sock, session_id);
+       ks_assert(bt_wss);
+
+       blade_connection_transport_set(bc, bt_wss, bm_wss->transport_callbacks);
+
+       blade_connection_read_lock(bc, KS_TRUE);
+
        if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_OUTBOUND) != KS_STATUS_SUCCESS) {
                ks_log(KS_LOG_DEBUG, "Connection (%s) startup failed\n", blade_connection_id_get(bc));
+               blade_connection_read_unlock(bc);
                blade_connection_destroy(&bc);
-               blade_transport_wss_init_destroy(&bt_wss_init);
-               ks_socket_close(&sock);
                ret = KS_STATUS_FAIL;
                goto done;
        }
        ks_log(KS_LOG_DEBUG, "Connection (%s) started\n", blade_connection_id_get(bc));
-       // @todo make sure it's sensible to be mixing outbound and inbound connections in the same list, but this allows entering the destruction pipeline
-       // for module shutdown, disconnects and errors without special considerations
+
        blade_handle_connections_add(bc);
-       ks_list_append(bm_wss->connected, bc);
 
        blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW);
 
+       blade_connection_read_unlock(bc);
+
+       // @todo consider ramification of unlocking above, while returning the new connection object back to the framework, thread might run and disconnect quickly
+       // @todo have blade_handle_connect and blade_transport_wss_on_connect (and the abstracted callback) return a copy of the connection id (allocated from blade_handle_t's pool temporarily) rather than the connection pointer itself
+       // which will then require getting the connection and thus relock it for any further use, if it disconnects during that time the connection will be locked preventing obtaining and then return NULL if removed
        *bcP = bc;
 
  done:
@@ -842,7 +790,7 @@ ks_status_t blade_transport_wss_rpc_error_send(blade_connection_t *bc, const cha
 
        bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
 
-       blade_rpc_error_create(blade_connection_pool_get(bc), &json, NULL, id, code, message);
+       blade_rpc_error_create(&json, NULL, id, code, message);
 
     if (blade_transport_wss_write(bt_wss, json) != KS_STATUS_SUCCESS) {
                ks_log(KS_LOG_DEBUG, "Failed to write error message\n");
@@ -855,8 +803,7 @@ ks_status_t blade_transport_wss_rpc_error_send(blade_connection_t *bc, const cha
 
 blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition)
 {
-       blade_transport_wss_t *bt_wss = NULL;
-       blade_transport_wss_init_t *bt_wss_init = NULL;
+       //blade_transport_wss_t *bt_wss = NULL;
 
        ks_assert(bc);
 
@@ -864,56 +811,32 @@ blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_conn
 
        if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 
-       bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
-       bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
-
-       ks_list_delete(bt_wss->module->connected, bc);
+       //bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
 
-       if (bt_wss_init) blade_transport_wss_init_destroy(&bt_wss_init);
-       if (bt_wss) blade_transport_wss_destroy(&bt_wss); // @TODO: Scream at this very loudly until I feel better for it wasting 2 days to track down, and then fix the issue it's causing
+       //blade_transport_wss_destroy(&bt_wss);
 
        return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 }
 
 blade_connection_state_hook_t blade_transport_wss_on_state_new_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
 {
-       blade_transport_wss_t *bt_wss = NULL;
-       blade_transport_wss_init_t *bt_wss_init = NULL;
-
        ks_assert(bc);
 
        ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
 
        if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 
-       bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
-
-       blade_transport_wss_create(&bt_wss, bt_wss_init->module, bt_wss_init->sock);
-       ks_assert(bt_wss);
-
-       blade_connection_transport_set(bc, bt_wss);
-
        return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 }
 
 blade_connection_state_hook_t blade_transport_wss_on_state_new_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
 {
-       blade_transport_wss_t *bt_wss = NULL;
-       blade_transport_wss_init_t *bt_wss_init = NULL;
-
        ks_assert(bc);
 
        ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
 
        if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 
-       bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
-
-       blade_transport_wss_create(&bt_wss, bt_wss_init->module, bt_wss_init->sock);
-       ks_assert(bt_wss);
-
-       blade_connection_transport_set(bc, bt_wss);
-
        return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 }
 
@@ -963,7 +886,6 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_
 {
        blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
        blade_transport_wss_t *bt_wss = NULL;
-       ks_pool_t *pool = NULL;
        cJSON *json_req = NULL;
        cJSON *json_res = NULL;
        cJSON *json_params = NULL;
@@ -988,8 +910,6 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_
 
        bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
 
-       pool = blade_connection_pool_get(bc);
-
        // @todo very temporary, really need monotonic clock and get timeout delay and sleep delay from config
        timeout = ks_time_now() + (5 * KS_USEC_PER_SEC);
        while (blade_transport_wss_read(bt_wss, &json_req) == KS_STATUS_SUCCESS) {
@@ -1072,13 +992,11 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_
                blade_handle_sessions_add(bs);
        }
 
-       // @todo wrapper to generate request and response
-       blade_rpc_response_create(pool, &json_res, &json_result, id);
+       blade_rpc_response_create(&json_res, &json_result, id);
        ks_assert(json_res);
 
        cJSON_AddStringToObject(json_result, "session-id", blade_session_id_get(bs));
 
-       // @todo send response
        if (blade_transport_wss_write(bt_wss, json_res) != KS_STATUS_SUCCESS) {
                ks_log(KS_LOG_DEBUG, "Failed to write response message\n");
                ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
@@ -1103,7 +1021,6 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade
        blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
        blade_handle_t *bh = NULL;
        blade_transport_wss_t *bt_wss = NULL;
-       blade_transport_wss_init_t *bt_wss_init = NULL;
        ks_pool_t *pool = NULL;
        cJSON *json_req = NULL;
        cJSON *json_params = NULL;
@@ -1125,16 +1042,15 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade
 
        bh = blade_connection_handle_get(bc);
        bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
-       bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
-       pool = blade_connection_pool_get(bc);
+       pool = blade_handle_pool_get(bh);
 
 
        blade_rpc_request_create(pool, &json_req, &json_params, &mid, "blade.session.attach");
        ks_assert(json_req);
 
-       if (bt_wss_init->session_id) cJSON_AddStringToObject(json_params, "session-id", bt_wss_init->session_id);
+       if (bt_wss->session_id) cJSON_AddStringToObject(json_params, "session-id", bt_wss->session_id);
 
-       ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", (bt_wss_init->session_id ? bt_wss_init->session_id : "none"));
+       ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", (bt_wss->session_id ? bt_wss->session_id : "none"));
 
        if (blade_transport_wss_write(bt_wss, json_req) != KS_STATUS_SUCCESS) {
                ks_log(KS_LOG_DEBUG, "Failed to write request message\n");
@@ -1262,16 +1178,16 @@ blade_connection_state_hook_t blade_transport_wss_on_state_ready_outbound(blade_
        ks_assert(bc);
 
        if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) {
+               blade_handle_t *bh = NULL;
                blade_session_t *bs = NULL;
-               //cJSON *req = NULL;
 
                ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
 
-               bs = blade_handle_sessions_get(blade_connection_handle_get(bc), blade_connection_session_get(bc));
-               ks_assert(bs);
+               bh = blade_connection_handle_get(bc);
+               ks_assert(bh);
 
-               //blade_rpc_request_create(blade_connection_pool_get(bc), &req, NULL, NULL, "blade.test.echo");
-               //blade_session_send(bs, req, blade_test_echo_response_handler);
+               bs = blade_handle_sessions_get(bh, blade_connection_session_get(bc));
+               ks_assert(bs);
 
                blade_session_read_unlock(bs);
        }
@@ -1280,37 +1196,6 @@ blade_connection_state_hook_t blade_transport_wss_on_state_ready_outbound(blade_
 }
 
 
-
-ks_bool_t blade_test_echo_request_handler(blade_module_t *bm, blade_request_t *breq)
-{
-       blade_session_t *bs = NULL;
-       cJSON *res = NULL;
-
-       ks_assert(bm);
-       ks_assert(breq);
-
-       ks_log(KS_LOG_DEBUG, "Request Received!\n");
-
-       bs = blade_handle_sessions_get(breq->handle, breq->session_id);
-       ks_assert(bs);
-
-       blade_rpc_response_create(breq->pool, &res, NULL, breq->message_id);
-       blade_session_send(bs, res, NULL);
-
-       blade_session_read_unlock(bs);
-
-       return KS_FALSE;
-}
-
-ks_bool_t blade_test_echo_response_handler(blade_response_t *bres)
-{
-       ks_assert(bres);
-
-       ks_log(KS_LOG_DEBUG, "Response Received!\n");
-
-       return KS_FALSE;
-}
-
 /* For Emacs:
  * Local Variables:
  * mode:c
index 7a120a8d65a1612159a4c4a23166c214877ff845..191d485d51a6d080850c36a513a68618a38ca4b0 100644 (file)
@@ -205,12 +205,11 @@ KS_DECLARE(ks_status_t) blade_rpc_request_create(ks_pool_t *pool, cJSON **json,
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_rpc_response_create(ks_pool_t *pool, cJSON **json, cJSON **result, const char *id)
+KS_DECLARE(ks_status_t) blade_rpc_response_create(cJSON **json, cJSON **result, const char *id)
 {
        cJSON *root = NULL;
        cJSON *r = NULL;
 
-       ks_assert(pool);
        ks_assert(json);
        ks_assert(id);
 
@@ -229,12 +228,11 @@ KS_DECLARE(ks_status_t) blade_rpc_response_create(ks_pool_t *pool, cJSON **json,
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_rpc_error_create(ks_pool_t *pool, cJSON **json, cJSON **error, const char *id, int32_t code, const char *message)
+KS_DECLARE(ks_status_t) blade_rpc_error_create(cJSON **json, cJSON **error, const char *id, int32_t code, const char *message)
 {
        cJSON *root = NULL;
        cJSON *e = NULL;
 
-       ks_assert(pool);
        ks_assert(json);
        //ks_assert(id);
        ks_assert(message);
@@ -256,13 +254,12 @@ KS_DECLARE(ks_status_t) blade_rpc_error_create(ks_pool_t *pool, cJSON **json, cJ
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_rpc_event_create(ks_pool_t *pool, cJSON **json, cJSON **result, const char *event)
+KS_DECLARE(ks_status_t) blade_rpc_event_create(cJSON **json, cJSON **result, const char *event)
 {
        cJSON *root = NULL;
        cJSON *b = NULL;
        cJSON *r = NULL;
 
-       ks_assert(pool);
        ks_assert(json);
        ks_assert(event);
 
index 41ddd0f64a6fd1cd02ee0b0b77927376d0949418..0af5eeaad4924d6ea08cc8bcc7e4b41b5231fb69 100644 (file)
@@ -90,7 +90,8 @@ static void blade_session_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool
                bs->mutex = NULL;
                bs->lock = NULL;
 
-               ks_pool_free(bs->pool, &bs->id);
+               //ks_pool_free(bs->pool, &bs->id);
+               bs->id = NULL;
                break;
        }
 }
@@ -139,7 +140,6 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle
     ks_rwl_create(&bs->properties_lock, pool);
        ks_assert(bs->properties_lock);
 
-
        ks_assert(ks_pool_set_cleanup(pool, bs, NULL, blade_session_cleanup) == KS_STATUS_SUCCESS);
 
        ks_log(KS_LOG_DEBUG, "Created\n");
index 77a7cfe98c496b00bd8f8bcc12eb49ad2e5b678c..feef6f6adca8681fe74aabcaeab7a467175c9bef 100644 (file)
@@ -47,9 +47,6 @@ struct blade_handle_s {
        config_setting_t *config_directory;
        config_setting_t *config_datastore;
 
-       ks_thread_t *worker_thread;
-       ks_bool_t shutdown;
-
        ks_hash_t *transports; // registered transports exposed by modules, NOT active connections
        ks_hash_t *spaces; // registered method spaces exposed by modules
        // registered event callback registry
@@ -71,8 +68,6 @@ struct blade_handle_s {
        ks_hash_t *requests; // outgoing requests waiting for a response keyed by the message id
 };
 
-void *blade_handle_worker_thread(ks_thread_t *thread, void *data);
-
 typedef struct blade_handle_transport_registration_s blade_handle_transport_registration_t;
 struct blade_handle_transport_registration_s {
        ks_pool_t *pool;
@@ -303,17 +298,6 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
 
        // @todo load internal modules, call onload and onstartup
 
-       if (ks_thread_create_ex(&bh->worker_thread,
-               blade_handle_worker_thread,
-               bh,
-               KS_THREAD_FLAG_DEFAULT,
-               KS_THREAD_DEFAULT_STACK,
-               KS_PRI_NORMAL,
-               bh->pool) != KS_STATUS_SUCCESS) {
-               // @todo error logging
-               return KS_STATUS_FAIL;
-       }
-
        return KS_STATUS_SUCCESS;
 }
 
@@ -323,17 +307,11 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
 
        ks_assert(bh);
 
-       while ((it = ks_hash_first(bh->requests, KS_UNLOCKED))) {
-               void *key = NULL;
-               blade_request_t *value = NULL;
-
-               ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
-               ks_hash_remove(bh->requests, key);
+       // @todo call onshutdown for internal modules
 
-               blade_request_destroy(&value);
-               // @todo note to self, fix this when switching to auto cleanup, as hash invalidates iterator when removing
-       }
+       // @todo repeat the same as below for connections, this will catch all including those that have not yet been attached to a session for edge case cleanup
 
+       ks_hash_read_lock(bh->sessions);
        for (it = ks_hash_first(bh->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
                void *key = NULL;
                blade_session_t *value = NULL;
@@ -342,9 +320,21 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
 
                blade_session_hangup(value);
        }
+       ks_hash_read_unlock(bh->sessions);
        while (ks_hash_count(bh->sessions) > 0) ks_sleep_ms(100);
 
-       // @todo unload internal modules, call onshutdown and onunload
+
+       // @todo call onunload for internal modules
+
+       while ((it = ks_hash_first(bh->requests, KS_UNLOCKED))) {
+               void *key = NULL;
+               blade_request_t *value = NULL;
+
+               ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
+               ks_hash_remove(bh->requests, key);
+
+               blade_request_destroy(&value);
+       }
 
        while ((it = ks_hash_first(bh->events, KS_UNLOCKED))) {
                void *key = NULL;
@@ -352,7 +342,6 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
 
                ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
                blade_handle_event_unregister(bh, (const char *)key);
-               // @todo note to self, fix this when switching to auto cleanup, as hash invalidates iterator when removing
        }
 
        while ((it = ks_hash_first(bh->spaces, KS_UNLOCKED))) {
@@ -361,20 +350,12 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
 
                ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
                blade_handle_space_unregister(value);
-               // @todo note to self, fix this when switching to auto cleanup, as hash invalidates iterator when removing
        }
 
        // @todo unload DSOs
 
        if (blade_handle_datastore_available(bh)) blade_datastore_destroy(&bh->datastore);
 
-       if (bh->worker_thread) {
-               bh->shutdown = KS_TRUE;
-               ks_thread_join(bh->worker_thread);
-               ks_pool_free(bh->pool, &bh->worker_thread);
-               bh->shutdown = KS_FALSE;
-       }
-
        return KS_STATUS_SUCCESS;
 }
 
@@ -892,61 +873,6 @@ KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
        return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata);
 }
 
-void *blade_handle_worker_thread(ks_thread_t *thread, void *data)
-{
-       blade_handle_t *bh = NULL;
-       blade_connection_t *bc = NULL;
-       //blade_session_t *bs = NULL;
-       ks_hash_iterator_t *it = NULL;
-       ks_q_t *cleanup = NULL;
-
-       ks_assert(thread);
-       ks_assert(data);
-
-       bh = (blade_handle_t *)data;
-
-       ks_q_create(&cleanup, bh->pool, 0);
-       ks_assert(cleanup);
-
-       while (!bh->shutdown) {
-               ks_hash_write_lock(bh->connections);
-               for (it = ks_hash_first(bh->connections, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-                       void *key = NULL;
-                       blade_connection_t *value = NULL;
-
-                       ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
-
-                       if (blade_connection_state_get(value) == BLADE_CONNECTION_STATE_CLEANUP) ks_q_push(cleanup, value);
-               }
-               ks_hash_write_unlock(bh->connections);
-
-               while (ks_q_trypop(cleanup, (void **)&bc) == KS_STATUS_SUCCESS) {
-                       blade_handle_connections_remove(bc);
-                       blade_connection_destroy(&bc);
-               }
-
-               //ks_hash_write_lock(bh->sessions);
-               //for (it = ks_hash_first(bh->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-               //      void *key = NULL;
-               //      blade_session_t *value = NULL;
-
-               //      ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
-
-               //      if (blade_session_state_get(value) == BLADE_SESSION_STATE_CLEANUP) ks_q_push(cleanup, value);
-               //}
-               //ks_hash_write_unlock(bh->sessions);
-
-               //while (ks_q_trypop(cleanup, (void **)&bs) == KS_STATUS_SUCCESS) {
-               //      blade_handle_sessions_remove(bs);
-               //      blade_session_destroy(&bs);
-               //}
-
-               ks_sleep_ms(500);
-       }
-
-       return NULL;
-}
-
 /* For Emacs:
  * Local Variables:
  * mode:c
index 6f580828d194d471b43470ce22463a5a4a3a5aa7..c2643a77833071e8e6f0c2b5e650b1a58a2f838c 100644 (file)
 #include <blade.h>
 
 KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
-                                                                                               blade_handle_t *bh,
-                                                                                               void *transport_data,
-                                                                                               blade_transport_callbacks_t *transport_callbacks);
+KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, blade_handle_t *bh);
 KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP);
 KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction);
 KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc);
@@ -50,9 +47,8 @@ KS_DECLARE(ks_status_t) blade_connection_read_lock(blade_connection_t *bc, ks_bo
 KS_DECLARE(ks_status_t) blade_connection_read_unlock(blade_connection_t *bc);
 KS_DECLARE(ks_status_t) blade_connection_write_lock(blade_connection_t *bc, ks_bool_t block);
 KS_DECLARE(ks_status_t) blade_connection_write_unlock(blade_connection_t *bc);
-KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc);
 KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc);
-KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data);
+KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data, blade_transport_callbacks_t *transport_callbacks);
 KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state);
 KS_DECLARE(blade_connection_state_t) blade_connection_state_get(blade_connection_t *bc);
 KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc);
index cb4c6dff9bae87a02c69bf3750e4becd2734843f..0a8b97afb0d7f6b828f939dcedb2b734c7b4f36d 100644 (file)
@@ -47,9 +47,9 @@ KS_DECLARE(ks_status_t) blade_response_destroy(blade_response_t **bresP);
 KS_DECLARE(ks_status_t) blade_event_create(blade_event_t **bevP, blade_handle_t *bh, const char *session_id, cJSON *json);
 KS_DECLARE(ks_status_t) blade_event_destroy(blade_event_t **bevP);
 KS_DECLARE(ks_status_t) blade_rpc_request_create(ks_pool_t *pool, cJSON **json, cJSON **params, const char **id, const char *method);
-KS_DECLARE(ks_status_t) blade_rpc_response_create(ks_pool_t *pool, cJSON **json, cJSON **result, const char *id);
-KS_DECLARE(ks_status_t) blade_rpc_error_create(ks_pool_t *pool, cJSON **json, cJSON **error, const char *id, int32_t code, const char *message);
-KS_DECLARE(ks_status_t) blade_rpc_event_create(ks_pool_t *pool, cJSON **json, cJSON **result, const char *event);
+KS_DECLARE(ks_status_t) blade_rpc_response_create(cJSON **json, cJSON **result, const char *id);
+KS_DECLARE(ks_status_t) blade_rpc_error_create(cJSON **json, cJSON **error, const char *id, int32_t code, const char *message);
+KS_DECLARE(ks_status_t) blade_rpc_event_create(cJSON **json, cJSON **result, const char *event);
 KS_END_EXTERN_C
 
 #endif
index 2ba82ca7b2928740b7bab2cbbcd1bacc73cd8f1a..3ce8cf1a08578c9082ea966a89272246c038de26 100644 (file)
@@ -171,7 +171,7 @@ KS_DECLARE(void) ks_sleep(ks_time_t microsec)
 
        do {
                QueryPerformanceCounter((LARGE_INTEGER*) &now);
-               SwitchToThread();
+               if (!SwitchToThread()) Sleep(1);
        } while ((now.QuadPart - start.QuadPart) / (float)(perfCnt.QuadPart) * 1000 * 1000 < (DWORD)microsec);
 
 }