From: Shane Bryldt Date: Tue, 18 Apr 2017 23:02:34 +0000 (-0600) Subject: FS-10167: Adjusted modules to utilize an isolated pool with auto cleanup per module... X-Git-Tag: v1.8.0~593 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2e02f3b49820648dd6dadf06e679e79af793c0c0;p=thirdparty%2Ffreeswitch.git FS-10167: Adjusted modules to utilize an isolated pool with auto cleanup per module, which also contains the implementation specific module data. Also changed the thread model of the listener for the wss module to utilize the thread pool, alleviating ownership issues during cleanup. --- diff --git a/libs/libblade/src/blade_connection.c b/libs/libblade/src/blade_connection.c index 66d354e00f..ec68cda642 100644 --- a/libs/libblade/src/blade_connection.c +++ b/libs/libblade/src/blade_connection.c @@ -41,7 +41,6 @@ struct blade_connection_s { blade_transport_callbacks_t *transport_callbacks; blade_connection_direction_t direction; - ks_thread_t *state_thread; volatile blade_connection_state_t state; const char *id; diff --git a/libs/libblade/src/blade_module.c b/libs/libblade/src/blade_module.c index 403748b0fc..2101420d44 100644 --- a/libs/libblade/src/blade_module.c +++ b/libs/libblade/src/blade_module.c @@ -41,39 +41,43 @@ struct blade_module_s { blade_module_callbacks_t *module_callbacks; }; +static void blade_module_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type) +{ + blade_module_t *bm = (blade_module_t *)ptr; + + ks_assert(bm); + + switch (action) { + case KS_MPCL_ANNOUNCE: + break; + case KS_MPCL_TEARDOWN: + break; + case KS_MPCL_DESTROY: + break; + } +} -KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, void *module_data, blade_module_callbacks_t *module_callbacks) +KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, ks_pool_t *pool, void *module_data, blade_module_callbacks_t *module_callbacks) { blade_module_t *bm = NULL; - ks_pool_t *pool = NULL; ks_assert(bmP); ks_assert(bh); + ks_assert(pool); ks_assert(module_data); ks_assert(module_callbacks); - pool = blade_handle_pool_get(bh); - bm = ks_pool_alloc(pool, sizeof(blade_module_t)); bm->handle = bh; bm->pool = pool; bm->module_data = module_data; bm->module_callbacks = module_callbacks; - *bmP = bm; - return KS_STATUS_SUCCESS; -} + ks_assert(ks_pool_set_cleanup(pool, bm, NULL, blade_module_cleanup) == KS_STATUS_SUCCESS); -KS_DECLARE(ks_status_t) blade_module_destroy(blade_module_t **bmP) -{ - blade_module_t *bm = NULL; + ks_log(KS_LOG_DEBUG, "Created\n"); - ks_assert(bmP); - ks_assert(*bmP); - - bm = *bmP; - - ks_pool_free(bm->pool, bmP); + *bmP = bm; return KS_STATUS_SUCCESS; } diff --git a/libs/libblade/src/blade_module_chat.c b/libs/libblade/src/blade_module_chat.c index 9b085b1f75..ea5579dc9d 100644 --- a/libs/libblade/src/blade_module_chat.c +++ b/libs/libblade/src/blade_module_chat.c @@ -71,6 +71,24 @@ static blade_module_callbacks_t g_module_chat_callbacks = }; +static void blade_module_chat_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type) +{ + blade_module_chat_t *bm_chat = (blade_module_chat_t *)ptr; + + ks_assert(bm_chat); + + switch (action) { + case KS_MPCL_ANNOUNCE: + break; + case KS_MPCL_TEARDOWN: + //ks_list_destroy(&bm_chat->participants); + blade_module_chat_on_shutdown(bm_chat->module); + break; + case KS_MPCL_DESTROY: + break; + } +} + ks_status_t blade_module_chat_create(blade_module_chat_t **bm_chatP, blade_handle_t *bh) { @@ -80,7 +98,8 @@ ks_status_t blade_module_chat_create(blade_module_chat_t **bm_chatP, blade_handl ks_assert(bm_chatP); ks_assert(bh); - pool = blade_handle_pool_get(bh); + ks_pool_open(&pool); + ks_assert(pool); bm_chat = ks_pool_alloc(pool, sizeof(blade_module_chat_t)); bm_chat->handle = bh; @@ -91,35 +110,36 @@ ks_status_t blade_module_chat_create(blade_module_chat_t **bm_chatP, blade_handl ks_list_create(&bm_chat->participants, pool); ks_assert(bm_chat->participants); - blade_module_create(&bm_chat->module, bh, bm_chat, &g_module_chat_callbacks); + blade_module_create(&bm_chat->module, bh, pool, bm_chat, &g_module_chat_callbacks); bm_chat->module_callbacks = &g_module_chat_callbacks; - *bm_chatP = bm_chat; + ks_assert(ks_pool_set_cleanup(pool, bm_chat, NULL, blade_module_chat_cleanup) == KS_STATUS_SUCCESS); ks_log(KS_LOG_DEBUG, "Created\n"); + *bm_chatP = bm_chat; + return KS_STATUS_SUCCESS; } ks_status_t blade_module_chat_destroy(blade_module_chat_t **bm_chatP) { blade_module_chat_t *bm_chat = NULL; + ks_pool_t *pool = NULL; ks_assert(bm_chatP); ks_assert(*bm_chatP); bm_chat = *bm_chatP; - blade_module_chat_on_shutdown(bm_chat->module); - - ks_list_destroy(&bm_chat->participants); - - blade_module_destroy(&bm_chat->module); - - ks_pool_free(bm_chat->pool, bm_chatP); + pool = bm_chat->pool; + //ks_pool_free(bm_chat->pool, bm_chatP); + ks_pool_close(&pool); ks_log(KS_LOG_DEBUG, "Destroyed\n"); + *bm_chatP = NULL; + return KS_STATUS_SUCCESS; } diff --git a/libs/libblade/src/blade_module_wss.c b/libs/libblade/src/blade_module_wss.c index 881346c0ff..e23dd9aa56 100644 --- a/libs/libblade/src/blade_module_wss.c +++ b/libs/libblade/src/blade_module_wss.c @@ -42,7 +42,6 @@ typedef struct blade_transport_wss_s blade_transport_wss_t; struct blade_module_wss_s { blade_handle_t *handle; ks_pool_t *pool; - ks_thread_pool_t *tpool; blade_module_t *module; blade_module_callbacks_t *module_callbacks; blade_transport_callbacks_t *transport_callbacks; @@ -53,9 +52,8 @@ struct blade_module_wss_s { int32_t config_wss_endpoints_ipv6_length; int32_t config_wss_endpoints_backlog; - ks_bool_t shutdown; + volatile ks_bool_t shutdown; - ks_thread_t *listeners_thread; struct pollfd *listeners_poll; int32_t listeners_count; }; @@ -86,7 +84,6 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data); ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, ks_pool_t *pool, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id); -//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP); ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id); blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target); @@ -138,6 +135,22 @@ static blade_transport_callbacks_t g_transport_wss_callbacks = }; +static void blade_module_wss_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type) +{ + blade_module_wss_t *bm_wss = (blade_module_wss_t *)ptr; + + ks_assert(bm_wss); + + switch (action) { + case KS_MPCL_ANNOUNCE: + break; + case KS_MPCL_TEARDOWN: + blade_module_wss_on_shutdown(bm_wss->module); + break; + case KS_MPCL_DESTROY: + break; + } +} ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t *bh) { @@ -147,17 +160,19 @@ ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t ks_assert(bm_wssP); ks_assert(bh); - pool = blade_handle_pool_get(bh); + ks_pool_open(&pool); + ks_assert(pool); bm_wss = ks_pool_alloc(pool, sizeof(blade_module_wss_t)); bm_wss->handle = bh; bm_wss->pool = pool; - bm_wss->tpool = blade_handle_tpool_get(bh); - blade_module_create(&bm_wss->module, bh, bm_wss, &g_module_wss_callbacks); + blade_module_create(&bm_wss->module, bh, pool, bm_wss, &g_module_wss_callbacks); bm_wss->module_callbacks = &g_module_wss_callbacks; bm_wss->transport_callbacks = &g_transport_wss_callbacks; + ks_assert(ks_pool_set_cleanup(pool, bm_wss, NULL, blade_module_wss_cleanup) == KS_STATUS_SUCCESS); + ks_log(KS_LOG_DEBUG, "Created\n"); *bm_wssP = bm_wss; @@ -168,20 +183,21 @@ ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP) { blade_module_wss_t *bm_wss = NULL; + ks_pool_t *pool = NULL; ks_assert(bm_wssP); ks_assert(*bm_wssP); bm_wss = *bm_wssP; - blade_module_wss_on_shutdown(bm_wss->module); - - blade_module_destroy(&bm_wss->module); - - ks_pool_free(bm_wss->pool, bm_wssP); + pool = bm_wss->pool; + //ks_pool_free(bm_wss->pool, bm_wssP); + ks_pool_close(&pool); ks_log(KS_LOG_DEBUG, "Destroyed\n"); + *bm_wssP = NULL; + return KS_STATUS_SUCCESS; } @@ -353,14 +369,12 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s } } + if (bm_wss->listeners_count > 0 && - ks_thread_create_ex(&bm_wss->listeners_thread, - blade_module_wss_listeners_thread, - bm_wss, - KS_THREAD_FLAG_DEFAULT, - KS_THREAD_DEFAULT_STACK, - KS_PRI_NORMAL, - bm_wss->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + ks_thread_pool_add_job(blade_handle_tpool_get(bm_wss->handle), blade_module_wss_listeners_thread, bm_wss) != KS_STATUS_SUCCESS) { + // @todo error logging + return KS_STATUS_FAIL; + } blade_handle_transport_register(bm_wss->handle, bm, BLADE_MODULE_WSS_TRANSPORT_NAME, bm_wss->transport_callbacks); @@ -372,31 +386,25 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm) { blade_module_wss_t *bm_wss = NULL; - ks_bool_t stopped = KS_FALSE; ks_assert(bm); bm_wss = (blade_module_wss_t *)blade_module_data_get(bm); - blade_handle_transport_unregister(bm_wss->handle, BLADE_MODULE_WSS_TRANSPORT_NAME); - - if (bm_wss->listeners_thread) { + if (bm_wss->listeners_count > 0) { bm_wss->shutdown = KS_TRUE; - ks_thread_join(bm_wss->listeners_thread); - ks_pool_free(bm_wss->pool, &bm_wss->listeners_thread); - bm_wss->shutdown = KS_FALSE; - stopped = KS_TRUE; + while (bm_wss->shutdown) ks_sleep_ms(1); } + blade_handle_transport_unregister(bm_wss->handle, BLADE_MODULE_WSS_TRANSPORT_NAME); + for (int32_t index = 0; index < bm_wss->listeners_count; ++index) { ks_socket_t sock = bm_wss->listeners_poll[index].fd; ks_socket_shutdown(sock, SHUT_RDWR); ks_socket_close(&sock); } - bm_wss->listeners_count = 0; - if (bm_wss->listeners_poll) ks_pool_free(bm_wss->pool, &bm_wss->listeners_poll); - if (stopped) ks_log(KS_LOG_DEBUG, "Stopped\n"); + ks_log(KS_LOG_DEBUG, "Stopped\n"); return KS_STATUS_SUCCESS; } @@ -465,10 +473,6 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) ks_log(KS_LOG_DEBUG, "Started\n"); while (!bm_wss->shutdown) { - //if (bm_wss->listeners_count == 0) { - // ks_sleep_ms(500); - // continue; - //} // @todo take exact timeout from a setting in config_wss_endpoints if (ks_poll(bm_wss->listeners_poll, bm_wss->listeners_count, 100) > 0) { for (int32_t index = 0; index < bm_wss->listeners_count; ++index) { @@ -521,6 +525,8 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) } ks_log(KS_LOG_DEBUG, "Stopped\n"); + bm_wss->shutdown = KS_FALSE; + return NULL; } @@ -567,24 +573,6 @@ ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, ks_pool_ return KS_STATUS_SUCCESS; } -//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP) -//{ -// blade_transport_wss_t *bt_wss = NULL; -// -// ks_assert(bt_wssP); -// ks_assert(*bt_wssP); -// -// bt_wss = *bt_wssP; -// -// ks_pool_free(bt_wss->pool, bt_wssP); -// -// ks_log(KS_LOG_DEBUG, "Destroyed\n"); -// -// *bt_wssP = NULL; -// -// return KS_STATUS_SUCCESS; -//} - ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id) { ks_status_t ret = KS_STATUS_SUCCESS; diff --git a/libs/libblade/src/blade_session.c b/libs/libblade/src/blade_session.c index 04aac39dfa..a2412613ec 100644 --- a/libs/libblade/src/blade_session.c +++ b/libs/libblade/src/blade_session.c @@ -475,7 +475,7 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data) while (!shutdown) { // Entering the call below, the mutex is expected to be locked and will be unlocked by the call - ks_cond_timedwait(bs->cond, 500); + ks_cond_timedwait(bs->cond, 100); // Leaving the call above, the mutex will be locked after being signalled, timing out, or woken up for any reason state = bs->state; @@ -486,6 +486,8 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data) if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) { blade_connection_sending_push(bc, json); blade_connection_read_unlock(bc); + } else { + // @todo review this, possible the connection is dropped after popping a message, which results in it just being deleted without sending } cJSON_Delete(json); } @@ -503,15 +505,12 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data) break; case BLADE_SESSION_STATE_CONNECT: ks_log(KS_LOG_DEBUG, "Session (%s) state connect\n", bs->id); - //ks_sleep_ms(1000); break; case BLADE_SESSION_STATE_ATTACH: ks_log(KS_LOG_DEBUG, "Session (%s) state attach\n", bs->id); - //ks_sleep_ms(1000); break; case BLADE_SESSION_STATE_DETACH: ks_log(KS_LOG_DEBUG, "Session (%s) state detach\n", bs->id); - //ks_sleep_ms(1000); break; case BLADE_SESSION_STATE_READY: blade_session_state_on_ready(bs); diff --git a/libs/libblade/src/include/blade_module.h b/libs/libblade/src/include/blade_module.h index e5fc558f16..ff2b942f48 100644 --- a/libs/libblade/src/include/blade_module.h +++ b/libs/libblade/src/include/blade_module.h @@ -36,8 +36,7 @@ #include KS_BEGIN_EXTERN_C -KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, void *module_data, blade_module_callbacks_t *module_callbacks); -KS_DECLARE(ks_status_t) blade_module_destroy(blade_module_t **bmP); +KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, ks_pool_t *pool, void *module_data, blade_module_callbacks_t *module_callbacks); KS_DECLARE(blade_handle_t *) blade_module_handle_get(blade_module_t *bm); KS_DECLARE(void *) blade_module_data_get(blade_module_t *bm); diff --git a/libs/libblade/test/bladec.c b/libs/libblade/test/bladec.c index 10c000f086..c5f8c01065 100644 --- a/libs/libblade/test/bladec.c +++ b/libs/libblade/test/bladec.c @@ -87,8 +87,6 @@ int main(int argc, char **argv) blade_handle_session_state_callback_unregister(bh, session_state_callback_id); - blade_module_wss_on_shutdown(mod_wss); - blade_module_wss_on_unload(mod_wss); blade_handle_destroy(&bh); diff --git a/libs/libblade/test/blades.c b/libs/libblade/test/blades.c index 638074db05..42af3a8448 100644 --- a/libs/libblade/test/blades.c +++ b/libs/libblade/test/blades.c @@ -80,8 +80,6 @@ int main(int argc, char **argv) //blade_module_chat_on_shutdown(mod_chat); //blade_module_chat_on_unload(mod_chat); - blade_module_wss_on_shutdown(mod_wss); - blade_module_wss_on_unload(mod_wss); blade_handle_destroy(&bh);