ks_pool_t *pool;
// @todo how does "exclusive" play into the controllers, does "exclusive" mean only one provider can exist for a given protocol and realm? what does non exclusive mean?
- ks_hash_t *protocols; // protocols that have been published with blade.publish, and the details to locate a protocol provider with blade.locate
- ks_hash_t *protocols_cleanup; // keyed by the nodeid, each value is a hash_t* of which contains string keys matching the "protocol@realm" keys to remove each nodeid from as a provider during cleanup
+ ks_hash_t *protocols; // protocols that have been published with blade.publish, and the details to locate a protocol controller with blade.locate
};
ks_hash_create(&bmmgr->protocols, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bmmgr->pool);
ks_assert(bmmgr->protocols);
- ks_hash_create(&bmmgr->protocols_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bmmgr->pool);
- ks_assert(bmmgr->protocols_cleanup);
-
ks_pool_set_cleanup(pool, bmmgr, NULL, blade_mastermgr_cleanup);
*bmmgrP = bmmgr;
return bmmgr->handle;
}
+KS_DECLARE(ks_status_t) blade_mastermgr_purge(blade_mastermgr_t *bmmgr, const char *nodeid)
+{
+ ks_hash_t *cleanup = NULL;
+
+ ks_hash_write_lock(bmmgr->protocols);
+ for (ks_hash_iterator_t *it = ks_hash_first(bmmgr->protocols, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ const char *key = NULL;
+ blade_protocol_t *bp = NULL;
+
+ ks_hash_this(it, (const void **)&key, NULL, (void **)&bp);
+
+ if (blade_protocol_purge(bp, nodeid)) {
+ if (!cleanup) ks_hash_create(&cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, bmmgr->pool);
+ ks_hash_insert(cleanup, key, bp);
+ }
+ }
+ if (cleanup) {
+ for (ks_hash_iterator_t *it = ks_hash_first(cleanup, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ const char *key = NULL;
+ blade_protocol_t *bp = NULL;
+
+ ks_hash_this(it, (const void **)&key, NULL, (void **)&bp);
+
+ ks_log(KS_LOG_DEBUG, "Protocol Removed: %s\n", key);
+ ks_hash_remove(bmmgr->protocols, key);
+ }
+ ks_hash_destroy(&cleanup);
+ }
+
+ ks_hash_write_unlock(bmmgr->protocols);
+
+ return KS_STATUS_SUCCESS;
+}
+
KS_DECLARE(blade_protocol_t *) blade_mastermgr_protocol_lookup(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm)
{
blade_protocol_t *bp = NULL;
{
blade_protocol_t *bp = NULL;
char *key = NULL;
- ks_hash_t *cleanup = NULL;
ks_assert(bmmgr);
ks_assert(protocol);
ks_hash_insert(bmmgr->protocols, (void *)ks_pstrdup(bmmgr->pool, key), bp);
}
- cleanup = (ks_hash_t *)ks_hash_search(bmmgr->protocols_cleanup, (void *)controller, KS_UNLOCKED);
- if (!cleanup) {
- ks_hash_create(&cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bmmgr->pool);
- ks_assert(cleanup);
-
- ks_hash_insert(bmmgr->protocols_cleanup, (void *)ks_pstrdup(bmmgr->pool, controller), cleanup);
- }
- ks_hash_insert(cleanup, (void *)ks_pstrdup(bmmgr->pool, key), (void *)KS_TRUE);
blade_protocol_controllers_add(bp, controller);
- ks_log(KS_LOG_DEBUG, "Protocol Controller Added: %s to %s\n", controller, key);
+
+ ks_pool_free(bmmgr->pool, &key);
ks_hash_write_unlock(bmmgr->protocols);
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_status_t) blade_mastermgr_controller_remove(blade_mastermgr_t *bmmgr, const char *controller)
+KS_DECLARE(ks_status_t) blade_mastermgr_channel_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel)
{
- ks_hash_t *cleanup = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ blade_protocol_t *bp = NULL;
+ char *key = NULL;
ks_assert(bmmgr);
- ks_assert(controller);
+ ks_assert(protocol);
+ ks_assert(realm);
+ ks_assert(channel);
- ks_hash_write_lock(bmmgr->protocols);
- cleanup = (ks_hash_t *)ks_hash_search(bmmgr->protocols_cleanup, (void *)controller, KS_UNLOCKED);
- if (cleanup) {
- for (ks_hash_iterator_t *it = ks_hash_first(cleanup, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- void *key = NULL;
- void *value = NULL;
- blade_protocol_t *bp = NULL;
- ks_hash_t *controllers = NULL;
+ key = ks_psprintf(bmmgr->pool, "%s@%s", protocol, realm);
- ks_hash_this(it, (const void **)&key, NULL, &value);
+ bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_READLOCKED);
+ if (!bp) {
+ ret = KS_STATUS_NOT_FOUND;
+ goto done;
+ }
- bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, key, KS_UNLOCKED);
- ks_assert(bp); // should not happen when a cleanup still has a provider tracked for a protocol
+ blade_protocol_channel_add(bp, channel);
- ks_log(KS_LOG_DEBUG, "Protocol Controller Removed: %s from %s\n", controller, key);
- blade_protocol_controllers_remove(bp, controller);
+done:
+ ks_pool_free(bmmgr->pool, &key);
- controllers = blade_protocol_controllers_get(bp);
- if (ks_hash_count(controllers) == 0) {
- // @note this depends on locking something outside of the protocol that won't be destroyed, like the top level
- // protocols hash, but assumes then that any reader keeps the top level hash read locked while using the protocol
- // so it cannot be deleted
- ks_log(KS_LOG_DEBUG, "Protocol Removed: %s\n", key);
- ks_hash_remove(bmmgr->protocols, key);
- }
- }
- ks_hash_remove(bmmgr->protocols_cleanup, (void *)controller);
+ ks_hash_read_unlock(bmmgr->protocols);
+
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_mastermgr_channel_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ blade_protocol_t *bp = NULL;
+ char *key = NULL;
+
+ ks_assert(bmmgr);
+ ks_assert(protocol);
+ ks_assert(realm);
+ ks_assert(channel);
+
+ key = ks_psprintf(bmmgr->pool, "%s@%s", protocol, realm);
+
+ bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_READLOCKED);
+ if (!bp) {
+ ret = KS_STATUS_NOT_FOUND;
+ goto done;
}
- ks_hash_write_unlock(bmmgr->protocols);
- return KS_STATUS_SUCCESS;
+ blade_protocol_channel_remove(bp, channel);
+
+done:
+ ks_pool_free(bmmgr->pool, &key);
+
+ ks_hash_read_unlock(bmmgr->protocols);
+
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_mastermgr_channel_authorize(blade_mastermgr_t *bmmgr, ks_bool_t remove, const char *protocol, const char *realm, const char *channel, const char *controller, const char *target)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ blade_protocol_t *bp = NULL;
+ char *key = NULL;
+ //ks_hash_t *cleanup = NULL;
+
+ ks_assert(bmmgr);
+ ks_assert(protocol);
+ ks_assert(realm);
+ ks_assert(channel);
+ ks_assert(controller);
+ ks_assert(target);
+
+ key = ks_psprintf(bmmgr->pool, "%s@%s", protocol, realm);
+
+ bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_READLOCKED);
+ if (!bp) {
+ ret = KS_STATUS_NOT_FOUND;
+ goto done;
+ }
+
+ ret = blade_protocol_channel_authorize(bp, remove, channel, controller, target);
+
+done:
+ ks_pool_free(bmmgr->pool, &key);
+
+ ks_hash_read_unlock(bmmgr->protocols);
+
+ return ret;
+}
+
+KS_DECLARE(ks_bool_t) blade_mastermgr_channel_verify(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel, const char *target)
+{
+ ks_bool_t ret = KS_FALSE;
+ blade_protocol_t *bp = NULL;
+ char *key = NULL;
+ //ks_hash_t *cleanup = NULL;
+
+ ks_assert(bmmgr);
+ ks_assert(protocol);
+ ks_assert(realm);
+ ks_assert(channel);
+ ks_assert(target);
+
+ key = ks_psprintf(bmmgr->pool, "%s@%s", protocol, realm);
+
+ bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_READLOCKED);
+ if (!bp) goto done;
+
+ ret = blade_protocol_channel_verify(bp, channel, target);
+
+done:
+ ks_pool_free(bmmgr->pool, &key);
+
+ ks_hash_read_unlock(bmmgr->protocols);
+
+ return ret;
}
/* For Emacs:
const char *name;
const char *realm;
ks_hash_t *controllers;
+ ks_hash_t *channels;
// @todo descriptors (schema, etc) for each method within a protocol
};
if (bp->name) ks_pool_free(bp->pool, &bp->name);
if (bp->realm) ks_pool_free(bp->pool, &bp->realm);
if (bp->controllers) ks_hash_destroy(&bp->controllers);
+ if (bp->channels) ks_hash_destroy(&bp->channels);
break;
case KS_MPCL_DESTROY:
break;
bp->name = ks_pstrdup(pool, name);
bp->realm = ks_pstrdup(pool, realm);
- ks_hash_create(&bp->controllers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bp->pool);
+ ks_hash_create(&bp->controllers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bp->pool);
ks_assert(bp->controllers);
+ ks_hash_create(&bp->channels, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bp->pool);
+ ks_assert(bp->channels);
+
ks_pool_set_cleanup(pool, bp, NULL, blade_protocol_cleanup);
*bpP = bp;
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_hash_t *) blade_protocol_controllers_get(blade_protocol_t *bp)
+KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid)
+{
+ ks_bool_t ret = KS_FALSE;
+
+ ks_assert(bp);
+ ks_assert(nodeid);
+
+ // @todo iterate all channels, remove the nodeid from all authorized hashes
+ ks_hash_write_lock(bp->channels);
+ for (ks_hash_iterator_t *it = ks_hash_first(bp->channels, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ const char *key = NULL;
+ ks_hash_t *authorizations = NULL;
+
+ ks_hash_this(it, (const void **)&key, NULL, (void **)&authorizations);
+
+ if (ks_hash_remove(authorizations, nodeid)) {
+ ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Removed: %s from %s@%s/%s\n", nodeid, bp->name, bp->realm, key);
+ }
+ }
+ ks_hash_write_unlock(bp->channels);
+
+ ks_hash_write_lock(bp->controllers);
+ if (ks_hash_remove(bp->controllers, (void *)nodeid)) {
+ ks_log(KS_LOG_DEBUG, "Protocol Controller Removed: %s from %s@%s\n", nodeid, bp->name, bp->realm);
+ }
+ ret = ks_hash_count(bp->controllers) == 0;
+ ks_hash_write_unlock(bp->controllers);
+
+ return ret;
+}
+
+KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp)
{
+ cJSON *controllers = cJSON_CreateObject();
+
ks_assert(bp);
- return bp->controllers;
+ for (ks_hash_iterator_t *it = ks_hash_first(bp->controllers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ const char *key = NULL;
+ void *value = NULL;
+
+ ks_hash_this(it, (const void **)&key, NULL, &value);
+
+ cJSON_AddItemToArray(controllers, cJSON_CreateString(key));
+ }
+
+ return controllers;
}
KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, const char *nodeid)
key = ks_pstrdup(bp->pool, nodeid);
ks_hash_insert(bp->controllers, (void *)key, (void *)KS_TRUE);
+ ks_log(KS_LOG_DEBUG, "Protocol Controller Added: %s to %s@%s\n", nodeid, bp->name, bp->realm);
+
return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_protocol_channel_add(blade_protocol_t *bp, const char *name)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ ks_hash_t *authorized = NULL;
+ char *key = NULL;
+
+ ks_assert(bp);
+ ks_assert(name);
+
+ ks_hash_write_lock(bp->channels);
+ if (ks_hash_search(bp->channels, name, KS_UNLOCKED)) {
+ ret = KS_STATUS_DUPLICATE_OPERATION;
+ goto done;
+ }
+
+ ks_hash_create(&authorized, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bp->pool);
+
+ key = ks_pstrdup(bp->pool, name);
+ ks_hash_insert(bp->channels, (void *)key, (void *)authorized);
+
+ ks_log(KS_LOG_DEBUG, "Protocol Channel Added: %s to %s@%s\n", key, bp->name, bp->realm);
+
+done:
+
+ ks_hash_write_unlock(bp->channels);
+
+ return ret;
}
-KS_DECLARE(ks_status_t) blade_protocol_controllers_remove(blade_protocol_t *bp, const char *nodeid)
+KS_DECLARE(ks_status_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name)
{
ks_assert(bp);
- ks_assert(nodeid);
+ ks_assert(name);
- ks_hash_remove(bp->controllers, (void *)nodeid);
+ ks_hash_remove(bp->channels, (void *)name);
+
+ ks_log(KS_LOG_DEBUG, "Protocol Channel Removed: %s from %s@%s\n", name, bp->name, bp->realm);
return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, ks_bool_t remove, const char *channel, const char *controller, const char *target)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ ks_hash_t *authorizations = NULL;
+ ks_bool_t allowed = KS_FALSE;
+
+ ks_assert(bp);
+ ks_assert(channel);
+ ks_assert(controller);
+ ks_assert(target);
+
+ allowed = (ks_bool_t)(intptr_t)ks_hash_search(bp->controllers, (void *)controller, KS_READLOCKED);
+ ks_hash_read_unlock(bp->controllers);
+
+ if (!allowed) {
+ ret = KS_STATUS_NOT_ALLOWED;
+ goto done;
+ }
+
+ // @todo verify controller, get ks_hash_t* value based on channel, add target to the channels hash
+ authorizations = (ks_hash_t *)ks_hash_search(bp->channels, (void *)channel, KS_READLOCKED);
+ if (authorizations) {
+ if (remove) {
+ if (ks_hash_remove(authorizations, (void *)target)) {
+ ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Removed: %s from %s@%s/%s\n", target, bp->name, bp->realm, channel);
+ } else ret = KS_STATUS_NOT_FOUND;
+ }
+ else {
+ ks_hash_insert(authorizations, (void *)ks_pstrdup(bp->pool, target), (void *)KS_TRUE);
+ ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Added: %s to %s@%s/%s\n", target, bp->name, bp->realm, channel);
+ }
+ }
+ ks_hash_read_unlock(bp->channels);
+
+ if (!authorizations) ret = KS_STATUS_NOT_FOUND;
+
+done:
+ return ret;
+}
+
+KS_DECLARE(ks_bool_t) blade_protocol_channel_verify(blade_protocol_t *bp, const char *channel, const char *target)
+{
+ ks_bool_t ret = KS_FALSE;
+ ks_hash_t *authorizations = NULL;
+
+ ks_assert(bp);
+ ks_assert(channel);
+ ks_assert(target);
+
+ // @todo verify controller, get ks_hash_t* value based on channel, add target to the channels hash
+ authorizations = (ks_hash_t *)ks_hash_search(bp->channels, (void *)channel, KS_READLOCKED);
+ if (authorizations) ret = ks_hash_search(authorizations, target, KS_UNLOCKED) != NULL;
+ ks_hash_read_unlock(bp->channels);
+ return ret;
}
/* For Emacs:
blade_handle_rpcregister(brmgr->handle, target, KS_TRUE, NULL, NULL);
+ blade_subscriptionmgr_purge(blade_handle_subscriptionmgr_get(brmgr->handle), target);
+
// @note protocols are cleaned up here because routes can be removed that are not locally connected with a session but still
// have protocols published to the master node from further downstream, in which case if a route is announced upstream to be
// removed, a master node is still able to catch that here even when there is no direct session, but is also hit when there
// is a direct session being terminated
- blade_mastermgr_controller_remove(blade_handle_mastermgr_get(brmgr->handle), target);
+ blade_mastermgr_purge(blade_handle_mastermgr_get(brmgr->handle), target);
return KS_STATUS_SUCCESS;
}
const char *realm;
blade_rpc_request_callback_t callback;
- void *data;
+ cJSON *data;
};
struct blade_rpc_request_s {
cJSON *message;
const char *message_id; // pulled from message for easier keying
blade_rpc_response_callback_t callback;
- void *data;
+ cJSON *data;
// @todo ttl to wait for response before injecting an error response locally
};
static void blade_rpc_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
{
- //blade_rpc_t *brpc = (blade_rpc_t *)ptr;
+ blade_rpc_t *brpc = (blade_rpc_t *)ptr;
- //ks_assert(brpc);
+ ks_assert(brpc);
switch (action) {
case KS_MPCL_ANNOUNCE:
break;
case KS_MPCL_TEARDOWN:
+ if (brpc->data) cJSON_Delete(brpc->data);
break;
case KS_MPCL_DESTROY:
break;
}
}
-KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, cJSON *data)
{
blade_rpc_t *brpc = NULL;
ks_pool_t *pool = NULL;
return brpc->callback;
}
-KS_DECLARE(void *) blade_rpc_data_get(blade_rpc_t *brpc)
+KS_DECLARE(cJSON *) blade_rpc_data_get(blade_rpc_t *brpc)
{
ks_assert(brpc);
case KS_MPCL_TEARDOWN:
ks_pool_free(brpcreq->pool, (void **)&brpcreq->session_id);
cJSON_Delete(brpcreq->message);
+ if (brpcreq->data) cJSON_Delete(brpcreq->data);
break;
case KS_MPCL_DESTROY:
break;
const char *session_id,
cJSON *json,
blade_rpc_response_callback_t callback,
- void *data)
+ cJSON *data)
{
blade_rpc_request_t *brpcreq = NULL;
return brpcreq->callback;
}
-KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq)
+KS_DECLARE(cJSON *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq)
{
ks_assert(brpcreq);
return brpcreq->data;
}
-KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, cJSON *data)
{
blade_rpc_request_t *brpcreq = NULL;
const char *method = NULL;
blade_session_send(bs_router, json, NULL, NULL);
blade_session_read_unlock(bs_router);
+ // @todo if this is a subscribe request to remove subscriptions, it must carry a field unsubscribed-channels for which
+ // subscriptions should be removed along the request path, regardless of whether it's the consumer requesting or the
+ // master due to a deauthorization, without respect to waiting for the response as it should be a gaurenteed operation
+ // even when requested by the subscriber. This unsubscribed-channels field is simply treated as a special field, regardless
+ // of the actual method of the request.
+
return KS_STATUS_SUCCESS;
}
}
blade_session_send(bs_router, json, NULL, NULL);
blade_session_read_unlock(bs_router);
+ // @todo if this is a subscribe response to add a subscriber with the master as the responder-nodeid, it must have a
+ // subscribed-channels field which should be added for the requester-nodeid, this will ensure the subscriptions are
+ // added with respect to the master response and not immediately upon request. This subscribed-channels field is
+ // simply treated as a special field, regardless of the actual method of the request which is unavailable here.
+
return KS_STATUS_SUCCESS;
}
}
ks_log(KS_LOG_DEBUG, "Session Removed: %s\n", id);
- blade_subscriptionmgr_subscriber_cleanup(blade_handle_subscriptionmgr_get(bsmgr->handle), id);
-
if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), id)) {
blade_upstreammgr_localid_set(blade_handle_upstreammgr_get(bsmgr->handle), NULL);
blade_upstreammgr_masterid_set(blade_handle_upstreammgr_get(bsmgr->handle), NULL);
blade_sessionmgr_t *sessionmgr;
};
-ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *data);
-ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *data);
-ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *data);
-ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *data);
-ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void *data);
-ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data);
+ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
+ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
+ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
+ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
+ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
+ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
+ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJSON *data);
+ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
static void blade_handle_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
blade_rpc_create(&brpc, bh, "blade.publish", NULL, NULL, blade_rpcpublish_request_handler, NULL);
blade_rpcmgr_corerpc_add(bh->rpcmgr, brpc);
+ blade_rpc_create(&brpc, bh, "blade.authorize", NULL, NULL, blade_rpcauthorize_request_handler, NULL);
+ blade_rpcmgr_corerpc_add(bh->rpcmgr, brpc);
+
blade_rpc_create(&brpc, bh, "blade.locate", NULL, NULL, blade_rpclocate_request_handler, NULL);
blade_rpcmgr_corerpc_add(bh->rpcmgr, brpc);
// which is important for implementation of blade.execute where errors can be relayed back to the requester properly
// blade.register request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, cJSON *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
}
// blade.register request handler
-ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
// blade.publish request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
const char *id = NULL;
ks_assert(bh);
- ks_assert(name);
+ ks_assert(protocol);
ks_assert(realm);
// @todo consideration for the Master trying to publish a protocol, with no upstream
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.publish");
// fill in the req_params
- cJSON_AddStringToObject(req_params, "protocol", name);
+ cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &id);
cJSON_AddStringToObject(req_params, "responder-nodeid", id);
ks_pool_free(pool, &id);
+ // @todo may want to switch this system to use a blade_rpcpublish_args_t with validation on the contents on this list internally
+ // and to produce the entire json block internally in case the channel args change to include additional information like an encryption key,
+ // however if passing encryption keys then they should be asymetrically encrypted using a public key provided by the master so that
+ // the channel keys can be transmitted without intermediate nodes being able to snoop them
+ if (channels) cJSON_AddItemToObject(req_params, "channels", cJSON_Duplicate(channels, 1));
+
// @todo add a parameter containing a block of json for schema definitions for each of the methods being published
ks_log(KS_LOG_DEBUG, "Session (%s) publish request started\n", blade_session_id_get(bs));
}
// blade.publish request handler
-ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
+ cJSON *req_params_channels = NULL;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
const char *req_params_requester_nodeid = NULL;
goto done;
}
+ req_params_channels = cJSON_GetObjectItem(req_params, "channels");
+ if (req_params_channels) {
+ int size = 0;
+
+ if (req_params_channels->type != cJSON_Array) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' type, expected array\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ size = cJSON_GetArraySize(req_params_channels);
+ for (int index = 0; index < size; ++index) {
+ cJSON *element = cJSON_GetArrayItem(req_params_channels, index);
+ if (element->type != cJSON_String) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' element type, expected string\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+ }
+ }
+
ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
blade_mastermgr_controller_add(bh->mastermgr, req_params_protocol, req_params_realm, req_params_requester_nodeid);
+ if (req_params_channels) {
+ int size = cJSON_GetArraySize(req_params_channels);
+ for (int index = 0; index < size; ++index) {
+ cJSON *element = cJSON_GetArrayItem(req_params_channels, index);
+ blade_mastermgr_channel_add(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
+ }
+ }
+
// build the actual response finally
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
}
+// blade.authorize request generator
+KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ blade_session_t *bs = NULL;
+ ks_pool_t *pool = NULL;
+ cJSON *req = NULL;
+ cJSON *req_params = NULL;
+ const char *id = NULL;
+
+ ks_assert(bh);
+ ks_assert(nodeid);
+ ks_assert(protocol);
+ ks_assert(realm);
+ ks_assert(channels);
+
+ // @todo consideration for the Master trying to publish a protocol, with no upstream
+ if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
+ ret = KS_STATUS_DISCONNECTED;
+ goto done;
+ }
+
+ pool = blade_handle_pool_get(bh);
+ ks_assert(pool);
+
+ blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.authorize");
+
+ // fill in the req_params
+ cJSON_AddStringToObject(req_params, "protocol", protocol);
+ cJSON_AddStringToObject(req_params, "realm", realm);
+ if (remove) cJSON_AddTrueToObject(req_params, "remove");
+ cJSON_AddStringToObject(req_params, "authorized-nodeid", nodeid);
+
+ blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &id);
+ ks_assert(id);
+
+ cJSON_AddStringToObject(req_params, "requester-nodeid", id);
+ ks_pool_free(pool, &id);
+
+ blade_upstreammgr_masterid_copy(bh->upstreammgr, pool, &id);
+ ks_assert(id);
+
+ cJSON_AddStringToObject(req_params, "responder-nodeid", id);
+ ks_pool_free(pool, &id);
+
+ cJSON_AddItemToObject(req_params, "channels", cJSON_Duplicate(channels, 1));
+
+ // @todo add a parameter containing a block of json for schema definitions for each of the methods being published
+
+ ks_log(KS_LOG_DEBUG, "Session (%s) authorize request started\n", blade_session_id_get(bs));
+
+ ret = blade_session_send(bs, req, callback, data);
+
+done:
+ if (req) cJSON_Delete(req);
+ if (bs) blade_session_read_unlock(bs);
+
+ return ret;
+}
+
+// blade.authorize request handler
+ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
+{
+ blade_handle_t *bh = NULL;
+ blade_session_t *bs = NULL;
+ cJSON *req = NULL;
+ cJSON *req_params = NULL;
+ cJSON *req_params_channels = NULL;
+ cJSON *req_params_remove = NULL;
+ cJSON *channel = NULL;
+ ks_bool_t remove = KS_FALSE;
+ const char *req_params_protocol = NULL;
+ const char *req_params_realm = NULL;
+ const char *req_params_authorized_nodeid = NULL;
+ const char *req_params_requester_nodeid = NULL;
+ const char *req_params_responder_nodeid = NULL;
+ cJSON *res = NULL;
+ cJSON *res_result = NULL;
+ cJSON *res_result_authorized_channels = NULL;
+ cJSON *res_result_unauthorized_channels = NULL;
+ cJSON *res_result_failed_channels = NULL;
+
+ ks_assert(brpcreq);
+
+ bh = blade_rpc_request_handle_get(brpcreq);
+ ks_assert(bh);
+
+ bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_request_sessionid_get(brpcreq));
+ ks_assert(bs);
+
+ req = blade_rpc_request_message_get(brpcreq);
+ ks_assert(req);
+
+ req_params = cJSON_GetObjectItem(req, "params");
+ if (!req_params) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'params' object\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
+ if (!req_params_protocol) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'protocol'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ req_params_realm = cJSON_GetObjectCstr(req_params, "realm");
+ if (!req_params_realm) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'realm'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params realm");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ req_params_remove = cJSON_GetObjectItem(req_params, "remove");
+ if (req_params_remove && req_params_remove->type == cJSON_True) remove = KS_TRUE;
+
+ req_params_authorized_nodeid = cJSON_GetObjectCstr(req_params, "authorized-nodeid");
+ if (!req_params_authorized_nodeid) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'authorized-nodeid'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params authorized-nodeid");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ // @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now
+
+ req_params_requester_nodeid = cJSON_GetObjectCstr(req_params, "requester-nodeid");
+ if (!req_params_requester_nodeid) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'requester-nodeid'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params requester-nodeid");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ req_params_responder_nodeid = cJSON_GetObjectCstr(req_params, "responder-nodeid");
+ if (!req_params_responder_nodeid) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'responder-nodeid'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params responder-nodeid");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ req_params_channels = cJSON_GetObjectItem(req_params, "channels");
+ if (!req_params_channels) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'channels'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channels");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ if (req_params_channels->type != cJSON_Array) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'channels' type, expected array\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ cJSON_ArrayForEach(channel, req_params_channels) {
+ if (channel->type != cJSON_String) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'channels' element type, expected string\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+ }
+
+ if (!blade_upstreammgr_masterid_compare(bh->upstreammgr, req_params_responder_nodeid)) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid);
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params responder-nodeid");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ ks_log(KS_LOG_DEBUG, "Session (%s) authorize request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
+
+ // build the actual response finally
+ blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
+
+ cJSON_ArrayForEach(channel, req_params_channels) {
+ if (blade_mastermgr_channel_authorize(bh->mastermgr, remove, req_params_protocol, req_params_realm, channel->valuestring, req_params_requester_nodeid, req_params_authorized_nodeid) == KS_STATUS_SUCCESS) {
+ if (remove) {
+ if (!res_result_unauthorized_channels) res_result_unauthorized_channels = cJSON_CreateArray();
+ cJSON_AddItemToArray(res_result_unauthorized_channels, cJSON_CreateString(channel->valuestring));
+ // @todo unauthorizing channels should force a subscribe remove request for the target if they are subscribed, to prevent further events from reaching the target
+ // this will require the master node to invoke the subscription removal as opposed to the target who normally invokes subscribe
+ } else {
+ if (!res_result_authorized_channels) res_result_authorized_channels = cJSON_CreateArray();
+ cJSON_AddItemToArray(res_result_authorized_channels, cJSON_CreateString(channel->valuestring));
+ }
+ } else {
+ if (!res_result_failed_channels) res_result_failed_channels = cJSON_CreateArray();
+ cJSON_AddItemToArray(res_result_failed_channels, cJSON_CreateString(channel->valuestring));
+ }
+ }
+
+ cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
+ cJSON_AddStringToObject(res_result, "realm", req_params_realm);
+ cJSON_AddStringToObject(res_result, "authorized-nodeid", req_params_authorized_nodeid);
+ cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
+ cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
+ if (res_result_authorized_channels) cJSON_AddItemToObject(res_result, "authorized-channels", res_result_authorized_channels);
+ if (res_result_unauthorized_channels) cJSON_AddItemToObject(res_result, "unauthorized-channels", res_result_unauthorized_channels);
+ if (res_result_failed_channels) cJSON_AddItemToObject(res_result, "failed-channels", res_result_failed_channels);
+
+ // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
+ blade_session_send(bs, res, NULL, NULL);
+
+done:
+
+ if (res) cJSON_Delete(res);
+ if (bs) blade_session_read_unlock(bs);
+
+ return KS_FALSE;
+}
+
+
// blade.locate request generator
// @todo discuss system to support caching locate results, and internally subscribing to receive event updates related to protocols which have been located
// to ensure local caches remain synced when protocol controllers change, but this requires additional filters for event propagating to avoid broadcasting
// every protocol update to everyone which may actually be a better way than an explicit locate request
-KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, cJSON *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
const char *id = NULL;
ks_assert(bh);
- ks_assert(name);
+ ks_assert(protocol);
ks_assert(realm);
if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.locate");
// fill in the req_params
- cJSON_AddStringToObject(req_params, "protocol", name);
+ cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &id);
}
// blade.locate request handler
-ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
const char *req_params_responder_nodeid = NULL;
cJSON *res = NULL;
cJSON *res_result = NULL;
- cJSON *res_result_controllers;
+ cJSON *res_result_controllers = NULL;
blade_protocol_t *bp = NULL;
ks_assert(brpcreq);
ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
- res_result_controllers = cJSON_CreateObject();
-
bp = blade_mastermgr_protocol_lookup(bh->mastermgr, req_params_protocol, req_params_realm);
- if (bp) {
- ks_hash_t *controllers = blade_protocol_controllers_get(bp);
- for (ks_hash_iterator_t *it = ks_hash_first(controllers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- const char *key = NULL;
- void *value = NULL;
-
- ks_hash_this(it, (const void **)&key, NULL, &value);
-
- cJSON_AddItemToArray(res_result_controllers, cJSON_CreateString(key));
- }
- }
+ if (bp) res_result_controllers = blade_protocol_controllers_pack(bp);
// build the actual response finally
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
- cJSON_AddItemToObject(res_result, "controllers", res_result_controllers);
+ if (res_result_controllers) cJSON_AddItemToObject(res_result, "controllers", res_result_controllers);
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);
// blade.execute request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
ks_log(KS_LOG_DEBUG, "Session (%s) execute request started\n", blade_session_id_get(bs));
- // @todo change what blade_rpc_request_t carries for tracking data, use a tuple instead which makes it
- // easier to free the tuple and potentially associated data if a request needs to be destroyed without
- // the callback being called to know that the data is a tuple to destroy, meanwhile a tuple offers a
- // spare pointer for universally wrapping callback + data pairs in a case like this
- // in which case do not create the tuple here, just pass 2 data pointers to send and let it store them
- // in the internal tuple
ret = blade_session_send(bs, req, callback, data);
done:
}
// blade.execute request handler
-ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
ks_bool_t ret = KS_FALSE;
blade_handle_t *bh = NULL;
// blade.subscribe request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data)
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, cJSON *data, blade_rpc_request_callback_t channel_callback, cJSON *channel_data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
const char *localid = NULL;
- ks_bool_t propagate = KS_FALSE;
blade_subscription_t *bsub = NULL;
+ cJSON *temp_data = NULL;
ks_assert(bh);
- ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
+ ks_assert(subscribe_channels || unsubscribe_channels);
+ // @note this is always produced by a subscriber, and sent upstream, master will only use the internal raw call
if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
blade_upstreammgr_localid_copy(bh->upstreammgr, bh->pool, &localid);
ks_assert(localid);
- if (remove) {
- propagate = blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, &bsub, event, protocol, realm, localid);
- } else {
- propagate = blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, &bsub, event, protocol, realm, localid);
- ks_assert(event_callback);
+ if (unsubscribe_channels) {
+ cJSON *channel = NULL;
+ cJSON_ArrayForEach(channel, unsubscribe_channels) {
+ blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, &bsub, protocol, realm, channel->valuestring, localid);
+ }
}
- ks_pool_free(bh->pool, &localid);
- if (!remove && bsub) {
- blade_subscription_callback_set(bsub, event_callback);
- blade_subscription_callback_data_set(bsub, event_data);
- }
+ temp_data = cJSON_CreateObject();
+
+ if (callback) cJSON_AddItemToObject(temp_data, "callback", cJSON_CreatePtr((uintptr_t)callback));
+ if (data) cJSON_AddItemToObject(temp_data, "data", data);
- if (propagate) ret = blade_handle_rpcsubscribe_raw(bh, event, protocol, realm, remove, callback, data);
+ if (channel_callback) cJSON_AddItemToObject(temp_data, "channel-callback", cJSON_CreatePtr((uintptr_t)channel_callback));
+ if (channel_data) cJSON_AddItemToObject(temp_data, "channel-data", channel_data);
+
+ ret = blade_handle_rpcsubscribe_raw(bh, protocol, realm, subscribe_channels, unsubscribe_channels, localid, KS_FALSE, blade_rpcsubscribe_response_handler, temp_data);
+
+ ks_pool_free(bh->pool, &localid);
done:
if (bs) blade_session_read_unlock(bs);
return ret;
}
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, cJSON *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
cJSON *req_params = NULL;
ks_assert(bh);
- ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
+ ks_assert(subscribe_channels || unsubscribe_channels);
+ ks_assert(subscriber);
- if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
+ if (downstream) {
+ // @note if a master is sending a downstream update, it may only use unsubscribe_channels, cannot force a subscription without a subscriber callback
+ if (subscribe_channels) {
+ ret = KS_STATUS_NOT_ALLOWED;
+ goto done;
+ }
+ if (!(bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), subscriber))) {
+ ret = KS_STATUS_DISCONNECTED;
+ goto done;
+ }
+ }
+ else if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.subscribe");
- cJSON_AddStringToObject(req_params, "event", event);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
- if (remove) cJSON_AddTrueToObject(req_params, "remove");
+ cJSON_AddStringToObject(req_params, "subscriber-nodeid", subscriber);
+ if (downstream) cJSON_AddTrueToObject(req_params, "downstream");
+
+ if (subscribe_channels) cJSON_AddItemToObject(req_params, "subscribe-channels", cJSON_Duplicate(subscribe_channels, 1));
+ if (unsubscribe_channels) cJSON_AddItemToObject(req_params, "unsubscribe-channels", cJSON_Duplicate(unsubscribe_channels, 1));
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request started\n", blade_session_id_get(bs));
- ret = blade_session_send(bs, req, callback, data);
+ ret = blade_session_send(bs, req, blade_rpcsubscribe_response_handler, data);
done:
if (req) cJSON_Delete(req);
}
// blade.subscribe request handler
-ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
ks_pool_t *pool = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
- const char *req_params_event = NULL;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
- cJSON *req_params_remove = NULL;
- ks_bool_t remove = KS_FALSE;
+ const char *req_params_subscriber_nodeid = NULL;
+ cJSON *req_params_downstream = NULL;
+ ks_bool_t downstream = KS_FALSE;
+ cJSON *req_params_subscribe_channels = NULL;
+ cJSON *req_params_unsubscribe_channels = NULL;
+ ks_bool_t masterlocal = KS_FALSE;
cJSON *res = NULL;
cJSON *res_result = NULL;
- ks_bool_t propagate = KS_FALSE;
+ cJSON *res_result_subscribe_channels = NULL;
+ cJSON *res_result_failed_channels = NULL;
ks_assert(brpcreq);
goto done;
}
- req_params_event = cJSON_GetObjectCstr(req_params, "event");
- if (!req_params_event) {
- ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'event'\n", blade_session_id_get(bs));
- blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
- blade_session_send(bs, res, NULL, NULL);
- goto done;
- }
-
req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
if (!req_params_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'protocol'\n", blade_session_id_get(bs));
goto done;
}
- req_params_remove = cJSON_GetObjectItem(req_params, "remove");
- remove = req_params_remove && req_params_remove->type == cJSON_True;
+ req_params_subscriber_nodeid = cJSON_GetObjectCstr(req_params, "subscriber-nodeid");
+ if (!req_params_subscriber_nodeid) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'subscriber-nodeid'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params subscriber-nodeid");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ // @todo this may not be required, may be able to assume direction based on the session the message is received from, if came from upstream
+ // then it is heading downstream, otherwise it is heading upstream
+ req_params_downstream = cJSON_GetObjectItem(req_params, "downstream");
+ downstream = req_params_downstream && req_params_downstream->type == cJSON_True;
+
+ req_params_subscribe_channels = cJSON_GetObjectItem(req_params, "subscribe-channels");
+ req_params_unsubscribe_channels = cJSON_GetObjectItem(req_params, "unsubscribe-channels");
+
+ if (!req_params_subscribe_channels && !req_params_unsubscribe_channels) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'subscribe-channels' or 'unsubscribe-channels'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params subscribe-channels or unsubscribe-channels");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
// @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request processing\n", blade_session_id_get(bs));
- if (remove) {
- propagate = blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
- } else {
- propagate = blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, NULL, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
+ if (req_params_unsubscribe_channels) {
+ cJSON *channel = NULL;
+ cJSON_ArrayForEach(channel, req_params_unsubscribe_channels) {
+ blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
+ }
}
- if (propagate) blade_handle_rpcsubscribe_raw(bh, req_params_event, req_params_protocol, req_params_realm, remove, NULL, NULL);
+ masterlocal = blade_upstreammgr_masterlocal(blade_handle_upstreammgr_get(bh));
- // build the actual response finally
- blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
+ if (masterlocal || blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bh), req_params_subscriber_nodeid)) {
+ blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
- cJSON_AddStringToObject(res_result, "event", req_params_event);
- cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
- cJSON_AddStringToObject(res_result, "realm", req_params_realm);
+ cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
+ cJSON_AddStringToObject(res_result, "realm", req_params_realm);
+ cJSON_AddStringToObject(res_result, "subscriber-nodeid", req_params_subscriber_nodeid);
+ if (downstream) cJSON_AddTrueToObject(res_result, "downstream");
- // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
- blade_session_send(bs, res, NULL, NULL);
+ if (req_params_subscribe_channels) {
+ // @note this can only be received by the master due to other validation logic in requests which prevents the master from sending a request containing subscribe-channels
+ cJSON *channel = NULL;
+
+ cJSON_ArrayForEach(channel, req_params_subscribe_channels) {
+ if (blade_mastermgr_channel_verify(bh->mastermgr, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid)) {
+ blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
+ if (!res_result_subscribe_channels) res_result_subscribe_channels = cJSON_CreateArray();
+ cJSON_AddItemToArray(res_result_subscribe_channels, cJSON_CreateString(channel->valuestring));
+ } else {
+ if (!res_result_failed_channels) res_result_failed_channels = cJSON_CreateArray();
+ cJSON_AddItemToArray(res_result_failed_channels, cJSON_CreateString(channel->valuestring));
+ }
+ }
+ }
+
+ if (res_result_subscribe_channels) cJSON_AddItemToObject(res_result, "subscribe-channels", res_result_subscribe_channels);
+ if (res_result_failed_channels) cJSON_AddItemToObject(res_result, "failed-channels", res_result_failed_channels);
+ // @note unsubscribe-channels get handled during the request path, so the response handlers do not need to reiterate them for any forseeable reason, and if they are needed they
+ // could be pulled from the original request associated to the response
+
+ // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
+ blade_session_send(bs, res, NULL, NULL);
+ } else {
+ cJSON *temp_data = cJSON_CreateObject();
+
+ // @note track this so that when this local node gets a response to this propagated request we know what messageid to propagate the response with
+ cJSON_AddStringToObject(temp_data, "messageid", blade_rpc_request_messageid_get(brpcreq));
+
+ blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, req_params_subscribe_channels, req_params_unsubscribe_channels, req_params_subscriber_nodeid, downstream, blade_rpcsubscribe_response_handler, temp_data);
+ cJSON_Delete(temp_data);
+ }
done:
return KS_FALSE;
}
+// blade.subscribe response handler
+ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
+{
+ ks_bool_t ret = KS_FALSE;
+ blade_handle_t *bh = NULL;
+ blade_session_t *bs = NULL;
+ blade_rpc_response_callback_t original_callback = NULL;
+ cJSON *original_data = NULL;
+ blade_rpc_request_callback_t channel_callback = NULL;
+ cJSON *channel_data = NULL;
+ const char *messageid = NULL;
+ cJSON *res = NULL;
+ cJSON *res_result = NULL;
+ const char *res_result_protocol = NULL;
+ const char *res_result_realm = NULL;
+ const char *res_result_subscriber_nodeid = NULL;
+ cJSON *res_result_downstream = NULL;
+ ks_bool_t downstream = KS_FALSE;
+ cJSON *res_result_subscribe_channels = NULL;
+ cJSON *res_result_failed_channels = NULL;
+
+ ks_assert(brpcres);
+ ks_assert(data);
+
+ bh = blade_rpc_response_handle_get(brpcres);
+ ks_assert(bh);
+
+ bs = blade_sessionmgr_session_lookup(bh->sessionmgr, blade_rpc_response_sessionid_get(brpcres));
+ ks_assert(bs);
+
+ original_data = cJSON_GetObjectItem(data, "data");
+ original_callback = (blade_rpc_response_callback_t)(uintptr_t)cJSON_GetObjectPtr(data, "callback");
+ channel_data = cJSON_GetObjectItem(data, "channel-data");
+ channel_callback = (blade_rpc_request_callback_t)(uintptr_t)cJSON_GetObjectPtr(data, "channel-callback");
+
+ // @note when messageid exists, it means this message is only intended to be examined and relayed, the local node is not the subscriber
+ messageid = cJSON_GetObjectCstr(data, "messageid");
+
+ res = blade_rpc_response_message_get(brpcres);
+ ks_assert(res);
+
+ res_result = cJSON_GetObjectItem(res, "result");
+ if (!res_result) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) subscribe response missing 'result' object\n", blade_session_id_get(bs));
+ goto done;
+ }
+
+ // @todo the following 3 fields, protocol, realm, and subscriber-nodeid may not be required to carry in the response as they could be
+ // obtained from the original request tied to the response, change this later
+ res_result_protocol = cJSON_GetObjectCstr(res_result, "protocol");
+ if (!res_result_protocol) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) subscribe response missing 'protocol'\n", blade_session_id_get(bs));
+ goto done;
+ }
+
+ res_result_realm = cJSON_GetObjectCstr(res_result, "realm");
+ if (!res_result_realm) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) subscribe response missing 'realm'\n", blade_session_id_get(bs));
+ goto done;
+ }
+
+ res_result_subscriber_nodeid = cJSON_GetObjectCstr(res_result, "subscriber-nodeid");
+ if (!res_result_subscriber_nodeid) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) subscribe response missing 'subscriber-nodeid'\n", blade_session_id_get(bs));
+ goto done;
+ }
+
+ res_result_downstream = cJSON_GetObjectItem(res_result, "downstream");
+ downstream = res_result_downstream && res_result_downstream->type == cJSON_True;
+
+ res_result_subscribe_channels = cJSON_GetObjectItem(res_result, "subscribe-channels");
+ res_result_failed_channels = cJSON_GetObjectItem(res_result, "failed-channels");
+
+ if (res_result_subscribe_channels) {
+ // @note only reach here when the master has responded to authorize subscriptions to channels, so all nodes along the path must
+ // add the subscriber
+ cJSON *channel = NULL;
+ blade_subscription_t *bsub = NULL;
+
+ cJSON_ArrayForEach(channel, res_result_subscribe_channels) {
+ blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, &bsub, res_result_protocol, res_result_realm, channel->valuestring, res_result_subscriber_nodeid);
+ // @note these will only get assigned on the last response, received by the subscriber
+ if (channel_callback) blade_subscription_callback_set(bsub, channel_callback);
+ if (channel_data) blade_subscription_callback_data_set(bsub, channel_data);
+ }
+ }
+
+ // @note this will only happen on the last response, received by the subscriber
+ if (original_callback) ret = original_callback(brpcres, original_data);
+
+ if (messageid) {
+ blade_session_t *relay = NULL;
+ if (downstream) {
+ if (!(relay = blade_upstreammgr_session_get(bh->upstreammgr))) {
+ goto done;
+ }
+ } else {
+ if (!(relay = blade_routemgr_route_lookup(bh->routemgr, res_result_subscriber_nodeid))) {
+ goto done;
+ }
+ }
+
+ blade_rpc_response_raw_create(&res, &res_result, messageid);
+
+ cJSON_AddStringToObject(res_result, "protocol", res_result_protocol);
+ cJSON_AddStringToObject(res_result, "realm", res_result_realm);
+ cJSON_AddStringToObject(res_result, "subscriber-nodeid", res_result_subscriber_nodeid);
+ if (downstream) cJSON_AddTrueToObject(res_result, "downstream");
+ if (res_result_subscribe_channels) cJSON_AddItemToObject(res_result, "subscribe-channels", cJSON_Duplicate(res_result_subscribe_channels, 1));
+ if (res_result_failed_channels) cJSON_AddItemToObject(res_result, "failed-channels", cJSON_Duplicate(res_result_failed_channels, 1));
+
+ blade_session_send(relay, res, NULL, NULL);
+
+ cJSON_Delete(res);
+
+ blade_session_read_unlock(relay);
+ }
+
+done:
+ blade_session_read_unlock(bs);
+ return ret;
+}
+
// blade.broadcast request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *broadcaster_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
- ks_pool_t *pool = NULL;
- const char *localid = NULL;
ks_assert(bh);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
- // this will ensure any downstream subscriber sessions, and upstream session if available will be broadcasted to
- pool = blade_handle_pool_get(bh);
-
- if (!broadcaster_nodeid) {
- blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &localid);
- ks_assert(localid);
- broadcaster_nodeid = localid;
- }
-
- ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, broadcaster_nodeid, NULL, event, protocol, realm, params, callback, data);
-
- if (localid) ks_pool_free(pool, &localid);
+ ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, NULL, protocol, realm, channel, event, params, callback, data);
// @todo must check if the local node is also subscribed to receive the event, this is a special edge case which has some extra considerations
// if the local node is subscribed to receive the event, it should be received here as a special case, otherwise the broadcast request handler
}
// blade.broadcast request handler
-ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
ks_bool_t ret = KS_FALSE;
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
- const char *req_params_broadcaster_nodeid = NULL;
- const char *req_params_event = NULL;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
+ const char *req_params_channel = NULL;
+ const char *req_params_event = NULL;
cJSON *req_params_params = NULL;
blade_subscription_t *bsub = NULL;
blade_rpc_request_callback_t callback = NULL;
goto done;
}
- req_params_broadcaster_nodeid = cJSON_GetObjectCstr(req_params, "broadcaster-nodeid");
- if (!req_params_broadcaster_nodeid) {
- ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'broadcaster-nodeid'\n", blade_session_id_get(bs));
- blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params broadcaster-nodeid");
- blade_session_send(bs, res, NULL, NULL);
- goto done;
- }
-
- req_params_event = cJSON_GetObjectCstr(req_params, "event");
- if (!req_params_event) {
- ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
- blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
- blade_session_send(bs, res, NULL, NULL);
- goto done;
- }
-
req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
if (!req_params_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'protocol'\n", blade_session_id_get(bs));
goto done;
}
+ req_params_channel = cJSON_GetObjectCstr(req_params, "channel");
+ if (!req_params_channel) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'channel'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channel");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ req_params_event = cJSON_GetObjectCstr(req_params, "event");
+ if (!req_params_event) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
req_params_params = cJSON_GetObjectItem(req_params, "params");
- blade_subscriptionmgr_broadcast(bh->subscriptionmgr, req_params_broadcaster_nodeid, blade_session_id_get(bs), req_params_event, req_params_protocol, req_params_realm, req_params_params, NULL, NULL);
+ blade_subscriptionmgr_broadcast(bh->subscriptionmgr, blade_session_id_get(bs), req_params_protocol, req_params_realm, req_params_channel, req_params_event, req_params_params, NULL, NULL);
- bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_event, req_params_protocol, req_params_realm);
+ bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_protocol, req_params_realm, req_params_channel);
if (bsub) {
const char *localid = NULL;
ks_pool_t *pool = NULL;
// build the actual response finally
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
- cJSON_AddStringToObject(res_result, "broadcaster-nodeid", req_params_broadcaster_nodeid);
- cJSON_AddStringToObject(res_result, "event", req_params_event);
+ // @todo this is not neccessary, can obtain this from the original request
cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
+ cJSON_AddStringToObject(res_result, "channel", req_params_channel);
+ cJSON_AddStringToObject(res_result, "event", req_params_event);
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);
return ret;
}
-KS_DECLARE(const char *) blade_rpcbroadcast_request_broadcaster_nodeid_get(blade_rpc_request_t *brpcreq)
-{
- cJSON *req = NULL;
- cJSON *req_params = NULL;
- const char *req_broadcaster_nodeid = NULL;
-
- ks_assert(brpcreq);
-
- req = blade_rpc_request_message_get(brpcreq);
- ks_assert(req);
-
- req_params = cJSON_GetObjectItem(req, "params");
- if (req_params) req_broadcaster_nodeid = cJSON_GetObjectCstr(req_params, "broadcaster-nodeid");
-
- return req_broadcaster_nodeid;
-}
-
KS_DECLARE(cJSON *) blade_rpcbroadcast_request_params_get(blade_rpc_request_t *brpcreq)
{
cJSON *req = NULL;
struct blade_subscription_s {
ks_pool_t *pool;
- const char *event;
const char *protocol;
const char *realm;
+ const char *channel;
ks_hash_t *subscribers;
blade_rpc_request_callback_t callback;
case KS_MPCL_ANNOUNCE:
break;
case KS_MPCL_TEARDOWN:
- if (bsub->event) ks_pool_free(bsub->pool, &bsub->event);
if (bsub->protocol) ks_pool_free(bsub->pool, &bsub->protocol);
if (bsub->realm) ks_pool_free(bsub->pool, &bsub->subscribers);
+ if (bsub->channel) ks_pool_free(bsub->pool, &bsub->channel);
if (bsub->subscribers) ks_hash_destroy(&bsub->subscribers);
break;
case KS_MPCL_DESTROY:
}
}
-KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *event, const char *protocol, const char *realm)
+KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *protocol, const char *realm, const char *channel)
{
blade_subscription_t *bsub = NULL;
ks_assert(bsubP);
ks_assert(pool);
- ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
+ ks_assert(channel);
bsub = ks_pool_alloc(pool, sizeof(blade_subscription_t));
bsub->pool = pool;
- bsub->event = ks_pstrdup(pool, event);
bsub->protocol = ks_pstrdup(pool, protocol);
bsub->realm = ks_pstrdup(pool, realm);
+ bsub->channel = ks_pstrdup(pool, channel);
ks_hash_create(&bsub->subscribers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bsub->pool);
ks_assert(bsub->subscribers);
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(const char *) blade_subscription_event_get(blade_subscription_t *bsub)
+KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub)
{
ks_assert(bsub);
- return bsub->event;
+ return bsub->protocol;
}
-KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub)
+KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub)
{
ks_assert(bsub);
- return bsub->protocol;
+ return bsub->realm;
}
-KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub)
+KS_DECLARE(const char *) blade_subscription_channel_get(blade_subscription_t *bsub)
{
ks_assert(bsub);
- return bsub->realm;
+ return bsub->channel;
}
// return bs;
//}
-KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *event, const char *protocol, const char *realm)
+KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel)
{
blade_subscription_t *bsub = NULL;
char *key = NULL;
ks_assert(bsmgr);
- ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
+ ks_assert(channel);
- key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, event);
+ key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, channel);
bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)key, KS_READLOCKED);
// @todo if (bsub) blade_subscription_read_lock(bsub);
return bsub;
}
-KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *target)
+KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber)
{
char *key = NULL;
blade_subscription_t *bsub = NULL;
ks_bool_t propagate = KS_FALSE;
ks_assert(bsmgr);
- ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
- ks_assert(target);
+ ks_assert(channel);
+ ks_assert(subscriber);
- key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, event);
+ key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, channel);
ks_hash_write_lock(bsmgr->subscriptions);
bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)key, KS_UNLOCKED);
if (!bsub) {
- blade_subscription_create(&bsub, bsmgr->pool, event, protocol, realm);
+ blade_subscription_create(&bsub, bsmgr->pool, protocol, realm, channel);
ks_assert(bsub);
ks_hash_insert(bsmgr->subscriptions, (void *)ks_pstrdup(bsmgr->pool, key), bsub);
propagate = KS_TRUE;
}
- bsub_cleanup = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)target, KS_UNLOCKED);
+ bsub_cleanup = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)subscriber, KS_UNLOCKED);
if (!bsub_cleanup) {
ks_hash_create(&bsub_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bsmgr->pool);
ks_assert(bsub_cleanup);
ks_log(KS_LOG_DEBUG, "Subscription Added: %s\n", key);
- ks_hash_insert(bsmgr->subscriptions_cleanup, (void *)ks_pstrdup(bsmgr->pool, target), (void *)bsub_cleanup);
+ ks_hash_insert(bsmgr->subscriptions_cleanup, (void *)ks_pstrdup(bsmgr->pool, subscriber), (void *)bsub_cleanup);
}
ks_hash_insert(bsub_cleanup, (void *)ks_pstrdup(bsmgr->pool, key), (void *)KS_TRUE);
- blade_subscription_subscribers_add(bsub, target);
+ blade_subscription_subscribers_add(bsub, subscriber);
ks_hash_write_unlock(bsmgr->subscriptions);
- ks_log(KS_LOG_DEBUG, "Subscriber Added: %s to %s\n", target, key);
+ ks_log(KS_LOG_DEBUG, "Subscriber Added: %s to %s\n", subscriber, key);
ks_pool_free(bsmgr->pool, &key);
return propagate;
}
-KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *target)
+KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber)
{
char *key = NULL;
blade_subscription_t *bsub = NULL;
ks_bool_t propagate = KS_FALSE;
ks_assert(bsmgr);
- ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
- ks_assert(target);
+ ks_assert(channel);
+ ks_assert(subscriber);
- key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, event);
+ key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, channel);
ks_hash_write_lock(bsmgr->subscriptions);
bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)key, KS_UNLOCKED);
if (bsub) {
- bsub_cleanup = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)target, KS_UNLOCKED);
+ bsub_cleanup = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)subscriber, KS_UNLOCKED);
ks_assert(bsub_cleanup);
ks_hash_remove(bsub_cleanup, key);
if (ks_hash_count(bsub_cleanup) == 0) {
- ks_hash_remove(bsmgr->subscriptions_cleanup, (void *)target);
+ ks_hash_remove(bsmgr->subscriptions_cleanup, (void *)subscriber);
}
- ks_log(KS_LOG_DEBUG, "Subscriber Removed: %s from %s\n", target, key);
- blade_subscription_subscribers_remove(bsub, target);
+ ks_log(KS_LOG_DEBUG, "Subscriber Removed: %s from %s\n", subscriber, key);
+ blade_subscription_subscribers_remove(bsub, subscriber);
if (ks_hash_count(blade_subscription_subscribers_get(bsub)) == 0) {
ks_log(KS_LOG_DEBUG, "Subscription Removed: %s\n", key);
return propagate;
}
-KS_DECLARE(void) blade_subscriptionmgr_subscriber_cleanup(blade_subscriptionmgr_t *bsmgr, const char *target)
+KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target)
{
ks_bool_t unsubbed = KS_FALSE;
while (!unsubbed) {
ks_hash_t *subscriptions = NULL;
- const char *event = NULL;
const char *protocol = NULL;
const char *realm = NULL;
+ const char *channel = NULL;
ks_hash_read_lock(bsmgr->subscriptions);
subscriptions = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)target, KS_UNLOCKED);
ks_assert(bsub);
// @note allocate these to avoid lifecycle issues when the last subscriber is removed causing the subscription to be removed
- event = ks_pstrdup(bsmgr->pool, blade_subscription_event_get(bsub));
protocol = ks_pstrdup(bsmgr->pool, blade_subscription_protocol_get(bsub));
realm = ks_pstrdup(bsmgr->pool, blade_subscription_realm_get(bsub));
+ channel = ks_pstrdup(bsmgr->pool, blade_subscription_channel_get(bsub));
}
ks_hash_read_unlock(bsmgr->subscriptions);
if (!unsubbed) {
- if (blade_subscriptionmgr_subscriber_remove(bsmgr, NULL, event, protocol, realm, target)) {
- blade_handle_rpcsubscribe_raw(bsmgr->handle, event, protocol, realm, KS_TRUE, NULL, NULL);
- }
- ks_pool_free(bsmgr->pool, &event);
+ blade_subscriptionmgr_subscriber_remove(bsmgr, NULL, protocol, realm, channel, target);
+
ks_pool_free(bsmgr->pool, &protocol);
ks_pool_free(bsmgr->pool, &realm);
+ ks_pool_free(bsmgr->pool, &channel);
}
}
}
-KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *broadcaster_nodeid, const char *excluded_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
{
const char *bsub_key = NULL;
blade_subscription_t *bsub = NULL;
blade_session_t *bs = NULL;
+ cJSON *req = NULL;
+ cJSON *req_params = NULL;
ks_assert(bsmgr);
- ks_assert(broadcaster_nodeid);
- ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
+ ks_assert(channel);
+
+ bsub_key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, channel);
- bsub_key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, event);
+ blade_rpc_request_raw_create(bsmgr->pool, &req, &req_params, NULL, "blade.broadcast");
+ cJSON_AddStringToObject(req_params, "protocol", protocol);
+ cJSON_AddStringToObject(req_params, "realm", realm);
+ cJSON_AddStringToObject(req_params, "channel", channel);
+ cJSON_AddStringToObject(req_params, "event", event);
+ if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
ks_hash_read_lock(bsmgr->subscriptions);
for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
- cJSON *req = NULL;
- cJSON *req_params = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
+ // @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), (const char *)key)) continue;
- bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bsmgr->handle), (const char *)key);
+ bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key);
if (bs) {
- ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs));
-
- blade_rpc_request_raw_create(bsmgr->pool, &req, &req_params, NULL, "blade.broadcast");
-
- cJSON_AddStringToObject(req_params, "broadcaster-nodeid", broadcaster_nodeid);
- cJSON_AddStringToObject(req_params, "event", event);
- cJSON_AddStringToObject(req_params, "protocol", protocol);
- cJSON_AddStringToObject(req_params, "realm", realm);
-
- if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
+ ks_log(KS_LOG_DEBUG, "Broadcasting: %s through %s\n", bsub_key, blade_session_id_get(bs));
blade_session_send(bs, req, callback, data);
- cJSON_Delete(req);
-
blade_session_read_unlock(bs);
}
}
ks_hash_read_unlock(bsmgr->subscriptions);
- ks_pool_free(bsmgr->pool, &bsub_key);
-
bs = blade_upstreammgr_session_get(blade_handle_upstreammgr_get(bsmgr->handle));
if (bs) {
if (!excluded_nodeid || ks_safe_strcasecmp(blade_session_id_get(bs), excluded_nodeid)) {
- cJSON *req = NULL;
- cJSON *req_params = NULL;
-
- ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs));
-
- blade_rpc_request_raw_create(bsmgr->pool, &req, &req_params, NULL, "blade.broadcast");
-
- cJSON_AddStringToObject(req_params, "broadcaster-nodeid", broadcaster_nodeid);
- cJSON_AddStringToObject(req_params, "event", event);
- cJSON_AddStringToObject(req_params, "protocol", protocol);
- cJSON_AddStringToObject(req_params, "realm", realm);
-
- if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
+ ks_log(KS_LOG_DEBUG, "Broadcasting: %s through %s\n", bsub_key, blade_session_id_get(bs));
blade_session_send(bs, req, callback, data);
-
- cJSON_Delete(req);
}
blade_session_read_unlock(bs);
}
+ cJSON_Delete(req);
+
+ ks_pool_free(bsmgr->pool, &bsub_key);
+
return KS_STATUS_SUCCESS;
}
return KS_STATUS_SUCCESS;
}
+KS_DECLARE(ks_bool_t) blade_upstreammgr_masterlocal(blade_upstreammgr_t *bumgr)
+{
+ ks_bool_t ret = KS_FALSE;
+
+ ks_assert(bumgr);
+
+ ks_rwl_read_lock(bumgr->masterid_rwl);
+ ks_rwl_read_lock(bumgr->localid_rwl);
+ ret = bumgr->masterid && bumgr->localid && !ks_safe_strcasecmp(bumgr->masterid, bumgr->localid);
+ ks_rwl_read_unlock(bumgr->localid_rwl);
+ ks_rwl_read_unlock(bumgr->masterid_rwl);
+
+ return ret;
+}
+
KS_DECLARE(ks_status_t) blade_upstreammgr_realm_add(blade_upstreammgr_t *bumgr, const char *realm)
{
char *key = NULL;
KS_DECLARE(ks_status_t) blade_mastermgr_create(blade_mastermgr_t **bmmgrP, blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_mastermgr_destroy(blade_mastermgr_t **bmmgrP);
KS_DECLARE(blade_handle_t *) blade_mastermgr_handle_get(blade_mastermgr_t *bmmgr);
+KS_DECLARE(ks_status_t) blade_mastermgr_purge(blade_mastermgr_t *bmmgr, const char *nodeid);
KS_DECLARE(blade_protocol_t *) blade_mastermgr_protocol_lookup(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm);
KS_DECLARE(ks_status_t) blade_mastermgr_controller_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *controller);
-KS_DECLARE(ks_status_t) blade_mastermgr_controller_remove(blade_mastermgr_t *bmmgr, const char *controller);
+KS_DECLARE(ks_status_t) blade_mastermgr_channel_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel);
+KS_DECLARE(ks_status_t) blade_mastermgr_channel_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel);
+KS_DECLARE(ks_status_t) blade_mastermgr_channel_authorize(blade_mastermgr_t *bmmgr, ks_bool_t remove, const char *protocol, const char *realm, const char *channel, const char *controller, const char *target);
+KS_DECLARE(ks_bool_t) blade_mastermgr_channel_verify(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel, const char *target);
KS_END_EXTERN_C
#endif
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_protocol_create(blade_protocol_t **bpP, ks_pool_t *pool, const char *name, const char *realm);
KS_DECLARE(ks_status_t) blade_protocol_destroy(blade_protocol_t **bpP);
-KS_DECLARE(ks_hash_t *) blade_protocol_controllers_get(blade_protocol_t *bp);
+KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid);
KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, const char *nodeid);
-KS_DECLARE(ks_status_t) blade_protocol_controllers_remove(blade_protocol_t *bp, const char *nodeid);
+KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp);
+KS_DECLARE(ks_status_t) blade_protocol_channel_add(blade_protocol_t *bp, const char *name);
+KS_DECLARE(ks_status_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name);
+KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, ks_bool_t remove, const char *channel, const char *controller, const char *target);
+KS_DECLARE(ks_bool_t) blade_protocol_channel_verify(blade_protocol_t *bp, const char *channel, const char *target);
KS_END_EXTERN_C
#endif
#include <blade.h>
KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_rpc_destroy(blade_rpc_t **brpcP);
KS_DECLARE(blade_handle_t *) blade_rpc_handle_get(blade_rpc_t *brpc);
KS_DECLARE(const char *) blade_rpc_method_get(blade_rpc_t *brpc);
KS_DECLARE(const char *) blade_rpc_protocol_get(blade_rpc_t *brpc);
KS_DECLARE(const char *) blade_rpc_realm_get(blade_rpc_t *brpc);
KS_DECLARE(blade_rpc_request_callback_t) blade_rpc_callback_get(blade_rpc_t *brpc);
-KS_DECLARE(void *) blade_rpc_data_get(blade_rpc_t *brpc);
+KS_DECLARE(cJSON *) blade_rpc_data_get(blade_rpc_t *brpc);
KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP,
blade_handle_t *bh,
const char *session_id,
cJSON *json,
blade_rpc_response_callback_t callback,
- void *data);
+ cJSON *data);
KS_DECLARE(ks_status_t) blade_rpc_request_destroy(blade_rpc_request_t **brpcreqP);
+KS_DECLARE(ks_status_t) blade_rpc_request_duplicate(blade_rpc_request_t **brpcreqP, blade_rpc_request_t *brpcreq);
KS_DECLARE(blade_handle_t *) blade_rpc_request_handle_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(const char *) blade_rpc_request_sessionid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(cJSON *) blade_rpc_request_message_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq);
-KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq);
+KS_DECLARE(cJSON *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(ks_status_t) blade_rpc_request_raw_create(ks_pool_t *pool, cJSON **json, cJSON **params, const char **id, const char *method);
KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs);
KS_DECLARE(const char *) blade_session_connection_get(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_connection_set(blade_session_t *bs, const char *id);
-KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json);
KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json);
KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json);
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id);
-KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, cJSON *data);
-KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data);
-KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data);
-KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, cJSON *data);
+
+KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(const char *) blade_rpcexecute_request_requester_nodeid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(const char *) blade_rpcexecute_request_responder_nodeid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(cJSON *) blade_rpcexecute_request_params_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(cJSON *) blade_rpcexecute_response_result_get(blade_rpc_response_t *brpcres);
KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJSON *result);
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data);
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, cJSON *data, blade_rpc_request_callback_t channel_callback, cJSON *channel_data);
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, cJSON *data);
-KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *broadcaster_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
-KS_DECLARE(const char *) blade_rpcbroadcast_request_broadcaster_nodeid_get(blade_rpc_request_t *brpcreq);
+KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(cJSON *) blade_rpcbroadcast_request_params_get(blade_rpc_request_t *brpcreq);
KS_END_EXTERN_C
#include <blade.h>
KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *event, const char *protocol, const char *realm);
+KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *protocol, const char *realm, const char *channel);
KS_DECLARE(ks_status_t) blade_subscription_destroy(blade_subscription_t **bsubP);
-KS_DECLARE(const char *) blade_subscription_event_get(blade_subscription_t *bsub);
KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub);
KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub);
+KS_DECLARE(const char *) blade_subscription_channel_get(blade_subscription_t *bsub);
KS_DECLARE(ks_hash_t *) blade_subscription_subscribers_get(blade_subscription_t *bsub);
KS_DECLARE(ks_status_t) blade_subscription_subscribers_add(blade_subscription_t *bsub, const char *nodeid);
KS_DECLARE(ks_status_t) blade_subscription_subscribers_remove(blade_subscription_t *bsub, const char *nodeid);
KS_DECLARE(ks_status_t) blade_subscriptionmgr_create(blade_subscriptionmgr_t **bsmgrP, blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_subscriptionmgr_destroy(blade_subscriptionmgr_t **bsmgrP);
KS_DECLARE(blade_handle_t *) blade_subscriptionmgr_handle_get(blade_subscriptionmgr_t *bsmgr);
-KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *event, const char *protocol, const char *realm);
-KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *target);
-KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *target);
-KS_DECLARE(void) blade_subscriptionmgr_subscriber_cleanup(blade_subscriptionmgr_t *bsmgr, const char *target);
-KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *broadcaster_nodeid, const char *excluded_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel);
+KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
+KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
+KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target);
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
KS_END_EXTERN_C
#endif
typedef struct blade_sessionmgr_s blade_sessionmgr_t;
typedef struct blade_session_callback_data_s blade_session_callback_data_t;
-typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, void *data);
-typedef ks_bool_t (*blade_rpc_response_callback_t)(blade_rpc_response_t *brpcres, void *data);
+typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, cJSON *data);
+typedef ks_bool_t (*blade_rpc_response_callback_t)(blade_rpc_response_t *brpcres, cJSON *data);
typedef enum {
KS_DECLARE(ks_status_t) blade_upstreammgr_masterid_set(blade_upstreammgr_t *bumgr, const char *id);
KS_DECLARE(ks_bool_t) blade_upstreammgr_masterid_compare(blade_upstreammgr_t *bumgr, const char *id);
KS_DECLARE(ks_status_t) blade_upstreammgr_masterid_copy(blade_upstreammgr_t *bumgr, ks_pool_t *pool, const char **id);
+KS_DECLARE(ks_bool_t) blade_upstreammgr_masterlocal(blade_upstreammgr_t *bumgr);
//KS_DECLARE(ks_hash_t *) blade_upstreammgr_realm_lookup(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_upstreammgr_realm_add(blade_upstreammgr_t *bumgr, const char *realm);
KS_DECLARE(ks_status_t) blade_upstreammgr_realm_remove(blade_upstreammgr_t *bumgr, const char *realm);
void command_subscribe(blade_handle_t *bh, char *args)
{
+ cJSON *channels = NULL;
+
ks_assert(bh);
ks_assert(args);
- blade_handle_rpcsubscribe(bh, "test.event", "test", "mydomain.com", KS_FALSE, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
+ channels = cJSON_CreateArray();
+ cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
+ blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
+ cJSON_Delete(channels);
}
/* For Emacs:
cJSON_AddStringToObject(result, "text", text);
blade_rpcexecute_response_send(brpcreq, result);
+ cJSON_Delete(result);
return KS_FALSE;
}
-ks_bool_t test_event_response_handler(blade_rpc_response_t *brpcres, void *data)
+ks_bool_t test_broadcast_response_handler(blade_rpc_response_t *brpcres, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_response_sessionid_get(brpcres));
ks_assert(bs);
- ks_log(KS_LOG_DEBUG, "Session (%s) test.event response processing\n", blade_session_id_get(bs));
+ ks_log(KS_LOG_DEBUG, "Session (%s) test broadcast response processing\n", blade_session_id_get(bs));
blade_session_read_unlock(bs);
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
// @todo build up json-based method schema for each protocolrpc registered above, and pass into blade_handle_rpcpublish() to attach to the request, to be stored in the blade_protocol_t tracked by the master node
- blade_handle_rpcpublish(bh, "test", "mydomain.com", blade_publish_response_handler, NULL);
+ blade_handle_rpcpublish(bh, "test", "mydomain.com", NULL, blade_publish_response_handler, NULL);
}
void command_broadcast(blade_handle_t *bh, char *args)
ks_assert(bh);
ks_assert(args);
- blade_handle_rpcbroadcast(bh, NULL, "test.event", "test", "mydomain.com", NULL, test_event_response_handler, NULL);
+ blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "event", NULL, test_broadcast_response_handler, NULL);
}
const char *g_testcon_nodeid = NULL;
-ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, void *data)
+ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
return KS_FALSE;
}
-ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, void *data)
+ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
return KS_FALSE;
}
-ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, void *data)
+ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
return KS_FALSE;
}
-ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, void *data)
+ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
return KS_FALSE;
}
-ks_bool_t test_join_broadcast_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t test_broadcast_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
- const char *broadcaster_nodeid = NULL;
cJSON *params = NULL;
//cJSON *result = NULL;
params = blade_rpcbroadcast_request_params_get(brpcreq);
ks_assert(params);
- broadcaster_nodeid = blade_rpcbroadcast_request_broadcaster_nodeid_get(brpcreq);
- ks_assert(broadcaster_nodeid);
-
- ks_log(KS_LOG_DEBUG, "Session (%s) test.join (%s) broadcast processing\n", blade_session_id_get(bs), broadcaster_nodeid);
-
- blade_session_read_unlock(bs);
-
- return KS_FALSE;
-}
-
-ks_bool_t test_leave_broadcast_handler(blade_rpc_request_t *brpcreq, void *data)
-{
- blade_handle_t *bh = NULL;
- blade_session_t *bs = NULL;
- const char *broadcaster_nodeid = NULL;
- cJSON *params = NULL;
- //cJSON *result = NULL;
-
- ks_assert(brpcreq);
-
- bh = blade_rpc_request_handle_get(brpcreq);
- ks_assert(bh);
-
- bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_request_sessionid_get(brpcreq));
- ks_assert(bs);
-
- params = blade_rpcbroadcast_request_params_get(brpcreq);
- ks_assert(params);
-
- broadcaster_nodeid = blade_rpcbroadcast_request_broadcaster_nodeid_get(brpcreq);
- ks_assert(broadcaster_nodeid);
-
- ks_log(KS_LOG_DEBUG, "Session (%s) test.leave (%s) broadcast processing\n", blade_session_id_get(bs), broadcaster_nodeid);
-
- blade_session_read_unlock(bs);
-
- return KS_FALSE;
-}
-
-ks_bool_t test_talk_broadcast_handler(blade_rpc_request_t *brpcreq, void *data)
-{
- blade_handle_t *bh = NULL;
- blade_session_t *bs = NULL;
- const char *broadcaster_nodeid = NULL;
- cJSON *params = NULL;
- //cJSON *result = NULL;
-
- ks_assert(brpcreq);
-
- bh = blade_rpc_request_handle_get(brpcreq);
- ks_assert(bh);
-
- bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_request_sessionid_get(brpcreq));
- ks_assert(bs);
-
- broadcaster_nodeid = blade_rpcbroadcast_request_broadcaster_nodeid_get(brpcreq);
- ks_assert(broadcaster_nodeid);
-
- params = blade_rpcbroadcast_request_params_get(brpcreq);
- ks_assert(params);
-
- // @todo pull out text from params
-
- ks_log(KS_LOG_DEBUG, "Session (%s) test.talk (%s) broadcast processing\n", blade_session_id_get(bs), broadcaster_nodeid);
+ ks_log(KS_LOG_DEBUG, "Session (%s) test broadcast processing\n", blade_session_id_get(bs));
blade_session_read_unlock(bs);
void command_join(blade_handle_t *bh, char *args)
{
cJSON *params = NULL;
+ cJSON *channels = NULL;
ks_assert(bh);
ks_assert(args);
params = cJSON_CreateObject();
-
blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.join", "test", "mydomain.com", params, test_join_response_handler, NULL);
+ cJSON_Delete(params);
- blade_handle_rpcsubscribe(bh, "test.join", "test", "mydomain.com", KS_FALSE, NULL, NULL, test_join_broadcast_handler, NULL);
- blade_handle_rpcsubscribe(bh, "test.leave", "test", "mydomain.com", KS_FALSE, NULL, NULL, test_leave_broadcast_handler, NULL);
- blade_handle_rpcsubscribe(bh, "test.talk", "test", "mydomain.com", KS_FALSE, NULL, NULL, test_talk_broadcast_handler, NULL);
+ channels = cJSON_CreateArray();
+ cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
+ blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, NULL, NULL, test_broadcast_handler, NULL);
+ cJSON_Delete(channels);
}
void command_leave(blade_handle_t *bh, char *args)
{
cJSON *params = NULL;
+ cJSON *channels = NULL;
ks_assert(bh);
ks_assert(args);
}
params = cJSON_CreateObject();
-
blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.leave", "test", "mydomain.com", params, test_leave_response_handler, NULL);
+ cJSON_Delete(params);
- blade_handle_rpcsubscribe(bh, "test.join", "test", "mydomain.com", KS_TRUE, NULL, NULL, NULL, NULL);
- blade_handle_rpcsubscribe(bh, "test.leave", "test", "mydomain.com", KS_TRUE, NULL, NULL, NULL, NULL);
- blade_handle_rpcsubscribe(bh, "test.talk", "test", "mydomain.com", KS_TRUE, NULL, NULL, NULL, NULL);
+ channels = cJSON_CreateArray();
+ cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
+ blade_handle_rpcsubscribe(bh, "test", "mydomain.com", NULL, channels, NULL, NULL, NULL, NULL);
+ cJSON_Delete(channels);
}
void command_talk(blade_handle_t *bh, char *args)
}
params = cJSON_CreateObject();
-
cJSON_AddStringToObject(params, "text", args);
-
blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.talk", "test", "mydomain.com", params, test_talk_response_handler, NULL);
+ cJSON_Delete(params);
}
/* For Emacs:
return KS_STATUS_SUCCESS;
}
-ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, void *data)
+ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
{
//testproto_t *test = NULL;
blade_handle_t *bh = NULL;
ks_assert(brpcres);
ks_assert(data);
- //test = (testproto_t *)data;
+ //test = (testproto_t *)cJSON_GetPtrValue(data);
bh = blade_rpc_response_handle_get(brpcres);
ks_assert(bh);
return KS_FALSE;
}
-ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
testproto_t *test = NULL;
blade_handle_t *bh = NULL;
const char *requester_nodeid = NULL;
const char *key = NULL;
cJSON *params = NULL;
+ cJSON *channels = NULL;
cJSON *result = NULL;
ks_assert(brpcreq);
ks_assert(data);
- test = (testproto_t *)data;
+ test = (testproto_t *)cJSON_GetPtrValue(data);
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
+ // session for execute response
bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_request_sessionid_get(brpcreq));
ks_assert(bs);
requester_nodeid = blade_rpcexecute_request_requester_nodeid_get(brpcreq);
ks_assert(requester_nodeid);
+ // inner rpcexecute parameters
params = blade_rpcexecute_request_params_get(brpcreq);
ks_assert(params);
ks_log(KS_LOG_DEBUG, "Session (%s) test.join request processing\n", blade_session_id_get(bs));
+ // add to participants
key = ks_pstrdup(test->pool, requester_nodeid);
ks_assert(key);
+ // @todo to properly maintain protocol details tied to a specific node like this participants list requires a way to know if a specific node of interest goes offline to cleanup associated details
+ // refer back to work notes on ideas about this
ks_hash_write_lock(test->participants);
ks_hash_insert(test->participants, (void *)key, (void *)KS_TRUE);
ks_hash_write_unlock(test->participants);
- blade_session_read_unlock(bs);
+ // authorize channels with the master for the requester
+ channels = cJSON_CreateArray();
+ cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
+
+ blade_handle_rpcauthorize(bh, requester_nodeid, KS_FALSE, "test", "mydomain.com", channels, NULL, NULL);
+
+ cJSON_Delete(channels);
+ // send rpcexecute response to the requester
result = cJSON_CreateObject();
blade_rpcexecute_response_send(brpcreq, result);
+ cJSON_Delete(result);
+
+ blade_session_read_unlock(bs);
+
+ // broadcast to authorized nodes that have subscribed, that the requester has joined
params = cJSON_CreateObject();
- blade_handle_rpcbroadcast(bh, requester_nodeid, "test.join", "test", "mydomain.com", params, NULL, NULL);
+ cJSON_AddStringToObject(params, "joiner-nodeid", requester_nodeid);
+
+ blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "join", params, NULL, NULL);
+
+ cJSON_Delete(params);
return KS_FALSE;
}
-ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
testproto_t *test = NULL;
blade_handle_t *bh = NULL;
ks_assert(brpcreq);
ks_assert(data);
- test = (testproto_t *)data;
+ test = (testproto_t *)cJSON_GetPtrValue(data);
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
params = cJSON_CreateObject();
- blade_handle_rpcbroadcast(bh, requester_nodeid, "test.leave", "test", "mydomain.com", params, NULL, NULL);
+ cJSON_AddStringToObject(params, "leaver-nodeid", requester_nodeid);
+
+ blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "leave", params, NULL, NULL);
return KS_FALSE;
}
-ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
//testproto_t *test = NULL;
blade_handle_t *bh = NULL;
ks_assert(brpcreq);
ks_assert(data);
- //test = (testproto_t *)data;
+ //test = (testproto_t *)cJSON_GetPtrValue(data);
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
cJSON_AddStringToObject(params, "text", text);
- blade_handle_rpcbroadcast(bh, requester_nodeid, "test.talk", "test", "mydomain.com", params, NULL, NULL);
+ cJSON_AddStringToObject(params, "talker-nodeid", requester_nodeid);
+
+ blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "talk", params, NULL, NULL);
return KS_FALSE;
}
blade_identity_destroy(&target);
if (connected) {
+ cJSON *channels = NULL;
+
// @todo use session state change callback to know when the session is ready and the realm(s) available from blade.connect, this hack temporarily ensures it's ready before trying to publish upstream
ks_sleep_ms(3000);
- blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, test);
+ blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, cJSON_CreatePtr((uintptr_t)test));
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
- blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, test);
+ blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, cJSON_CreatePtr((uintptr_t)test));
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
- blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, test);
+ blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, cJSON_CreatePtr((uintptr_t)test));
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
- blade_handle_rpcpublish(bh, "test", "mydomain.com", test_publish_response_handler, test);
+ channels = cJSON_CreateArray();
+ cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
+
+ blade_handle_rpcpublish(bh, "test", "mydomain.com", channels, test_publish_response_handler, cJSON_CreatePtr((uintptr_t)test));
}
}
KS_DECLARE(cJSON *) cJSON_CreateStringPrintf(const char *fmt, ...);
KS_DECLARE(const char *)cJSON_GetObjectCstr(const cJSON *object, const char *string);
+KS_DECLARE(cJSON *) cJSON_CreatePtr(uintptr_t pointer);
+KS_DECLARE(uintptr_t) cJSON_GetPtrValue(const cJSON *object);
+KS_DECLARE(uintptr_t) cJSON_GetObjectPtr(const cJSON *object, const char *string);
static inline cJSON *ks_json_add_child_obj(cJSON *json, const char *name, cJSON *obj)
{
return item;
}
-KS_DECLARE(const char *)cJSON_GetObjectCstr(const cJSON *object, const char *string)
+KS_DECLARE(const char *) cJSON_GetObjectCstr(const cJSON *object, const char *string)
{
cJSON *cj = cJSON_GetObjectItem(object, string);
return cj->valuestring;
}
+KS_DECLARE(cJSON *) cJSON_CreatePtr(uintptr_t pointer)
+{
+ // @todo check for 32bit and use integer storage instead
+ return cJSON_CreateStringPrintf("%p", (void *)pointer);
+}
+
+KS_DECLARE(uintptr_t) cJSON_GetPtrValue(const cJSON *object)
+{
+ // @todo check for 32bit and use integer storage instead
+ void *pointer = NULL;
+ if (object && object->type == cJSON_String) sscanf_s(object->valuestring, "%p", &pointer);
+ return (uintptr_t)pointer;
+}
+
+KS_DECLARE(uintptr_t) cJSON_GetObjectPtr(const cJSON *object, const char *string)
+{
+ return cJSON_GetPtrValue(cJSON_GetObjectItem(object, string));
+}
+
/* For Emacs:
* Local Variables:
* mode:c