blade_module_callbacks_t *module_callbacks;
};
+static void blade_module_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
+{
+ blade_module_t *bm = (blade_module_t *)ptr;
+
+ ks_assert(bm);
+
+ switch (action) {
+ case KS_MPCL_ANNOUNCE:
+ break;
+ case KS_MPCL_TEARDOWN:
+ break;
+ case KS_MPCL_DESTROY:
+ break;
+ }
+}
-KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, void *module_data, blade_module_callbacks_t *module_callbacks)
+KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, ks_pool_t *pool, void *module_data, blade_module_callbacks_t *module_callbacks)
{
blade_module_t *bm = NULL;
- ks_pool_t *pool = NULL;
ks_assert(bmP);
ks_assert(bh);
+ ks_assert(pool);
ks_assert(module_data);
ks_assert(module_callbacks);
- pool = blade_handle_pool_get(bh);
-
bm = ks_pool_alloc(pool, sizeof(blade_module_t));
bm->handle = bh;
bm->pool = pool;
bm->module_data = module_data;
bm->module_callbacks = module_callbacks;
- *bmP = bm;
- return KS_STATUS_SUCCESS;
-}
+ ks_assert(ks_pool_set_cleanup(pool, bm, NULL, blade_module_cleanup) == KS_STATUS_SUCCESS);
-KS_DECLARE(ks_status_t) blade_module_destroy(blade_module_t **bmP)
-{
- blade_module_t *bm = NULL;
+ ks_log(KS_LOG_DEBUG, "Created\n");
- ks_assert(bmP);
- ks_assert(*bmP);
-
- bm = *bmP;
-
- ks_pool_free(bm->pool, bmP);
+ *bmP = bm;
return KS_STATUS_SUCCESS;
}
};
+static void blade_module_chat_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
+{
+ blade_module_chat_t *bm_chat = (blade_module_chat_t *)ptr;
+
+ ks_assert(bm_chat);
+
+ switch (action) {
+ case KS_MPCL_ANNOUNCE:
+ break;
+ case KS_MPCL_TEARDOWN:
+ //ks_list_destroy(&bm_chat->participants);
+ blade_module_chat_on_shutdown(bm_chat->module);
+ break;
+ case KS_MPCL_DESTROY:
+ break;
+ }
+}
+
ks_status_t blade_module_chat_create(blade_module_chat_t **bm_chatP, blade_handle_t *bh)
{
ks_assert(bm_chatP);
ks_assert(bh);
- pool = blade_handle_pool_get(bh);
+ ks_pool_open(&pool);
+ ks_assert(pool);
bm_chat = ks_pool_alloc(pool, sizeof(blade_module_chat_t));
bm_chat->handle = bh;
ks_list_create(&bm_chat->participants, pool);
ks_assert(bm_chat->participants);
- blade_module_create(&bm_chat->module, bh, bm_chat, &g_module_chat_callbacks);
+ blade_module_create(&bm_chat->module, bh, pool, bm_chat, &g_module_chat_callbacks);
bm_chat->module_callbacks = &g_module_chat_callbacks;
- *bm_chatP = bm_chat;
+ ks_assert(ks_pool_set_cleanup(pool, bm_chat, NULL, blade_module_chat_cleanup) == KS_STATUS_SUCCESS);
ks_log(KS_LOG_DEBUG, "Created\n");
+ *bm_chatP = bm_chat;
+
return KS_STATUS_SUCCESS;
}
ks_status_t blade_module_chat_destroy(blade_module_chat_t **bm_chatP)
{
blade_module_chat_t *bm_chat = NULL;
+ ks_pool_t *pool = NULL;
ks_assert(bm_chatP);
ks_assert(*bm_chatP);
bm_chat = *bm_chatP;
- blade_module_chat_on_shutdown(bm_chat->module);
-
- ks_list_destroy(&bm_chat->participants);
-
- blade_module_destroy(&bm_chat->module);
-
- ks_pool_free(bm_chat->pool, bm_chatP);
+ pool = bm_chat->pool;
+ //ks_pool_free(bm_chat->pool, bm_chatP);
+ ks_pool_close(&pool);
ks_log(KS_LOG_DEBUG, "Destroyed\n");
+ *bm_chatP = NULL;
+
return KS_STATUS_SUCCESS;
}
struct blade_module_wss_s {
blade_handle_t *handle;
ks_pool_t *pool;
- ks_thread_pool_t *tpool;
blade_module_t *module;
blade_module_callbacks_t *module_callbacks;
blade_transport_callbacks_t *transport_callbacks;
int32_t config_wss_endpoints_ipv6_length;
int32_t config_wss_endpoints_backlog;
- ks_bool_t shutdown;
+ volatile ks_bool_t shutdown;
- ks_thread_t *listeners_thread;
struct pollfd *listeners_poll;
int32_t listeners_count;
};
ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, ks_pool_t *pool, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id);
-//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP);
ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id);
blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target);
};
+static void blade_module_wss_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
+{
+ blade_module_wss_t *bm_wss = (blade_module_wss_t *)ptr;
+
+ ks_assert(bm_wss);
+
+ switch (action) {
+ case KS_MPCL_ANNOUNCE:
+ break;
+ case KS_MPCL_TEARDOWN:
+ blade_module_wss_on_shutdown(bm_wss->module);
+ break;
+ case KS_MPCL_DESTROY:
+ break;
+ }
+}
ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t *bh)
{
ks_assert(bm_wssP);
ks_assert(bh);
- pool = blade_handle_pool_get(bh);
+ ks_pool_open(&pool);
+ ks_assert(pool);
bm_wss = ks_pool_alloc(pool, sizeof(blade_module_wss_t));
bm_wss->handle = bh;
bm_wss->pool = pool;
- bm_wss->tpool = blade_handle_tpool_get(bh);
- blade_module_create(&bm_wss->module, bh, bm_wss, &g_module_wss_callbacks);
+ blade_module_create(&bm_wss->module, bh, pool, bm_wss, &g_module_wss_callbacks);
bm_wss->module_callbacks = &g_module_wss_callbacks;
bm_wss->transport_callbacks = &g_transport_wss_callbacks;
+ ks_assert(ks_pool_set_cleanup(pool, bm_wss, NULL, blade_module_wss_cleanup) == KS_STATUS_SUCCESS);
+
ks_log(KS_LOG_DEBUG, "Created\n");
*bm_wssP = bm_wss;
ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP)
{
blade_module_wss_t *bm_wss = NULL;
+ ks_pool_t *pool = NULL;
ks_assert(bm_wssP);
ks_assert(*bm_wssP);
bm_wss = *bm_wssP;
- blade_module_wss_on_shutdown(bm_wss->module);
-
- blade_module_destroy(&bm_wss->module);
-
- ks_pool_free(bm_wss->pool, bm_wssP);
+ pool = bm_wss->pool;
+ //ks_pool_free(bm_wss->pool, bm_wssP);
+ ks_pool_close(&pool);
ks_log(KS_LOG_DEBUG, "Destroyed\n");
+ *bm_wssP = NULL;
+
return KS_STATUS_SUCCESS;
}
}
}
+
if (bm_wss->listeners_count > 0 &&
- ks_thread_create_ex(&bm_wss->listeners_thread,
- blade_module_wss_listeners_thread,
- bm_wss,
- KS_THREAD_FLAG_DEFAULT,
- KS_THREAD_DEFAULT_STACK,
- KS_PRI_NORMAL,
- bm_wss->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ ks_thread_pool_add_job(blade_handle_tpool_get(bm_wss->handle), blade_module_wss_listeners_thread, bm_wss) != KS_STATUS_SUCCESS) {
+ // @todo error logging
+ return KS_STATUS_FAIL;
+ }
blade_handle_transport_register(bm_wss->handle, bm, BLADE_MODULE_WSS_TRANSPORT_NAME, bm_wss->transport_callbacks);
KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm)
{
blade_module_wss_t *bm_wss = NULL;
- ks_bool_t stopped = KS_FALSE;
ks_assert(bm);
bm_wss = (blade_module_wss_t *)blade_module_data_get(bm);
- blade_handle_transport_unregister(bm_wss->handle, BLADE_MODULE_WSS_TRANSPORT_NAME);
-
- if (bm_wss->listeners_thread) {
+ if (bm_wss->listeners_count > 0) {
bm_wss->shutdown = KS_TRUE;
- ks_thread_join(bm_wss->listeners_thread);
- ks_pool_free(bm_wss->pool, &bm_wss->listeners_thread);
- bm_wss->shutdown = KS_FALSE;
- stopped = KS_TRUE;
+ while (bm_wss->shutdown) ks_sleep_ms(1);
}
+ blade_handle_transport_unregister(bm_wss->handle, BLADE_MODULE_WSS_TRANSPORT_NAME);
+
for (int32_t index = 0; index < bm_wss->listeners_count; ++index) {
ks_socket_t sock = bm_wss->listeners_poll[index].fd;
ks_socket_shutdown(sock, SHUT_RDWR);
ks_socket_close(&sock);
}
- bm_wss->listeners_count = 0;
- if (bm_wss->listeners_poll) ks_pool_free(bm_wss->pool, &bm_wss->listeners_poll);
- if (stopped) ks_log(KS_LOG_DEBUG, "Stopped\n");
+ ks_log(KS_LOG_DEBUG, "Stopped\n");
return KS_STATUS_SUCCESS;
}
ks_log(KS_LOG_DEBUG, "Started\n");
while (!bm_wss->shutdown) {
- //if (bm_wss->listeners_count == 0) {
- // ks_sleep_ms(500);
- // continue;
- //}
// @todo take exact timeout from a setting in config_wss_endpoints
if (ks_poll(bm_wss->listeners_poll, bm_wss->listeners_count, 100) > 0) {
for (int32_t index = 0; index < bm_wss->listeners_count; ++index) {
}
ks_log(KS_LOG_DEBUG, "Stopped\n");
+ bm_wss->shutdown = KS_FALSE;
+
return NULL;
}
return KS_STATUS_SUCCESS;
}
-//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP)
-//{
-// blade_transport_wss_t *bt_wss = NULL;
-//
-// ks_assert(bt_wssP);
-// ks_assert(*bt_wssP);
-//
-// bt_wss = *bt_wssP;
-//
-// ks_pool_free(bt_wss->pool, bt_wssP);
-//
-// ks_log(KS_LOG_DEBUG, "Destroyed\n");
-//
-// *bt_wssP = NULL;
-//
-// return KS_STATUS_SUCCESS;
-//}
-
ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id)
{
ks_status_t ret = KS_STATUS_SUCCESS;
while (!shutdown) {
// Entering the call below, the mutex is expected to be locked and will be unlocked by the call
- ks_cond_timedwait(bs->cond, 500);
+ ks_cond_timedwait(bs->cond, 100);
// Leaving the call above, the mutex will be locked after being signalled, timing out, or woken up for any reason
state = bs->state;
if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) {
blade_connection_sending_push(bc, json);
blade_connection_read_unlock(bc);
+ } else {
+ // @todo review this, possible the connection is dropped after popping a message, which results in it just being deleted without sending
}
cJSON_Delete(json);
}
break;
case BLADE_SESSION_STATE_CONNECT:
ks_log(KS_LOG_DEBUG, "Session (%s) state connect\n", bs->id);
- //ks_sleep_ms(1000);
break;
case BLADE_SESSION_STATE_ATTACH:
ks_log(KS_LOG_DEBUG, "Session (%s) state attach\n", bs->id);
- //ks_sleep_ms(1000);
break;
case BLADE_SESSION_STATE_DETACH:
ks_log(KS_LOG_DEBUG, "Session (%s) state detach\n", bs->id);
- //ks_sleep_ms(1000);
break;
case BLADE_SESSION_STATE_READY:
blade_session_state_on_ready(bs);