if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc);
}
+KS_DECLARE(blade_connection_state_t) blade_connection_state_get(blade_connection_t *bc)
+{
+ ks_assert(bc);
+ return bc->state;
+}
+
KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc)
{
ks_assert(bc);
ks_assert(bc);
- blade_handle_connections_remove(bc);
-
callback = blade_connection_state_callback_lookup(bc, BLADE_CONNECTION_STATE_DISCONNECT);
if (callback) callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST);
- blade_connection_destroy(&bc);
+ blade_connection_state_set(bc, BLADE_CONNECTION_STATE_CLEANUP);
return KS_STATUS_SUCCESS;
}
config_setting_t *config_directory;
config_setting_t *config_datastore;
+ ks_thread_t *worker_thread;
+ ks_bool_t shutdown;
+
ks_hash_t *transports; // registered transports exposed by modules, NOT active connections
ks_hash_t *spaces; // registered method spaces exposed by modules
// registered event callback registry
ks_hash_t *requests; // outgoing requests waiting for a response keyed by the message id
};
+void *blade_handle_worker_thread(ks_thread_t *thread, void *data);
typedef struct blade_handle_transport_registration_s blade_handle_transport_registration_t;
struct blade_handle_transport_registration_s {
// @todo call onload and onstartup callbacks for modules from DSOs
+ if (ks_thread_create_ex(&bh->worker_thread,
+ blade_handle_worker_thread,
+ bh,
+ KS_THREAD_FLAG_DEFAULT,
+ KS_THREAD_DEFAULT_STACK,
+ KS_PRI_NORMAL,
+ bh->pool) != KS_STATUS_SUCCESS) {
+ // @todo error logging
+ return KS_STATUS_FAIL;
+ }
+
return KS_STATUS_SUCCESS;
}
ks_assert(bh);
+ if (bh->worker_thread) {
+ bh->shutdown = KS_TRUE;
+ ks_thread_join(bh->worker_thread);
+ ks_pool_free(bh->pool, &bh->worker_thread);
+ bh->shutdown = KS_FALSE;
+ }
+
while ((it = ks_hash_first(bh->requests, KS_UNLOCKED))) {
void *key = NULL;
blade_request_t *value = NULL;
return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata);
}
+void *blade_handle_worker_thread(ks_thread_t *thread, void *data)
+{
+ blade_handle_t *bh = NULL;
+ blade_connection_t *bc = NULL;
+ ks_hash_iterator_t *it = NULL;
+ ks_q_t *cleanup = NULL;
+
+ ks_assert(thread);
+ ks_assert(data);
+
+ bh = (blade_handle_t *)data;
+
+ ks_q_create(&cleanup, bh->pool, 0);
+ ks_assert(cleanup);
+
+ while (!bh->shutdown) {
+ ks_hash_write_lock(bh->connections);
+ for (it = ks_hash_first(bh->connections, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ void *key = NULL;
+ blade_connection_t *value = NULL;
+
+ ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
+
+ if (blade_connection_state_get(value) == BLADE_CONNECTION_STATE_CLEANUP) ks_q_push(cleanup, value);
+ }
+ ks_hash_write_unlock(bh->connections);
+
+ while (ks_q_trypop(cleanup, (void **)&bc) == KS_STATUS_SUCCESS) {
+ blade_handle_connections_remove(bc);
+ blade_connection_destroy(&bc);
+ }
+
+ ks_sleep_ms(500);
+ }
+
+ return NULL;
+}
/* For Emacs:
* Local Variables:
KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc);
KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data);
KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state);
+KS_DECLARE(blade_connection_state_t) blade_connection_state_get(blade_connection_t *bc);
KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc);
KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target);
KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json);