blade_handle_t *handle;
ks_pool_t *pool;
- void *transport_init_data;
void *transport_data;
blade_transport_callbacks_t *transport_callbacks;
- ks_bool_t shutdown;
blade_connection_direction_t direction;
ks_thread_t *state_thread;
blade_connection_state_t state;
ks_status_t blade_connection_state_on_ready(blade_connection_t *bc);
-KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
- blade_handle_t *bh,
- void *transport_init_data,
- blade_transport_callbacks_t *transport_callbacks)
+static void blade_connection_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
+{
+ blade_connection_t *bc = (blade_connection_t *)ptr;
+
+ ks_assert(bc);
+
+ switch (action) {
+ case KS_MPCL_ANNOUNCE:
+ break;
+ case KS_MPCL_TEARDOWN:
+ blade_connection_shutdown(bc);
+ break;
+ case KS_MPCL_DESTROY:
+ // @todo remove this, it's just for posterity in debugging
+ bc->sending = NULL;
+ bc->lock = NULL;
+
+ //ks_pool_free(bc->pool, &bc->id);
+ bc->id = NULL;
+ break;
+ }
+}
+
+KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, blade_handle_t *bh)
{
blade_connection_t *bc = NULL;
ks_pool_t *pool = NULL;
ks_assert(bcP);
ks_assert(bh);
- ks_assert(transport_callbacks);
- pool = blade_handle_pool_get(bh);
+ ks_pool_open(&pool);
+ ks_assert(pool);
bc = ks_pool_alloc(pool, sizeof(blade_connection_t));
bc->handle = bh;
bc->pool = pool;
- bc->transport_init_data = transport_init_data;
- bc->transport_callbacks = transport_callbacks;
ks_uuid(&id);
bc->id = ks_uuid_str(pool, &id);
ks_q_create(&bc->sending, pool, 0);
ks_assert(bc->sending);
- *bcP = bc;
+ ks_assert(ks_pool_set_cleanup(pool, bc, NULL, blade_connection_cleanup) == KS_STATUS_SUCCESS);
ks_log(KS_LOG_DEBUG, "Created\n");
+ *bcP = bc;
+
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP)
{
blade_connection_t *bc = NULL;
+ ks_pool_t *pool = NULL;
ks_assert(bcP);
ks_assert(*bcP);
bc = *bcP;
- blade_connection_shutdown(bc);
-
- ks_q_destroy(&bc->sending);
-
- ks_rwl_destroy(&bc->lock);
-
- ks_pool_free(bc->pool, &bc->id);
-
- ks_pool_free(bc->pool, bcP);
+ pool = bc->pool;
+ //ks_pool_free(bc->pool, bcP);
+ ks_pool_close(&pool);
ks_log(KS_LOG_DEBUG, "Destroyed\n");
+ *bcP = NULL;
+
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction)
{
+ blade_handle_t *bh = NULL;
+
ks_assert(bc);
+ bh = blade_connection_handle_get(bc);
+
bc->direction = direction;
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NONE);
- if (ks_thread_create_ex(&bc->state_thread,
- blade_connection_state_thread,
- bc,
- KS_THREAD_FLAG_DEFAULT,
- KS_THREAD_DEFAULT_STACK,
- KS_PRI_NORMAL,
- bc->pool) != KS_STATUS_SUCCESS) {
+ if (ks_thread_pool_add_job(blade_handle_tpool_get(bh), blade_connection_state_thread, bc) != KS_STATUS_SUCCESS) {
// @todo error logging
return KS_STATUS_FAIL;
}
ks_assert(bc);
- if (bc->state_thread) {
- bc->shutdown = KS_TRUE;
- ks_thread_join(bc->state_thread);
- ks_pool_free(bc->pool, &bc->state_thread);
- bc->shutdown = KS_FALSE;
- }
-
- if (bc->session) ks_pool_free(bc->pool, &bc->session);
+ blade_handle_connections_remove(bc);
while (ks_q_trypop(bc->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
}
-KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc)
-{
- ks_assert(bc);
-
- return bc->transport_init_data;
-}
-
KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc)
{
ks_assert(bc);
return bc->transport_data;
}
-KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data)
+KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data, blade_transport_callbacks_t *transport_callbacks)
{
ks_assert(bc);
+ ks_assert(transport_data);
+ ks_assert(transport_callbacks);
bc->transport_data = transport_data;
+ bc->transport_callbacks = transport_callbacks;
}
blade_transport_state_callback_t blade_connection_state_callback_lookup(blade_connection_t *bc, blade_connection_state_t state)
{
blade_connection_t *bc = NULL;
blade_connection_state_t state;
+ ks_bool_t shutdown = KS_FALSE;
ks_assert(thread);
ks_assert(data);
bc = (blade_connection_t *)data;
- while (!bc->shutdown) {
+ while (!shutdown) {
state = bc->state;
switch (state) {
case BLADE_CONNECTION_STATE_DISCONNECT:
blade_connection_state_on_disconnect(bc);
+ shutdown = KS_TRUE;
break;
case BLADE_CONNECTION_STATE_NEW:
blade_connection_state_on_new(bc);
break;
default: break;
}
-
- if (state == BLADE_CONNECTION_STATE_DISCONNECT) break;
}
+ blade_connection_destroy(&bc);
+
return NULL;
}
if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST);
if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc);
- else ks_sleep_ms(1);
return KS_STATUS_SUCCESS;
}
props_participant = cJSON_GetObjectItem(props, "blade.chat.participant");
if (props_participant && props_participant->type == cJSON_True) {
ks_log(KS_LOG_DEBUG, "Session (%s) attempted to join chat but is already a participant\n", blade_session_id_get(bs));
- blade_rpc_error_create(breq->pool, &res, NULL, breq->message_id, -10000, "Already a participant of chat");
+ blade_rpc_error_create(&res, NULL, breq->message_id, -10000, "Already a participant of chat");
} else {
ks_log(KS_LOG_DEBUG, "Session (%s) joined chat\n", blade_session_id_get(bs));
ks_list_append(bm_chat->participants, blade_session_id_get(bs)); // @todo make copy of session id instead and cleanup when removed
- blade_rpc_response_create(breq->pool, &res, NULL, breq->message_id);
+ blade_rpc_response_create(&res, NULL, breq->message_id);
// @todo create an event to send to participants when a session joins and leaves, send after main response though
}
props_participant = cJSON_GetObjectItem(props, "blade.chat.participant");
if (!props_participant || props_participant->type == cJSON_False) {
ks_log(KS_LOG_DEBUG, "Session (%s) attempted to leave chat but is not a participant\n", blade_session_id_get(bs));
- blade_rpc_error_create(breq->pool, &res, NULL, breq->message_id, -10000, "Not a participant of chat");
+ blade_rpc_error_create(&res, NULL, breq->message_id, -10000, "Not a participant of chat");
} else {
ks_log(KS_LOG_DEBUG, "Session (%s) left chat\n", blade_session_id_get(bs));
ks_list_delete(bm_chat->participants, blade_session_id_get(bs)); // @todo make copy of session id instead and search manually, also free the id
- blade_rpc_response_create(breq->pool, &res, NULL, breq->message_id);
+ blade_rpc_response_create(&res, NULL, breq->message_id);
// @todo create an event to send to participants when a session joins and leaves, send after main response though
}
params = cJSON_GetObjectItem(breq->message, "params"); // @todo cache this in blade_request_t for quicker/easier access
if (!params) {
ks_log(KS_LOG_DEBUG, "Session (%s) attempted to send chat message with no 'params' object\n", blade_session_id_get(bs));
- blade_rpc_error_create(breq->pool, &res, NULL, breq->message_id, -32602, "Missing params object");
+ blade_rpc_error_create(&res, NULL, breq->message_id, -32602, "Missing params object");
} else if (!(message = cJSON_GetObjectCstr(params, "message"))) {
ks_log(KS_LOG_DEBUG, "Session (%s) attempted to send chat message with no 'message'\n", blade_session_id_get(bs));
- blade_rpc_error_create(breq->pool, &res, NULL, breq->message_id, -32602, "Missing params message string");
+ blade_rpc_error_create(&res, NULL, breq->message_id, -32602, "Missing params message string");
}
bs = blade_handle_sessions_get(breq->handle, breq->session_id);
ks_assert(bs);
if (!res) {
- blade_rpc_response_create(breq->pool, &res, NULL, breq->message_id);
+ blade_rpc_response_create(&res, NULL, breq->message_id);
sendevent = KS_TRUE;
}
blade_session_send(bs, res, NULL);
cJSON_Delete(res);
if (sendevent) {
- blade_rpc_event_create(breq->pool, &event, &res, "blade.chat.message");
+ blade_rpc_event_create(&event, &res, "blade.chat.message");
ks_assert(event);
cJSON_AddStringToObject(res, "from", breq->session_id); // @todo should really be the identity, but we don't have that in place yet
cJSON_AddStringToObject(res, "message", message);
typedef struct blade_module_wss_s blade_module_wss_t;
typedef struct blade_transport_wss_s blade_transport_wss_t;
-typedef struct blade_transport_wss_init_s blade_transport_wss_init_t;
struct blade_module_wss_s {
blade_handle_t *handle;
ks_thread_t *listeners_thread;
struct pollfd *listeners_poll;
int32_t listeners_count;
-
- ks_list_t *connected; // @todo consider keeping this only as the list of connection id's, since the handle retains the pointer lookup
};
struct blade_transport_wss_s {
blade_module_wss_t *module;
ks_pool_t *pool;
+ const char *session_id;
ks_socket_t sock;
kws_t *kws;
};
-struct blade_transport_wss_init_s {
- blade_module_wss_t *module;
- ks_pool_t *pool;
-
- ks_socket_t sock;
- const char *session_id;
-};
-
ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t *bh);
-ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_module_wss_t *bm_wss, ks_socket_t sock);
-ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP);
+ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, ks_pool_t *pool, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id);
+//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP);
ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id);
blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target);
-ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id);
-ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP);
-
-
-ks_bool_t blade_test_echo_request_handler(blade_module_t *bm, blade_request_t *breq);
-ks_bool_t blade_test_echo_response_handler(blade_response_t *bres);
-
-
static blade_module_callbacks_t g_module_wss_callbacks =
{
blade_module_wss_on_load,
bm_wss->module_callbacks = &g_module_wss_callbacks;
bm_wss->transport_callbacks = &g_transport_wss_callbacks;
- ks_list_create(&bm_wss->connected, pool);
- ks_assert(bm_wss->connected);
+ ks_log(KS_LOG_DEBUG, "Created\n");
*bm_wssP = bm_wss;
- ks_log(KS_LOG_DEBUG, "Created\n");
-
return KS_STATUS_SUCCESS;
}
blade_module_destroy(&bm_wss->module);
- ks_list_destroy(&bm_wss->connected);
-
ks_pool_free(bm_wss->pool, bm_wssP);
ks_log(KS_LOG_DEBUG, "Destroyed\n");
return KS_STATUS_SUCCESS;
}
-ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id)
-{
- blade_transport_wss_init_t *bt_wssi = NULL;
-
- ks_assert(bt_wssiP);
- ks_assert(bm_wss);
- ks_assert(sock != KS_SOCK_INVALID);
-
- bt_wssi = ks_pool_alloc(bm_wss->pool, sizeof(blade_transport_wss_init_t));
- bt_wssi->module = bm_wss;
- bt_wssi->pool = bm_wss->pool;
- bt_wssi->sock = sock;
- if (session_id) bt_wssi->session_id = ks_pstrdup(bt_wssi->pool, session_id);
-
- *bt_wssiP = bt_wssi;
-
- ks_log(KS_LOG_DEBUG, "Created\n");
-
- return KS_STATUS_SUCCESS;
-}
-
-ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP)
-{
- blade_transport_wss_init_t *bt_wssi = NULL;
-
- ks_assert(bt_wssiP);
- ks_assert(*bt_wssiP);
-
- bt_wssi = *bt_wssiP;
-
- if (bt_wssi->session_id) ks_pool_free(bt_wssi->pool, &bt_wssi->session_id);
-
- ks_pool_free(bt_wssi->pool, bt_wssiP);
-
- ks_log(KS_LOG_DEBUG, "Destroyed\n");
-
- return KS_STATUS_SUCCESS;
-}
-
ks_status_t blade_module_wss_config(blade_module_wss_t *bm_wss, config_setting_t *config)
{
config_setting_t *wss = NULL;
KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_setting_t *config)
{
blade_module_wss_t *bm_wss = NULL;
- blade_space_t *space = NULL;
- blade_method_t *method = NULL;
ks_assert(bm);
ks_assert(config);
}
}
- if (ks_thread_create_ex(&bm_wss->listeners_thread,
+ if (bm_wss->listeners_count > 0 &&
+ ks_thread_create_ex(&bm_wss->listeners_thread,
blade_module_wss_listeners_thread,
bm_wss,
KS_THREAD_FLAG_DEFAULT,
blade_handle_transport_register(bm_wss->handle, bm, BLADE_MODULE_WSS_TRANSPORT_NAME, bm_wss->transport_callbacks);
-
- blade_space_create(&space, bm_wss->handle, bm, "blade.test");
- ks_assert(space);
-
- blade_method_create(&method, space, "echo", blade_test_echo_request_handler);
- ks_assert(method);
-
- blade_space_methods_add(space, method);
-
- blade_handle_space_register(space);
-
ks_log(KS_LOG_DEBUG, "Started\n");
return KS_STATUS_SUCCESS;
KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm)
{
blade_module_wss_t *bm_wss = NULL;
- blade_connection_t *bc = NULL;
ks_bool_t stopped = KS_FALSE;
ks_assert(bm);
bm_wss->listeners_count = 0;
if (bm_wss->listeners_poll) ks_pool_free(bm_wss->pool, &bm_wss->listeners_poll);
- if (ks_list_size(bm_wss->connected) > 0) {
- // this approach to shutdown is cleaner, ensures connections will detach from sessions and be destroyed all in the same places
- ks_list_iterator_start(bm_wss->connected);
- while (ks_list_iterator_hasnext(bm_wss->connected)) {
- bc = (blade_connection_t *)ks_list_iterator_next(bm_wss->connected);
- blade_connection_disconnect(bc);
- }
- ks_list_iterator_stop(bm_wss->connected);
- while (ks_list_size(bm_wss->connected) > 0) ks_sleep_ms(100);
- }
-
if (stopped) ks_log(KS_LOG_DEBUG, "Stopped\n");
return KS_STATUS_SUCCESS;
void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
{
blade_module_wss_t *bm_wss = NULL;
- blade_transport_wss_init_t *bt_wss_init = NULL;
+ blade_transport_wss_t *bt_wss = NULL;
blade_connection_t *bc = NULL;
ks_assert(thread);
ks_log(KS_LOG_DEBUG, "Started\n");
while (!bm_wss->shutdown) {
+ //if (bm_wss->listeners_count == 0) {
+ // ks_sleep_ms(500);
+ // continue;
+ //}
// @todo take exact timeout from a setting in config_wss_endpoints
if (ks_poll(bm_wss->listeners_poll, bm_wss->listeners_count, 100) > 0) {
for (int32_t index = 0; index < bm_wss->listeners_count; ++index) {
ks_log(KS_LOG_DEBUG, "Socket accepted\n", index);
- blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock, NULL);
- ks_assert(bt_wss_init);
-
- blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks);
+ // @todo make new function to wrap the following code all the way through assigning initial state to reuse in outbound connects
+ blade_connection_create(&bc, bm_wss->handle);
ks_assert(bc);
+ blade_transport_wss_create(&bt_wss, blade_connection_pool_get(bc), bm_wss, sock, NULL);
+ ks_assert(bt_wss);
+
+ blade_connection_transport_set(bc, bt_wss, bm_wss->transport_callbacks);
+
blade_connection_read_lock(bc, KS_TRUE);
if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_INBOUND) != KS_STATUS_SUCCESS) {
ks_log(KS_LOG_DEBUG, "Connection (%s) startup failed\n", blade_connection_id_get(bc));
+ blade_connection_read_unlock(bc);
blade_connection_destroy(&bc);
- blade_transport_wss_init_destroy(&bt_wss_init);
- ks_socket_close(&sock);
continue;
}
ks_log(KS_LOG_DEBUG, "Connection (%s) started\n", blade_connection_id_get(bc));
blade_handle_connections_add(bc);
- ks_list_append(bm_wss->connected, bc);
+
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW);
blade_connection_read_unlock(bc);
+ // @todo end of reusable function, lock ensures it cannot be destroyed until this code finishes
}
}
}
return NULL;
}
+static void blade_transport_wss_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
+{
+ blade_transport_wss_t *bt_wss = (blade_transport_wss_t *)ptr;
+ ks_assert(bt_wss);
-ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_module_wss_t *bm_wss, ks_socket_t sock)
+ switch (action) {
+ case KS_MPCL_ANNOUNCE:
+ break;
+ case KS_MPCL_TEARDOWN:
+ if (bt_wss->session_id) ks_pool_free(bt_wss->pool, &bt_wss->session_id);
+ if (bt_wss->kws) kws_destroy(&bt_wss->kws);
+ else ks_socket_close(&bt_wss->sock);
+ break;
+ case KS_MPCL_DESTROY:
+ break;
+ }
+}
+
+
+ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, ks_pool_t *pool, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id)
{
blade_transport_wss_t *bt_wss = NULL;
ks_assert(bm_wss);
ks_assert(sock != KS_SOCK_INVALID);
- bt_wss = ks_pool_alloc(bm_wss->pool, sizeof(blade_transport_wss_t));
+ bt_wss = ks_pool_alloc(pool, sizeof(blade_transport_wss_t));
bt_wss->module = bm_wss;
- bt_wss->pool = bm_wss->pool;
+ bt_wss->pool = pool;
bt_wss->sock = sock;
+ if (session_id) bt_wss->session_id = ks_pstrdup(pool, session_id);
- *bt_wssP = bt_wss;
+ ks_assert(ks_pool_set_cleanup(pool, bt_wss, NULL, blade_transport_wss_cleanup) == KS_STATUS_SUCCESS);
ks_log(KS_LOG_DEBUG, "Created\n");
- return KS_STATUS_SUCCESS;
-}
-
-ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP)
-{
- blade_transport_wss_t *bt_wss = NULL;
-
- ks_assert(bt_wssP);
- ks_assert(*bt_wssP);
-
- bt_wss = *bt_wssP;
-
- if (bt_wss->kws) kws_destroy(&bt_wss->kws);
- else ks_socket_close(&bt_wss->sock);
-
- ks_pool_free(bt_wss->pool, bt_wssP);
-
- ks_log(KS_LOG_DEBUG, "Destroyed\n");
+ *bt_wssP = bt_wss;
return KS_STATUS_SUCCESS;
}
+//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP)
+//{
+// blade_transport_wss_t *bt_wss = NULL;
+//
+// ks_assert(bt_wssP);
+// ks_assert(*bt_wssP);
+//
+// bt_wss = *bt_wssP;
+//
+// ks_pool_free(bt_wss->pool, bt_wssP);
+//
+// ks_log(KS_LOG_DEBUG, "Destroyed\n");
+//
+// *bt_wssP = NULL;
+//
+// return KS_STATUS_SUCCESS;
+//}
+
ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id)
{
ks_status_t ret = KS_STATUS_SUCCESS;
const char *ip = NULL;
const char *portstr = NULL;
ks_port_t port = 1234;
- blade_transport_wss_init_t *bt_wss_init = NULL;
+ blade_transport_wss_t *bt_wss = NULL;
blade_connection_t *bc = NULL;
ks_assert(bcP);
ks_log(KS_LOG_DEBUG, "Socket connected\n");
- blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock, session_id);
- ks_assert(bt_wss_init);
-
- blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks);
+ // @todo see above listener code, make reusable function for the following code
+ blade_connection_create(&bc, bm_wss->handle);
ks_assert(bc);
+ blade_transport_wss_create(&bt_wss, blade_connection_pool_get(bc), bm_wss, sock, session_id);
+ ks_assert(bt_wss);
+
+ blade_connection_transport_set(bc, bt_wss, bm_wss->transport_callbacks);
+
+ blade_connection_read_lock(bc, KS_TRUE);
+
if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_OUTBOUND) != KS_STATUS_SUCCESS) {
ks_log(KS_LOG_DEBUG, "Connection (%s) startup failed\n", blade_connection_id_get(bc));
+ blade_connection_read_unlock(bc);
blade_connection_destroy(&bc);
- blade_transport_wss_init_destroy(&bt_wss_init);
- ks_socket_close(&sock);
ret = KS_STATUS_FAIL;
goto done;
}
ks_log(KS_LOG_DEBUG, "Connection (%s) started\n", blade_connection_id_get(bc));
- // @todo make sure it's sensible to be mixing outbound and inbound connections in the same list, but this allows entering the destruction pipeline
- // for module shutdown, disconnects and errors without special considerations
+
blade_handle_connections_add(bc);
- ks_list_append(bm_wss->connected, bc);
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW);
+ blade_connection_read_unlock(bc);
+
+ // @todo consider ramification of unlocking above, while returning the new connection object back to the framework, thread might run and disconnect quickly
+ // @todo have blade_handle_connect and blade_transport_wss_on_connect (and the abstracted callback) return a copy of the connection id (allocated from blade_handle_t's pool temporarily) rather than the connection pointer itself
+ // which will then require getting the connection and thus relock it for any further use, if it disconnects during that time the connection will be locked preventing obtaining and then return NULL if removed
*bcP = bc;
done:
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
- blade_rpc_error_create(blade_connection_pool_get(bc), &json, NULL, id, code, message);
+ blade_rpc_error_create(&json, NULL, id, code, message);
if (blade_transport_wss_write(bt_wss, json) != KS_STATUS_SUCCESS) {
ks_log(KS_LOG_DEBUG, "Failed to write error message\n");
blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
- blade_transport_wss_t *bt_wss = NULL;
- blade_transport_wss_init_t *bt_wss_init = NULL;
+ //blade_transport_wss_t *bt_wss = NULL;
ks_assert(bc);
if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
- bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
- bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
-
- ks_list_delete(bt_wss->module->connected, bc);
+ //bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
- if (bt_wss_init) blade_transport_wss_init_destroy(&bt_wss_init);
- if (bt_wss) blade_transport_wss_destroy(&bt_wss); // @TODO: Scream at this very loudly until I feel better for it wasting 2 days to track down, and then fix the issue it's causing
+ //blade_transport_wss_destroy(&bt_wss);
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
blade_connection_state_hook_t blade_transport_wss_on_state_new_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
- blade_transport_wss_t *bt_wss = NULL;
- blade_transport_wss_init_t *bt_wss_init = NULL;
-
ks_assert(bc);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
- bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
-
- blade_transport_wss_create(&bt_wss, bt_wss_init->module, bt_wss_init->sock);
- ks_assert(bt_wss);
-
- blade_connection_transport_set(bc, bt_wss);
-
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
blade_connection_state_hook_t blade_transport_wss_on_state_new_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
- blade_transport_wss_t *bt_wss = NULL;
- blade_transport_wss_init_t *bt_wss_init = NULL;
-
ks_assert(bc);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
- bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
-
- blade_transport_wss_create(&bt_wss, bt_wss_init->module, bt_wss_init->sock);
- ks_assert(bt_wss);
-
- blade_connection_transport_set(bc, bt_wss);
-
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
{
blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
blade_transport_wss_t *bt_wss = NULL;
- ks_pool_t *pool = NULL;
cJSON *json_req = NULL;
cJSON *json_res = NULL;
cJSON *json_params = NULL;
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
- pool = blade_connection_pool_get(bc);
-
// @todo very temporary, really need monotonic clock and get timeout delay and sleep delay from config
timeout = ks_time_now() + (5 * KS_USEC_PER_SEC);
while (blade_transport_wss_read(bt_wss, &json_req) == KS_STATUS_SUCCESS) {
blade_handle_sessions_add(bs);
}
- // @todo wrapper to generate request and response
- blade_rpc_response_create(pool, &json_res, &json_result, id);
+ blade_rpc_response_create(&json_res, &json_result, id);
ks_assert(json_res);
cJSON_AddStringToObject(json_result, "session-id", blade_session_id_get(bs));
- // @todo send response
if (blade_transport_wss_write(bt_wss, json_res) != KS_STATUS_SUCCESS) {
ks_log(KS_LOG_DEBUG, "Failed to write response message\n");
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
blade_handle_t *bh = NULL;
blade_transport_wss_t *bt_wss = NULL;
- blade_transport_wss_init_t *bt_wss_init = NULL;
ks_pool_t *pool = NULL;
cJSON *json_req = NULL;
cJSON *json_params = NULL;
bh = blade_connection_handle_get(bc);
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
- bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
- pool = blade_connection_pool_get(bc);
+ pool = blade_handle_pool_get(bh);
blade_rpc_request_create(pool, &json_req, &json_params, &mid, "blade.session.attach");
ks_assert(json_req);
- if (bt_wss_init->session_id) cJSON_AddStringToObject(json_params, "session-id", bt_wss_init->session_id);
+ if (bt_wss->session_id) cJSON_AddStringToObject(json_params, "session-id", bt_wss->session_id);
- ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", (bt_wss_init->session_id ? bt_wss_init->session_id : "none"));
+ ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", (bt_wss->session_id ? bt_wss->session_id : "none"));
if (blade_transport_wss_write(bt_wss, json_req) != KS_STATUS_SUCCESS) {
ks_log(KS_LOG_DEBUG, "Failed to write request message\n");
ks_assert(bc);
if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) {
+ blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
- //cJSON *req = NULL;
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
- bs = blade_handle_sessions_get(blade_connection_handle_get(bc), blade_connection_session_get(bc));
- ks_assert(bs);
+ bh = blade_connection_handle_get(bc);
+ ks_assert(bh);
- //blade_rpc_request_create(blade_connection_pool_get(bc), &req, NULL, NULL, "blade.test.echo");
- //blade_session_send(bs, req, blade_test_echo_response_handler);
+ bs = blade_handle_sessions_get(bh, blade_connection_session_get(bc));
+ ks_assert(bs);
blade_session_read_unlock(bs);
}
}
-
-ks_bool_t blade_test_echo_request_handler(blade_module_t *bm, blade_request_t *breq)
-{
- blade_session_t *bs = NULL;
- cJSON *res = NULL;
-
- ks_assert(bm);
- ks_assert(breq);
-
- ks_log(KS_LOG_DEBUG, "Request Received!\n");
-
- bs = blade_handle_sessions_get(breq->handle, breq->session_id);
- ks_assert(bs);
-
- blade_rpc_response_create(breq->pool, &res, NULL, breq->message_id);
- blade_session_send(bs, res, NULL);
-
- blade_session_read_unlock(bs);
-
- return KS_FALSE;
-}
-
-ks_bool_t blade_test_echo_response_handler(blade_response_t *bres)
-{
- ks_assert(bres);
-
- ks_log(KS_LOG_DEBUG, "Response Received!\n");
-
- return KS_FALSE;
-}
-
/* For Emacs:
* Local Variables:
* mode:c
config_setting_t *config_directory;
config_setting_t *config_datastore;
- ks_thread_t *worker_thread;
- ks_bool_t shutdown;
-
ks_hash_t *transports; // registered transports exposed by modules, NOT active connections
ks_hash_t *spaces; // registered method spaces exposed by modules
// registered event callback registry
ks_hash_t *requests; // outgoing requests waiting for a response keyed by the message id
};
-void *blade_handle_worker_thread(ks_thread_t *thread, void *data);
-
typedef struct blade_handle_transport_registration_s blade_handle_transport_registration_t;
struct blade_handle_transport_registration_s {
ks_pool_t *pool;
// @todo load internal modules, call onload and onstartup
- if (ks_thread_create_ex(&bh->worker_thread,
- blade_handle_worker_thread,
- bh,
- KS_THREAD_FLAG_DEFAULT,
- KS_THREAD_DEFAULT_STACK,
- KS_PRI_NORMAL,
- bh->pool) != KS_STATUS_SUCCESS) {
- // @todo error logging
- return KS_STATUS_FAIL;
- }
-
return KS_STATUS_SUCCESS;
}
ks_assert(bh);
- while ((it = ks_hash_first(bh->requests, KS_UNLOCKED))) {
- void *key = NULL;
- blade_request_t *value = NULL;
-
- ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
- ks_hash_remove(bh->requests, key);
+ // @todo call onshutdown for internal modules
- blade_request_destroy(&value);
- // @todo note to self, fix this when switching to auto cleanup, as hash invalidates iterator when removing
- }
+ // @todo repeat the same as below for connections, this will catch all including those that have not yet been attached to a session for edge case cleanup
+ ks_hash_read_lock(bh->sessions);
for (it = ks_hash_first(bh->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
blade_session_t *value = NULL;
blade_session_hangup(value);
}
+ ks_hash_read_unlock(bh->sessions);
while (ks_hash_count(bh->sessions) > 0) ks_sleep_ms(100);
- // @todo unload internal modules, call onshutdown and onunload
+
+ // @todo call onunload for internal modules
+
+ while ((it = ks_hash_first(bh->requests, KS_UNLOCKED))) {
+ void *key = NULL;
+ blade_request_t *value = NULL;
+
+ ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
+ ks_hash_remove(bh->requests, key);
+
+ blade_request_destroy(&value);
+ }
while ((it = ks_hash_first(bh->events, KS_UNLOCKED))) {
void *key = NULL;
ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
blade_handle_event_unregister(bh, (const char *)key);
- // @todo note to self, fix this when switching to auto cleanup, as hash invalidates iterator when removing
}
while ((it = ks_hash_first(bh->spaces, KS_UNLOCKED))) {
ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
blade_handle_space_unregister(value);
- // @todo note to self, fix this when switching to auto cleanup, as hash invalidates iterator when removing
}
// @todo unload DSOs
if (blade_handle_datastore_available(bh)) blade_datastore_destroy(&bh->datastore);
- if (bh->worker_thread) {
- bh->shutdown = KS_TRUE;
- ks_thread_join(bh->worker_thread);
- ks_pool_free(bh->pool, &bh->worker_thread);
- bh->shutdown = KS_FALSE;
- }
-
return KS_STATUS_SUCCESS;
}
return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata);
}
-void *blade_handle_worker_thread(ks_thread_t *thread, void *data)
-{
- blade_handle_t *bh = NULL;
- blade_connection_t *bc = NULL;
- //blade_session_t *bs = NULL;
- ks_hash_iterator_t *it = NULL;
- ks_q_t *cleanup = NULL;
-
- ks_assert(thread);
- ks_assert(data);
-
- bh = (blade_handle_t *)data;
-
- ks_q_create(&cleanup, bh->pool, 0);
- ks_assert(cleanup);
-
- while (!bh->shutdown) {
- ks_hash_write_lock(bh->connections);
- for (it = ks_hash_first(bh->connections, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- void *key = NULL;
- blade_connection_t *value = NULL;
-
- ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
-
- if (blade_connection_state_get(value) == BLADE_CONNECTION_STATE_CLEANUP) ks_q_push(cleanup, value);
- }
- ks_hash_write_unlock(bh->connections);
-
- while (ks_q_trypop(cleanup, (void **)&bc) == KS_STATUS_SUCCESS) {
- blade_handle_connections_remove(bc);
- blade_connection_destroy(&bc);
- }
-
- //ks_hash_write_lock(bh->sessions);
- //for (it = ks_hash_first(bh->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- // void *key = NULL;
- // blade_session_t *value = NULL;
-
- // ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
-
- // if (blade_session_state_get(value) == BLADE_SESSION_STATE_CLEANUP) ks_q_push(cleanup, value);
- //}
- //ks_hash_write_unlock(bh->sessions);
-
- //while (ks_q_trypop(cleanup, (void **)&bs) == KS_STATUS_SUCCESS) {
- // blade_handle_sessions_remove(bs);
- // blade_session_destroy(&bs);
- //}
-
- ks_sleep_ms(500);
- }
-
- return NULL;
-}
-
/* For Emacs:
* Local Variables:
* mode:c