]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-10167: Adjusted modules to utilize an isolated pool with auto cleanup per module...
authorShane Bryldt <astaelan@gmail.com>
Tue, 18 Apr 2017 23:02:34 +0000 (17:02 -0600)
committerShane Bryldt <astaelan@gmail.com>
Tue, 18 Apr 2017 23:02:34 +0000 (17:02 -0600)
libs/libblade/src/blade_connection.c
libs/libblade/src/blade_module.c
libs/libblade/src/blade_module_chat.c
libs/libblade/src/blade_module_wss.c
libs/libblade/src/blade_session.c
libs/libblade/src/include/blade_module.h
libs/libblade/test/bladec.c
libs/libblade/test/blades.c

index 66d354e00f8fb434df67b6af2b8c316a737d8c95..ec68cda64200524e92700a481951a2e16126ce56 100644 (file)
@@ -41,7 +41,6 @@ struct blade_connection_s {
        blade_transport_callbacks_t *transport_callbacks;
 
        blade_connection_direction_t direction;
-    ks_thread_t *state_thread;
        volatile blade_connection_state_t state;
 
        const char *id;
index 403748b0fc1677e5c830942b2412efb704f49718..2101420d44c2e723efcae287f0a81edc69e85a5f 100644 (file)
@@ -41,39 +41,43 @@ struct blade_module_s {
        blade_module_callbacks_t *module_callbacks;
 };
 
+static void blade_module_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
+{
+       blade_module_t *bm = (blade_module_t *)ptr;
+
+       ks_assert(bm);
+
+       switch (action) {
+       case KS_MPCL_ANNOUNCE:
+               break;
+       case KS_MPCL_TEARDOWN:
+               break;
+       case KS_MPCL_DESTROY:
+               break;
+       }
+}
 
-KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, void *module_data, blade_module_callbacks_t *module_callbacks)
+KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, ks_pool_t *pool, void *module_data, blade_module_callbacks_t *module_callbacks)
 {
        blade_module_t *bm = NULL;
-       ks_pool_t *pool = NULL;
 
        ks_assert(bmP);
        ks_assert(bh);
+       ks_assert(pool);
        ks_assert(module_data);
        ks_assert(module_callbacks);
 
-       pool = blade_handle_pool_get(bh);
-
        bm = ks_pool_alloc(pool, sizeof(blade_module_t));
        bm->handle = bh;
        bm->pool = pool;
        bm->module_data = module_data;
        bm->module_callbacks = module_callbacks;
-       *bmP = bm;
 
-       return KS_STATUS_SUCCESS;
-}
+       ks_assert(ks_pool_set_cleanup(pool, bm, NULL, blade_module_cleanup) == KS_STATUS_SUCCESS);
 
-KS_DECLARE(ks_status_t) blade_module_destroy(blade_module_t **bmP)
-{
-       blade_module_t *bm = NULL;
+       ks_log(KS_LOG_DEBUG, "Created\n");
 
-       ks_assert(bmP);
-       ks_assert(*bmP);
-
-       bm = *bmP;
-
-       ks_pool_free(bm->pool, bmP);
+       *bmP = bm;
 
        return KS_STATUS_SUCCESS;
 }
index 9b085b1f7567deb57b6d089cfed81c51c0fd12fb..ea5579dc9d740b59e1c99f672c8eb77d2012c300 100644 (file)
@@ -71,6 +71,24 @@ static blade_module_callbacks_t g_module_chat_callbacks =
 };
 
 
+static void blade_module_chat_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
+{
+       blade_module_chat_t *bm_chat = (blade_module_chat_t *)ptr;
+
+       ks_assert(bm_chat);
+
+       switch (action) {
+       case KS_MPCL_ANNOUNCE:
+               break;
+       case KS_MPCL_TEARDOWN:
+               //ks_list_destroy(&bm_chat->participants);
+               blade_module_chat_on_shutdown(bm_chat->module);
+               break;
+       case KS_MPCL_DESTROY:
+               break;
+       }
+}
+
 
 ks_status_t blade_module_chat_create(blade_module_chat_t **bm_chatP, blade_handle_t *bh)
 {
@@ -80,7 +98,8 @@ ks_status_t blade_module_chat_create(blade_module_chat_t **bm_chatP, blade_handl
        ks_assert(bm_chatP);
        ks_assert(bh);
 
-       pool = blade_handle_pool_get(bh);
+       ks_pool_open(&pool);
+       ks_assert(pool);
 
     bm_chat = ks_pool_alloc(pool, sizeof(blade_module_chat_t));
        bm_chat->handle = bh;
@@ -91,35 +110,36 @@ ks_status_t blade_module_chat_create(blade_module_chat_t **bm_chatP, blade_handl
        ks_list_create(&bm_chat->participants, pool);
        ks_assert(bm_chat->participants);
 
-       blade_module_create(&bm_chat->module, bh, bm_chat, &g_module_chat_callbacks);
+       blade_module_create(&bm_chat->module, bh, pool, bm_chat, &g_module_chat_callbacks);
        bm_chat->module_callbacks = &g_module_chat_callbacks;
 
-       *bm_chatP = bm_chat;
+       ks_assert(ks_pool_set_cleanup(pool, bm_chat, NULL, blade_module_chat_cleanup) == KS_STATUS_SUCCESS);
 
        ks_log(KS_LOG_DEBUG, "Created\n");
 
+       *bm_chatP = bm_chat;
+
        return KS_STATUS_SUCCESS;
 }
 
 ks_status_t blade_module_chat_destroy(blade_module_chat_t **bm_chatP)
 {
        blade_module_chat_t *bm_chat = NULL;
+       ks_pool_t *pool = NULL;
 
        ks_assert(bm_chatP);
        ks_assert(*bm_chatP);
 
        bm_chat = *bm_chatP;
 
-       blade_module_chat_on_shutdown(bm_chat->module);
-
-       ks_list_destroy(&bm_chat->participants);
-
-       blade_module_destroy(&bm_chat->module);
-
-       ks_pool_free(bm_chat->pool, bm_chatP);
+       pool = bm_chat->pool;
+       //ks_pool_free(bm_chat->pool, bm_chatP);
+       ks_pool_close(&pool);
 
        ks_log(KS_LOG_DEBUG, "Destroyed\n");
 
+       *bm_chatP = NULL;
+
        return KS_STATUS_SUCCESS;
 }
 
index 881346c0fffb6b32da7d7972027a0b424f4eff95..e23dd9aa561c5a9309ea6b5e9245f2d122ebbda8 100644 (file)
@@ -42,7 +42,6 @@ typedef struct blade_transport_wss_s blade_transport_wss_t;
 struct blade_module_wss_s {
        blade_handle_t *handle;
        ks_pool_t *pool;
-       ks_thread_pool_t *tpool;
        blade_module_t *module;
        blade_module_callbacks_t *module_callbacks;
        blade_transport_callbacks_t *transport_callbacks;
@@ -53,9 +52,8 @@ struct blade_module_wss_s {
        int32_t config_wss_endpoints_ipv6_length;
        int32_t config_wss_endpoints_backlog;
 
-       ks_bool_t shutdown;
+       volatile ks_bool_t shutdown;
 
-       ks_thread_t *listeners_thread;
        struct pollfd *listeners_poll;
        int32_t listeners_count;
 };
@@ -86,7 +84,6 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data);
 
 
 ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, ks_pool_t *pool, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id);
-//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP);
 
 ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id);
 blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target);
@@ -138,6 +135,22 @@ static blade_transport_callbacks_t g_transport_wss_callbacks =
 };
 
 
+static void blade_module_wss_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
+{
+       blade_module_wss_t *bm_wss = (blade_module_wss_t *)ptr;
+
+       ks_assert(bm_wss);
+
+       switch (action) {
+       case KS_MPCL_ANNOUNCE:
+               break;
+       case KS_MPCL_TEARDOWN:
+               blade_module_wss_on_shutdown(bm_wss->module);
+               break;
+       case KS_MPCL_DESTROY:
+               break;
+       }
+}
 
 ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t *bh)
 {
@@ -147,17 +160,19 @@ ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t
        ks_assert(bm_wssP);
        ks_assert(bh);
 
-       pool = blade_handle_pool_get(bh);
+       ks_pool_open(&pool);
+       ks_assert(pool);
 
     bm_wss = ks_pool_alloc(pool, sizeof(blade_module_wss_t));
        bm_wss->handle = bh;
        bm_wss->pool = pool;
-       bm_wss->tpool = blade_handle_tpool_get(bh);
 
-       blade_module_create(&bm_wss->module, bh, bm_wss, &g_module_wss_callbacks);
+       blade_module_create(&bm_wss->module, bh, pool, bm_wss, &g_module_wss_callbacks);
        bm_wss->module_callbacks = &g_module_wss_callbacks;
        bm_wss->transport_callbacks = &g_transport_wss_callbacks;
 
+       ks_assert(ks_pool_set_cleanup(pool, bm_wss, NULL, blade_module_wss_cleanup) == KS_STATUS_SUCCESS);
+
        ks_log(KS_LOG_DEBUG, "Created\n");
 
        *bm_wssP = bm_wss;
@@ -168,20 +183,21 @@ ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t
 ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP)
 {
        blade_module_wss_t *bm_wss = NULL;
+       ks_pool_t *pool = NULL;
 
        ks_assert(bm_wssP);
        ks_assert(*bm_wssP);
 
        bm_wss = *bm_wssP;
 
-       blade_module_wss_on_shutdown(bm_wss->module);
-
-       blade_module_destroy(&bm_wss->module);
-
-       ks_pool_free(bm_wss->pool, bm_wssP);
+       pool = bm_wss->pool;
+       //ks_pool_free(bm_wss->pool, bm_wssP);
+       ks_pool_close(&pool);
 
        ks_log(KS_LOG_DEBUG, "Destroyed\n");
 
+       *bm_wssP = NULL;
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -353,14 +369,12 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s
                }
        }
 
+
        if (bm_wss->listeners_count > 0 &&
-               ks_thread_create_ex(&bm_wss->listeners_thread,
-                                                       blade_module_wss_listeners_thread,
-                                                       bm_wss,
-                                                       KS_THREAD_FLAG_DEFAULT,
-                                                       KS_THREAD_DEFAULT_STACK,
-                                                       KS_PRI_NORMAL,
-                                                       bm_wss->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+               ks_thread_pool_add_job(blade_handle_tpool_get(bm_wss->handle), blade_module_wss_listeners_thread, bm_wss) != KS_STATUS_SUCCESS) {
+               // @todo error logging
+               return KS_STATUS_FAIL;
+       }
 
        blade_handle_transport_register(bm_wss->handle, bm, BLADE_MODULE_WSS_TRANSPORT_NAME, bm_wss->transport_callbacks);
 
@@ -372,31 +386,25 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s
 KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm)
 {
        blade_module_wss_t *bm_wss = NULL;
-       ks_bool_t stopped = KS_FALSE;
 
        ks_assert(bm);
 
        bm_wss = (blade_module_wss_t *)blade_module_data_get(bm);
 
-       blade_handle_transport_unregister(bm_wss->handle, BLADE_MODULE_WSS_TRANSPORT_NAME);
-
-       if (bm_wss->listeners_thread) {
+       if (bm_wss->listeners_count > 0) {
                bm_wss->shutdown = KS_TRUE;
-               ks_thread_join(bm_wss->listeners_thread);
-               ks_pool_free(bm_wss->pool, &bm_wss->listeners_thread);
-               bm_wss->shutdown = KS_FALSE;
-               stopped = KS_TRUE;
+               while (bm_wss->shutdown) ks_sleep_ms(1);
        }
 
+       blade_handle_transport_unregister(bm_wss->handle, BLADE_MODULE_WSS_TRANSPORT_NAME);
+
        for (int32_t index = 0; index < bm_wss->listeners_count; ++index) {
                ks_socket_t sock = bm_wss->listeners_poll[index].fd;
                ks_socket_shutdown(sock, SHUT_RDWR);
                ks_socket_close(&sock);
        }
-       bm_wss->listeners_count = 0;
-       if (bm_wss->listeners_poll) ks_pool_free(bm_wss->pool, &bm_wss->listeners_poll);
 
-       if (stopped) ks_log(KS_LOG_DEBUG, "Stopped\n");
+       ks_log(KS_LOG_DEBUG, "Stopped\n");
 
        return KS_STATUS_SUCCESS;
 }
@@ -465,10 +473,6 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
 
        ks_log(KS_LOG_DEBUG, "Started\n");
        while (!bm_wss->shutdown) {
-               //if (bm_wss->listeners_count == 0) {
-               //      ks_sleep_ms(500);
-               //      continue;
-               //}
                // @todo take exact timeout from a setting in config_wss_endpoints
                if (ks_poll(bm_wss->listeners_poll, bm_wss->listeners_count, 100) > 0) {
                        for (int32_t index = 0; index < bm_wss->listeners_count; ++index) {
@@ -521,6 +525,8 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
        }
        ks_log(KS_LOG_DEBUG, "Stopped\n");
 
+       bm_wss->shutdown = KS_FALSE;
+
     return NULL;
 }
 
@@ -567,24 +573,6 @@ ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, ks_pool_
        return KS_STATUS_SUCCESS;
 }
 
-//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP)
-//{
-//     blade_transport_wss_t *bt_wss = NULL;
-//
-//     ks_assert(bt_wssP);
-//     ks_assert(*bt_wssP);
-//
-//     bt_wss = *bt_wssP;
-//
-//     ks_pool_free(bt_wss->pool, bt_wssP);
-//
-//     ks_log(KS_LOG_DEBUG, "Destroyed\n");
-//
-//     *bt_wssP = NULL;
-//
-//     return KS_STATUS_SUCCESS;
-//}
-
 ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
index 04aac39dfa5e8d365663728e410c1a273bdf13fb..a2412613eca92bf05ec9914430f16f9894ad692c 100644 (file)
@@ -475,7 +475,7 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
 
        while (!shutdown) {
                // Entering the call below, the mutex is expected to be locked and will be unlocked by the call
-               ks_cond_timedwait(bs->cond, 500);
+               ks_cond_timedwait(bs->cond, 100);
                // Leaving the call above, the mutex will be locked after being signalled, timing out, or woken up for any reason
 
                state = bs->state;
@@ -486,6 +486,8 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
                                if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) {
                                        blade_connection_sending_push(bc, json);
                                        blade_connection_read_unlock(bc);
+                               } else {
+                                       // @todo review this, possible the connection is dropped after popping a message, which results in it just being deleted without sending
                                }
                                cJSON_Delete(json);
                        }
@@ -503,15 +505,12 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
                        break;
                case BLADE_SESSION_STATE_CONNECT:
                        ks_log(KS_LOG_DEBUG, "Session (%s) state connect\n", bs->id);
-                       //ks_sleep_ms(1000);
                        break;
                case BLADE_SESSION_STATE_ATTACH:
                        ks_log(KS_LOG_DEBUG, "Session (%s) state attach\n", bs->id);
-                       //ks_sleep_ms(1000);
                        break;
                case BLADE_SESSION_STATE_DETACH:
                        ks_log(KS_LOG_DEBUG, "Session (%s) state detach\n", bs->id);
-                       //ks_sleep_ms(1000);
                        break;
                case BLADE_SESSION_STATE_READY:
                        blade_session_state_on_ready(bs);
index e5fc558f16c9f7b47aaf263adc8259b2f219b9c8..ff2b942f4811029dc6e756fcf4f428098a0b9d9b 100644 (file)
@@ -36,8 +36,7 @@
 #include <blade.h>
 
 KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, void *module_data, blade_module_callbacks_t *module_callbacks);
-KS_DECLARE(ks_status_t) blade_module_destroy(blade_module_t **bmP);
+KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, ks_pool_t *pool, void *module_data, blade_module_callbacks_t *module_callbacks);
 KS_DECLARE(blade_handle_t *) blade_module_handle_get(blade_module_t *bm);
 KS_DECLARE(void *) blade_module_data_get(blade_module_t *bm);
 
index 10c000f086d2892100afc4ddaca8b8697c7669ac..c5f8c010659dca7f5715e12ac0e6d1b0bb544e3a 100644 (file)
@@ -87,8 +87,6 @@ int main(int argc, char **argv)
 
        blade_handle_session_state_callback_unregister(bh, session_state_callback_id);
 
-       blade_module_wss_on_shutdown(mod_wss);
-
        blade_module_wss_on_unload(mod_wss);
 
        blade_handle_destroy(&bh);
index 638074db0566fedd765b16369ae8cc6962787bd7..42af3a8448b0ede49d33d46d2e862753e4bbc8b3 100644 (file)
@@ -80,8 +80,6 @@ int main(int argc, char **argv)
        //blade_module_chat_on_shutdown(mod_chat);
        //blade_module_chat_on_unload(mod_chat);
 
-       blade_module_wss_on_shutdown(mod_wss);
-
        blade_module_wss_on_unload(mod_wss);
 
        blade_handle_destroy(&bh);