ks_pool_t *pool;
const char *uri;
- // @todo breakdown of uri into constituent parts
+
+ const char *components;
+ const char *name;
+ const char *domain;
+ const char *resource;
+ ks_hash_t *parameters;
};
ks_assert(*biP);
bi = *biP;
+ if (bi->uri) {
+ ks_pool_free(bi->pool, &bi->uri);
+ ks_pool_free(bi->pool, &bi->components);
+ }
+ if (bi->parameters) ks_hash_destroy(&bi->parameters);
ks_pool_free(bi->pool, biP);
KS_DECLARE(ks_status_t) blade_identity_parse(blade_identity_t *bi, const char *uri)
{
+ char *tmp = NULL;
+ char *tmp2 = NULL;
+
ks_assert(bi);
ks_assert(uri);
- if (bi->uri) ks_pool_free(bi->pool, &bi->uri);
+ if (bi->uri) {
+ ks_pool_free(bi->pool, &bi->uri);
+ ks_pool_free(bi->pool, &bi->components);
+ }
bi->uri = ks_pstrdup(bi->pool, uri);
+ bi->components = tmp = ks_pstrdup(bi->pool, uri);
- // @todo parse into components
+ bi->name = tmp;
+ if (!(tmp = strchr(tmp, '@'))) return KS_STATUS_FAIL;
+ *tmp++ = '\0';
+ bi->domain = tmp2 = tmp;
+ if ((tmp = strchr(tmp, '/'))) {
+ *tmp++ = '\0';
+ bi->resource = tmp2 = tmp;
+ } else tmp = tmp2;
+
+ if ((tmp = strchr(tmp, '?'))) {
+ *tmp++ = '\0';
+
+ while (tmp) {
+ char *key = tmp;
+ char *val = NULL;
+ if (!(tmp = strchr(tmp, '='))) return KS_STATUS_FAIL;
+ *tmp++ = '\0';
+ val = tmp;
+ if ((tmp = strchr(tmp, '&'))) {
+ *tmp++ = '\0';
+ }
+
+ if (!bi->parameters) {
+ ks_hash_create(&bi->parameters, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, bi->pool);
+ ks_assert(bi->parameters);
+ }
+ ks_hash_insert(bi->parameters, key, val);
+ }
+ }
+
return KS_STATUS_SUCCESS;
}
static blade_transport_callbacks_t g_transport_wss_callbacks =
{
+ "wss",
+
blade_transport_wss_on_connect,
blade_transport_wss_on_rank,
blade_transport_wss_on_send,
bm_wss = (blade_module_wss_t *)blade_module_data_get(bm);
- // @todo register wss transport to the blade_handle_t
-
if (blade_module_wss_config(bm_wss, config) != KS_STATUS_SUCCESS) {
ks_log(KS_LOG_DEBUG, "blade_module_wss_config failed\n");
return KS_STATUS_FAIL;
KS_PRI_NORMAL,
bm_wss->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ blade_handle_transport_register(bm_wss->handle, bm_wss->transport_callbacks);
+
return KS_STATUS_SUCCESS;
}
bm_wss = (blade_module_wss_t *)blade_module_data_get(bm);
- // @todo unregister wss transport from the blade_handle_t
+ blade_handle_transport_unregister(bm_wss->handle, bm_wss->transport_callbacks);
if (bm_wss->listeners_thread) {
bm_wss->shutdown = KS_TRUE;
config_setting_t *config_service;
config_setting_t *config_datastore;
+ ks_hash_t *transports;
ks_q_t *messages_discarded;
blade_datastore_t *datastore;
ks_q_destroy(&bh->messages_discarded);
}
+ ks_hash_destroy(&bh->transports);
+
if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool);
ks_pool_free(bh->pool, &bh);
bh->pool = pool;
bh->tpool = tpool;
+ ks_hash_create(&bh->transports, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, bh->pool);
+ ks_assert(bh->transports);
+
// @todo check thresholds from config, for now just ensure it doesn't grow out of control, allow 100 discarded messages
ks_q_create(&bh->messages_discarded, bh->pool, 100);
ks_assert(bh->messages_discarded);
return bh->tpool;
}
+KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_transport_callbacks_t *callbacks)
+{
+ ks_assert(bh);
+ ks_assert(callbacks);
+
+ ks_hash_write_lock(bh->transports);
+ ks_hash_insert(bh->transports, (void *)callbacks->name, callbacks);
+ ks_hash_write_unlock(bh->transports);
+
+ ks_log(KS_LOG_DEBUG, "Transport Registered: %s\n", callbacks->name);
+
+ return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, blade_transport_callbacks_t *callbacks)
+{
+ ks_assert(bh);
+ ks_assert(callbacks);
+
+ ks_hash_write_lock(bh->transports);
+ ks_hash_remove(bh->transports, (void *)callbacks->name);
+ ks_hash_write_unlock(bh->transports);
+
+ return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target)
+{
+ ks_assert(bh);
+ ks_assert(target);
+
+ ks_hash_read_lock(bh->transports);
+ // @todo find transport for target, check if target specifies explicit transport parameter first, otherwise use onrank and keep highest ranked callbacks
+ ks_hash_read_unlock(bh->transports);
+
+ // transport_callbacks->onconnect(bcP, target);
+
+ return KS_STATUS_SUCCESS;
+}
+
KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length)
{
blade_message_t *msg = NULL;
KS_DECLARE(ks_pool_t *) blade_handle_pool_get(blade_handle_t *bh);
KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh);
+KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_transport_callbacks_t *callbacks);
+KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, blade_transport_callbacks_t *callbacks);
+
KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length);
KS_DECLARE(ks_status_t) blade_handle_message_discard(blade_handle_t *bh, blade_message_t **message);
typedef blade_connection_state_hook_t (*blade_transport_state_callback_t)(blade_connection_t *bc, blade_connection_state_condition_t condition);
struct blade_transport_callbacks_s {
+ const char *name;
+
blade_transport_connect_callback_t onconnect;
blade_transport_rank_callback_t onrank;
blade_transport_send_callback_t onsend;
blade:
{
+ identity = "directory@domain";
+ directory:
+ {
+ };
datastore:
{
database:
--- /dev/null
+blade:
+{
+ identity = "peer@domain";
+ directory:
+ {
+ uris = ( "directory@domain?transport=wss&host=127.0.0.1&port=2100" );
+ };
+ datastore:
+ {
+ database:
+ {
+ path = ":mem:";
+ };
+ };
+ wss:
+ {
+ endpoints:
+ {
+ ipv4 = ( { address = "0.0.0.0", port = 2101 } );
+ ipv6 = ( { address = "::", port = 2101 } );
+ backlog = 128;
+ };
+ # SSL group is optional, disabled when absent
+ ssl:
+ {
+ # todo: server SSL stuffs here
+ };
+ };
+};