]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-10167: Temporary commit for peer review
authorShane Bryldt <astaelan@gmail.com>
Tue, 25 Jul 2017 17:00:45 +0000 (11:00 -0600)
committerShane Bryldt <astaelan@gmail.com>
Tue, 25 Jul 2017 17:01:07 +0000 (11:01 -0600)
25 files changed:
libs/libblade/src/blade_mastermgr.c
libs/libblade/src/blade_protocol.c
libs/libblade/src/blade_routemgr.c
libs/libblade/src/blade_rpc.c
libs/libblade/src/blade_session.c
libs/libblade/src/blade_sessionmgr.c
libs/libblade/src/blade_stack.c
libs/libblade/src/blade_subscription.c
libs/libblade/src/blade_subscriptionmgr.c
libs/libblade/src/blade_upstreammgr.c
libs/libblade/src/include/blade_mastermgr.h
libs/libblade/src/include/blade_protocol.h
libs/libblade/src/include/blade_rpc.h
libs/libblade/src/include/blade_session.h
libs/libblade/src/include/blade_stack.h
libs/libblade/src/include/blade_subscription.h
libs/libblade/src/include/blade_subscriptionmgr.h
libs/libblade/src/include/blade_types.h
libs/libblade/src/include/blade_upstreammgr.h
libs/libblade/test/bladec.c
libs/libblade/test/blades.c
libs/libblade/test/testcli.c
libs/libblade/test/testcon.c
libs/libks/src/include/ks_json.h
libs/libks/src/ks_json.c

index 445182b87a7fae0acd2761429427b833e47a32d9..1f003748390967327032a03902d4f53df5cffd17 100644 (file)
@@ -38,8 +38,7 @@ struct blade_mastermgr_s {
        ks_pool_t *pool;
 
        // @todo how does "exclusive" play into the controllers, does "exclusive" mean only one provider can exist for a given protocol and realm? what does non exclusive mean?
-       ks_hash_t *protocols; // protocols that have been published with blade.publish, and the details to locate a protocol provider with blade.locate
-       ks_hash_t *protocols_cleanup; // keyed by the nodeid, each value is a hash_t* of which contains string keys matching the "protocol@realm" keys to remove each nodeid from as a provider during cleanup
+       ks_hash_t *protocols; // protocols that have been published with blade.publish, and the details to locate a protocol controller with blade.locate
 };
 
 
@@ -76,9 +75,6 @@ KS_DECLARE(ks_status_t) blade_mastermgr_create(blade_mastermgr_t **bmmgrP, blade
        ks_hash_create(&bmmgr->protocols, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bmmgr->pool);
        ks_assert(bmmgr->protocols);
 
-       ks_hash_create(&bmmgr->protocols_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bmmgr->pool);
-       ks_assert(bmmgr->protocols_cleanup);
-
        ks_pool_set_cleanup(pool, bmmgr, NULL, blade_mastermgr_cleanup);
 
        *bmmgrP = bmmgr;
@@ -113,6 +109,40 @@ KS_DECLARE(blade_handle_t *) blade_mastermgr_handle_get(blade_mastermgr_t *bmmgr
        return bmmgr->handle;
 }
 
+KS_DECLARE(ks_status_t) blade_mastermgr_purge(blade_mastermgr_t *bmmgr, const char *nodeid)
+{
+       ks_hash_t *cleanup = NULL;
+
+       ks_hash_write_lock(bmmgr->protocols);
+       for (ks_hash_iterator_t *it = ks_hash_first(bmmgr->protocols, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+               const char *key = NULL;
+               blade_protocol_t *bp = NULL;
+
+               ks_hash_this(it, (const void **)&key, NULL, (void **)&bp);
+
+               if (blade_protocol_purge(bp, nodeid)) {
+                       if (!cleanup) ks_hash_create(&cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, bmmgr->pool);
+                       ks_hash_insert(cleanup, key, bp);
+               }
+       }
+       if (cleanup) {
+               for (ks_hash_iterator_t *it = ks_hash_first(cleanup, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+                       const char *key = NULL;
+                       blade_protocol_t *bp = NULL;
+
+                       ks_hash_this(it, (const void **)&key, NULL, (void **)&bp);
+
+                       ks_log(KS_LOG_DEBUG, "Protocol Removed: %s\n", key);
+                       ks_hash_remove(bmmgr->protocols, key);
+               }
+               ks_hash_destroy(&cleanup);
+       }
+
+       ks_hash_write_unlock(bmmgr->protocols);
+
+       return KS_STATUS_SUCCESS;
+}
+
 KS_DECLARE(blade_protocol_t *) blade_mastermgr_protocol_lookup(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm)
 {
        blade_protocol_t *bp = NULL;
@@ -135,7 +165,6 @@ KS_DECLARE(ks_status_t) blade_mastermgr_controller_add(blade_mastermgr_t *bmmgr,
 {
        blade_protocol_t *bp = NULL;
        char *key = NULL;
-       ks_hash_t *cleanup = NULL;
 
        ks_assert(bmmgr);
        ks_assert(protocol);
@@ -159,60 +188,131 @@ KS_DECLARE(ks_status_t) blade_mastermgr_controller_add(blade_mastermgr_t *bmmgr,
                ks_hash_insert(bmmgr->protocols, (void *)ks_pstrdup(bmmgr->pool, key), bp);
        }
 
-       cleanup = (ks_hash_t *)ks_hash_search(bmmgr->protocols_cleanup, (void *)controller, KS_UNLOCKED);
-       if (!cleanup) {
-               ks_hash_create(&cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bmmgr->pool);
-               ks_assert(cleanup);
-
-               ks_hash_insert(bmmgr->protocols_cleanup, (void *)ks_pstrdup(bmmgr->pool, controller), cleanup);
-       }
-       ks_hash_insert(cleanup, (void *)ks_pstrdup(bmmgr->pool, key), (void *)KS_TRUE);
        blade_protocol_controllers_add(bp, controller);
-       ks_log(KS_LOG_DEBUG, "Protocol Controller Added: %s to %s\n", controller, key);
+
+       ks_pool_free(bmmgr->pool, &key);
 
        ks_hash_write_unlock(bmmgr->protocols);
 
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_mastermgr_controller_remove(blade_mastermgr_t *bmmgr, const char *controller)
+KS_DECLARE(ks_status_t) blade_mastermgr_channel_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel)
 {
-       ks_hash_t *cleanup = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       blade_protocol_t *bp = NULL;
+       char *key = NULL;
 
        ks_assert(bmmgr);
-       ks_assert(controller);
+       ks_assert(protocol);
+       ks_assert(realm);
+       ks_assert(channel);
 
-       ks_hash_write_lock(bmmgr->protocols);
-       cleanup = (ks_hash_t *)ks_hash_search(bmmgr->protocols_cleanup, (void *)controller, KS_UNLOCKED);
-       if (cleanup) {
-               for (ks_hash_iterator_t *it = ks_hash_first(cleanup, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-                       void *key = NULL;
-                       void *value = NULL;
-                       blade_protocol_t *bp = NULL;
-                       ks_hash_t *controllers = NULL;
+       key = ks_psprintf(bmmgr->pool, "%s@%s", protocol, realm);
 
-                       ks_hash_this(it, (const void **)&key, NULL, &value);
+       bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_READLOCKED);
+       if (!bp) {
+               ret = KS_STATUS_NOT_FOUND;
+               goto done;
+       }
 
-                       bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, key, KS_UNLOCKED);
-                       ks_assert(bp); // should not happen when a cleanup still has a provider tracked for a protocol
+       blade_protocol_channel_add(bp, channel);
 
-                       ks_log(KS_LOG_DEBUG, "Protocol Controller Removed: %s from %s\n", controller, key);
-                       blade_protocol_controllers_remove(bp, controller);
+done:
+       ks_pool_free(bmmgr->pool, &key);
 
-                       controllers = blade_protocol_controllers_get(bp);
-                       if (ks_hash_count(controllers) == 0) {
-                               // @note this depends on locking something outside of the protocol that won't be destroyed, like the top level
-                               // protocols hash, but assumes then that any reader keeps the top level hash read locked while using the protocol
-                               // so it cannot be deleted
-                               ks_log(KS_LOG_DEBUG, "Protocol Removed: %s\n", key);
-                               ks_hash_remove(bmmgr->protocols, key);
-                       }
-               }
-               ks_hash_remove(bmmgr->protocols_cleanup, (void *)controller);
+       ks_hash_read_unlock(bmmgr->protocols);
+
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_mastermgr_channel_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       blade_protocol_t *bp = NULL;
+       char *key = NULL;
+
+       ks_assert(bmmgr);
+       ks_assert(protocol);
+       ks_assert(realm);
+       ks_assert(channel);
+
+       key = ks_psprintf(bmmgr->pool, "%s@%s", protocol, realm);
+
+       bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_READLOCKED);
+       if (!bp) {
+               ret = KS_STATUS_NOT_FOUND;
+               goto done;
        }
-       ks_hash_write_unlock(bmmgr->protocols);
 
-       return KS_STATUS_SUCCESS;
+       blade_protocol_channel_remove(bp, channel);
+
+done:
+       ks_pool_free(bmmgr->pool, &key);
+
+       ks_hash_read_unlock(bmmgr->protocols);
+
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_mastermgr_channel_authorize(blade_mastermgr_t *bmmgr, ks_bool_t remove, const char *protocol, const char *realm, const char *channel, const char *controller, const char *target)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       blade_protocol_t *bp = NULL;
+       char *key = NULL;
+       //ks_hash_t *cleanup = NULL;
+
+       ks_assert(bmmgr);
+       ks_assert(protocol);
+       ks_assert(realm);
+       ks_assert(channel);
+       ks_assert(controller);
+       ks_assert(target);
+
+       key = ks_psprintf(bmmgr->pool, "%s@%s", protocol, realm);
+
+       bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_READLOCKED);
+       if (!bp) {
+               ret = KS_STATUS_NOT_FOUND;
+               goto done;
+       }
+       
+       ret = blade_protocol_channel_authorize(bp, remove, channel, controller, target);
+
+done:
+       ks_pool_free(bmmgr->pool, &key);
+
+       ks_hash_read_unlock(bmmgr->protocols);
+
+       return ret;
+}
+
+KS_DECLARE(ks_bool_t) blade_mastermgr_channel_verify(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel, const char *target)
+{
+       ks_bool_t ret = KS_FALSE;
+       blade_protocol_t *bp = NULL;
+       char *key = NULL;
+       //ks_hash_t *cleanup = NULL;
+
+       ks_assert(bmmgr);
+       ks_assert(protocol);
+       ks_assert(realm);
+       ks_assert(channel);
+       ks_assert(target);
+
+       key = ks_psprintf(bmmgr->pool, "%s@%s", protocol, realm);
+
+       bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_READLOCKED);
+       if (!bp) goto done;
+
+       ret = blade_protocol_channel_verify(bp, channel, target);
+
+done:
+       ks_pool_free(bmmgr->pool, &key);
+
+       ks_hash_read_unlock(bmmgr->protocols);
+
+       return ret;
 }
 
 /* For Emacs:
index da6d29b3b84ad2c0b6e73cf0064f72631383794c..2a70375f1199f56c41c0ababf4d6d16d95fb086a 100644 (file)
@@ -39,6 +39,7 @@ struct blade_protocol_s {
        const char *name;
        const char *realm;
        ks_hash_t *controllers;
+       ks_hash_t *channels;
        // @todo descriptors (schema, etc) for each method within a protocol
 };
 
@@ -56,6 +57,7 @@ static void blade_protocol_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_poo
                if (bp->name) ks_pool_free(bp->pool, &bp->name);
                if (bp->realm) ks_pool_free(bp->pool, &bp->realm);
                if (bp->controllers) ks_hash_destroy(&bp->controllers);
+               if (bp->channels) ks_hash_destroy(&bp->channels);
                break;
        case KS_MPCL_DESTROY:
                break;
@@ -76,9 +78,12 @@ KS_DECLARE(ks_status_t) blade_protocol_create(blade_protocol_t **bpP, ks_pool_t
        bp->name = ks_pstrdup(pool, name);
        bp->realm = ks_pstrdup(pool, realm);
 
-       ks_hash_create(&bp->controllers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bp->pool);
+       ks_hash_create(&bp->controllers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bp->pool);
        ks_assert(bp->controllers);
 
+       ks_hash_create(&bp->channels, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bp->pool);
+       ks_assert(bp->channels);
+
        ks_pool_set_cleanup(pool, bp, NULL, blade_protocol_cleanup);
 
        *bpP = bp;
@@ -100,11 +105,53 @@ KS_DECLARE(ks_status_t) blade_protocol_destroy(blade_protocol_t **bpP)
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_hash_t *) blade_protocol_controllers_get(blade_protocol_t *bp)
+KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid)
+{
+       ks_bool_t ret = KS_FALSE;
+
+       ks_assert(bp);
+       ks_assert(nodeid);
+
+       // @todo iterate all channels, remove the nodeid from all authorized hashes
+       ks_hash_write_lock(bp->channels);
+       for (ks_hash_iterator_t *it = ks_hash_first(bp->channels, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+               const char *key = NULL;
+               ks_hash_t *authorizations = NULL;
+
+               ks_hash_this(it, (const void **)&key, NULL, (void **)&authorizations);
+
+               if (ks_hash_remove(authorizations, nodeid)) {
+                       ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Removed: %s from %s@%s/%s\n", nodeid, bp->name, bp->realm, key);
+               }
+       }
+       ks_hash_write_unlock(bp->channels);
+
+       ks_hash_write_lock(bp->controllers);
+       if (ks_hash_remove(bp->controllers, (void *)nodeid)) {
+               ks_log(KS_LOG_DEBUG, "Protocol Controller Removed: %s from %s@%s\n", nodeid, bp->name, bp->realm);
+       }
+       ret = ks_hash_count(bp->controllers) == 0;
+       ks_hash_write_unlock(bp->controllers);
+
+       return ret;
+}
+
+KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp)
 {
+       cJSON *controllers = cJSON_CreateObject();
+
        ks_assert(bp);
-       return bp->controllers;
 
+       for (ks_hash_iterator_t *it = ks_hash_first(bp->controllers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+               const char *key = NULL;
+               void *value = NULL;
+
+               ks_hash_this(it, (const void **)&key, NULL, &value);
+
+               cJSON_AddItemToArray(controllers, cJSON_CreateString(key));
+       }
+
+       return controllers;
 }
 
 KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, const char *nodeid)
@@ -117,19 +164,108 @@ KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, con
        key = ks_pstrdup(bp->pool, nodeid);
        ks_hash_insert(bp->controllers, (void *)key, (void *)KS_TRUE);
 
+       ks_log(KS_LOG_DEBUG, "Protocol Controller Added: %s to %s@%s\n", nodeid, bp->name, bp->realm);
+
        return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_protocol_channel_add(blade_protocol_t *bp, const char *name)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       ks_hash_t *authorized = NULL;
+       char *key = NULL;
+
+       ks_assert(bp);
+       ks_assert(name);
+
+       ks_hash_write_lock(bp->channels);
 
+       if (ks_hash_search(bp->channels, name, KS_UNLOCKED)) {
+               ret = KS_STATUS_DUPLICATE_OPERATION;
+               goto done;
+       }
+
+       ks_hash_create(&authorized, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bp->pool);
+
+       key = ks_pstrdup(bp->pool, name);
+       ks_hash_insert(bp->channels, (void *)key, (void *)authorized);
+
+       ks_log(KS_LOG_DEBUG, "Protocol Channel Added: %s to %s@%s\n", key, bp->name, bp->realm);
+
+done:
+
+       ks_hash_write_unlock(bp->channels);
+
+       return ret;
 }
 
-KS_DECLARE(ks_status_t) blade_protocol_controllers_remove(blade_protocol_t *bp, const char *nodeid)
+KS_DECLARE(ks_status_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name)
 {
        ks_assert(bp);
-       ks_assert(nodeid);
+       ks_assert(name);
 
-       ks_hash_remove(bp->controllers, (void *)nodeid);
+       ks_hash_remove(bp->channels, (void *)name);
+
+       ks_log(KS_LOG_DEBUG, "Protocol Channel Removed: %s from %s@%s\n", name, bp->name, bp->realm);
 
        return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, ks_bool_t remove, const char *channel, const char *controller, const char *target)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       ks_hash_t *authorizations = NULL;
+       ks_bool_t allowed = KS_FALSE;
+
+       ks_assert(bp);
+       ks_assert(channel);
+       ks_assert(controller);
+       ks_assert(target);
+
+       allowed = (ks_bool_t)(intptr_t)ks_hash_search(bp->controllers, (void *)controller, KS_READLOCKED);
+       ks_hash_read_unlock(bp->controllers);
+
+       if (!allowed) {
+               ret = KS_STATUS_NOT_ALLOWED;
+               goto done;
+       }
+
+       // @todo verify controller, get ks_hash_t* value based on channel, add target to the channels hash
+       authorizations = (ks_hash_t *)ks_hash_search(bp->channels, (void *)channel, KS_READLOCKED);
+       if (authorizations) {
+               if (remove) {
+                       if (ks_hash_remove(authorizations, (void *)target)) {
+                               ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Removed: %s from %s@%s/%s\n", target, bp->name, bp->realm, channel);
+                       } else ret = KS_STATUS_NOT_FOUND;
+               }
+               else {
+                       ks_hash_insert(authorizations, (void *)ks_pstrdup(bp->pool, target), (void *)KS_TRUE);
+                       ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Added: %s to %s@%s/%s\n", target, bp->name, bp->realm, channel);
+               }
+       }
+       ks_hash_read_unlock(bp->channels);
+
+       if (!authorizations) ret = KS_STATUS_NOT_FOUND;
+
+done:
+       return ret;
+}
+
+KS_DECLARE(ks_bool_t) blade_protocol_channel_verify(blade_protocol_t *bp, const char *channel, const char *target)
+{
+       ks_bool_t ret = KS_FALSE;
+       ks_hash_t *authorizations = NULL;
+
+       ks_assert(bp);
+       ks_assert(channel);
+       ks_assert(target);
+
+       // @todo verify controller, get ks_hash_t* value based on channel, add target to the channels hash
+       authorizations = (ks_hash_t *)ks_hash_search(bp->channels, (void *)channel, KS_READLOCKED);
+       if (authorizations) ret = ks_hash_search(authorizations, target, KS_UNLOCKED) != NULL;
+       ks_hash_read_unlock(bp->channels);
 
+       return ret;
 }
 
 /* For Emacs:
index 6bee7907778ce4d7cdc499e82601ffbdb53453cf..84397f7b83b5c5687d915599e821e3b43f4a31bd 100644 (file)
@@ -157,12 +157,14 @@ KS_DECLARE(ks_status_t) blade_routemgr_route_remove(blade_routemgr_t *brmgr, con
 
        blade_handle_rpcregister(brmgr->handle, target, KS_TRUE, NULL, NULL);
 
+       blade_subscriptionmgr_purge(blade_handle_subscriptionmgr_get(brmgr->handle), target);
+
        // @note protocols are cleaned up here because routes can be removed that are not locally connected with a session but still
        // have protocols published to the master node from further downstream, in which case if a route is announced upstream to be
        // removed, a master node is still able to catch that here even when there is no direct session, but is also hit when there
        // is a direct session being terminated
 
-       blade_mastermgr_controller_remove(blade_handle_mastermgr_get(brmgr->handle), target);
+       blade_mastermgr_purge(blade_handle_mastermgr_get(brmgr->handle), target);
 
        return KS_STATUS_SUCCESS;
 }
index 1ee93a0e096b8381d224878fa177f598d248390f..ca1a3712cc80d5f16f36634b92de862b9425ba6c 100644 (file)
@@ -42,7 +42,7 @@ struct blade_rpc_s {
        const char *realm;
 
        blade_rpc_request_callback_t callback;
-       void *data;
+       cJSON *data;
 };
 
 struct blade_rpc_request_s {
@@ -54,7 +54,7 @@ struct blade_rpc_request_s {
        cJSON *message;
        const char *message_id; // pulled from message for easier keying
        blade_rpc_response_callback_t callback;
-       void *data;
+       cJSON *data;
        // @todo ttl to wait for response before injecting an error response locally
 };
 
@@ -72,21 +72,22 @@ struct blade_rpc_response_s {
 
 static void blade_rpc_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
 {
-       //blade_rpc_t *brpc = (blade_rpc_t *)ptr;
+       blade_rpc_t *brpc = (blade_rpc_t *)ptr;
 
-       //ks_assert(brpc);
+       ks_assert(brpc);
 
        switch (action) {
        case KS_MPCL_ANNOUNCE:
                break;
        case KS_MPCL_TEARDOWN:
+               if (brpc->data) cJSON_Delete(brpc->data);
                break;
        case KS_MPCL_DESTROY:
                break;
        }
 }
 
-KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, cJSON *data)
 {
        blade_rpc_t *brpc = NULL;
        ks_pool_t *pool = NULL;
@@ -169,7 +170,7 @@ KS_DECLARE(blade_rpc_request_callback_t) blade_rpc_callback_get(blade_rpc_t *brp
        return brpc->callback;
 }
 
-KS_DECLARE(void *) blade_rpc_data_get(blade_rpc_t *brpc)
+KS_DECLARE(cJSON *) blade_rpc_data_get(blade_rpc_t *brpc)
 {
        ks_assert(brpc);
 
@@ -189,6 +190,7 @@ static void blade_rpc_request_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_
        case KS_MPCL_TEARDOWN:
                ks_pool_free(brpcreq->pool, (void **)&brpcreq->session_id);
                cJSON_Delete(brpcreq->message);
+               if (brpcreq->data) cJSON_Delete(brpcreq->data);
                break;
        case KS_MPCL_DESTROY:
                break;
@@ -201,7 +203,7 @@ KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP,
                                                                                                         const char *session_id,
                                                                                                         cJSON *json,
                                                                                                         blade_rpc_response_callback_t callback,
-                                                                                                        void *data)
+                                                                                                        cJSON *data)
 {
        blade_rpc_request_t *brpcreq = NULL;
 
@@ -276,7 +278,7 @@ KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_r
        return brpcreq->callback;
 }
 
-KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq)
+KS_DECLARE(cJSON *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq)
 {
        ks_assert(brpcreq);
        return brpcreq->data;
index 46ced003f624650d1bf9f4b8916cbab207708c5d..aecd52c273d144d167daf9cd2dc20015bf16ae7f 100644 (file)
@@ -582,7 +582,7 @@ ks_status_t blade_session_onstate_run(blade_session_t *bs)
 }
 
 
-KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, cJSON *data)
 {
        blade_rpc_request_t *brpcreq = NULL;
        const char *method = NULL;
@@ -706,6 +706,12 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
                                blade_session_send(bs_router, json, NULL, NULL);
                                blade_session_read_unlock(bs_router);
 
+                               // @todo if this is a subscribe request to remove subscriptions, it must carry a field unsubscribed-channels for which
+                               // subscriptions should be removed along the request path, regardless of whether it's the consumer requesting or the
+                               // master due to a deauthorization, without respect to waiting for the response as it should be a gaurenteed operation
+                               // even when requested by the subscriber.  This unsubscribed-channels field is simply treated as a special field, regardless
+                               // of the actual method of the request.
+
                                return KS_STATUS_SUCCESS;
                        }
                }
@@ -763,6 +769,11 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
                                blade_session_send(bs_router, json, NULL, NULL);
                                blade_session_read_unlock(bs_router);
 
+                               // @todo if this is a subscribe response to add a subscriber with the master as the responder-nodeid, it must have a
+                               // subscribed-channels field which should be added for the requester-nodeid, this will ensure the subscriptions are
+                               // added with respect to the master response and not immediately upon request.  This subscribed-channels field is
+                               // simply treated as a special field, regardless of the actual method of the request which is unavailable here.
+
                                return KS_STATUS_SUCCESS;
                        }
                }
index bc27a018dea25adc00af96b08085d8f82e4de75e..77d2b89ed051988c9110eebfbb98268beeb56752 100644 (file)
@@ -200,8 +200,6 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_session_remove(blade_sessionmgr_t *bsmg
 
        ks_log(KS_LOG_DEBUG, "Session Removed: %s\n", id);
 
-       blade_subscriptionmgr_subscriber_cleanup(blade_handle_subscriptionmgr_get(bsmgr->handle), id);
-
        if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), id)) {
                blade_upstreammgr_localid_set(blade_handle_upstreammgr_get(bsmgr->handle), NULL);
                blade_upstreammgr_masterid_set(blade_handle_upstreammgr_get(bsmgr->handle), NULL);
index 99f27fbde0b28c049612b5a76d5698e235f0426a..0f153e951f5ed49a1c8e7940954af9b72df1296d 100644 (file)
@@ -47,12 +47,14 @@ struct blade_handle_s {
        blade_sessionmgr_t *sessionmgr;
 };
 
-ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *data);
-ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *data);
-ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *data);
-ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *data);
-ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void *data);
-ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data);
+ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
+ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
+ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
+ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
+ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
+ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
+ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJSON *data);
+ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
 
 
 static void blade_handle_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
@@ -225,6 +227,9 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
        blade_rpc_create(&brpc, bh, "blade.publish", NULL, NULL, blade_rpcpublish_request_handler, NULL);
        blade_rpcmgr_corerpc_add(bh->rpcmgr, brpc);
 
+       blade_rpc_create(&brpc, bh, "blade.authorize", NULL, NULL, blade_rpcauthorize_request_handler, NULL);
+       blade_rpcmgr_corerpc_add(bh->rpcmgr, brpc);
+
        blade_rpc_create(&brpc, bh, "blade.locate", NULL, NULL, blade_rpclocate_request_handler, NULL);
        blade_rpcmgr_corerpc_add(bh->rpcmgr, brpc);
 
@@ -348,7 +353,7 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio
 // which is important for implementation of blade.execute where errors can be relayed back to the requester properly
 
 // blade.register request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, cJSON *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -385,7 +390,7 @@ done:
 }
 
 // blade.register request handler
-ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -449,7 +454,7 @@ done:
 
 
 // blade.publish request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -459,7 +464,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *
        const char *id = NULL;
 
        ks_assert(bh);
-       ks_assert(name);
+       ks_assert(protocol);
        ks_assert(realm);
 
        // @todo consideration for the Master trying to publish a protocol, with no upstream
@@ -474,7 +479,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *
        blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.publish");
 
        // fill in the req_params
-       cJSON_AddStringToObject(req_params, "protocol", name);
+       cJSON_AddStringToObject(req_params, "protocol", protocol);
        cJSON_AddStringToObject(req_params, "realm", realm);
 
        blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &id);
@@ -489,6 +494,12 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *
        cJSON_AddStringToObject(req_params, "responder-nodeid", id);
        ks_pool_free(pool, &id);
 
+       // @todo may want to switch this system to use a blade_rpcpublish_args_t with validation on the contents on this list internally
+       // and to produce the entire json block internally in case the channel args change to include additional information like an encryption key,
+       // however if passing encryption keys then they should be asymetrically encrypted using a public key provided by the master so that
+       // the channel keys can be transmitted without intermediate nodes being able to snoop them
+       if (channels) cJSON_AddItemToObject(req_params, "channels", cJSON_Duplicate(channels, 1));
+
        // @todo add a parameter containing a block of json for schema definitions for each of the methods being published
 
        ks_log(KS_LOG_DEBUG, "Session (%s) publish request started\n", blade_session_id_get(bs));
@@ -503,12 +514,13 @@ done:
 }
 
 // blade.publish request handler
-ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
        cJSON *req = NULL;
        cJSON *req_params = NULL;
+       cJSON *req_params_channels = NULL;
        const char *req_params_protocol = NULL;
        const char *req_params_realm = NULL;
        const char *req_params_requester_nodeid = NULL;
@@ -576,10 +588,41 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
                goto done;
        }
 
+       req_params_channels = cJSON_GetObjectItem(req_params, "channels");
+       if (req_params_channels) {
+               int size = 0;
+
+               if (req_params_channels->type != cJSON_Array) {
+                       ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' type, expected array\n", blade_session_id_get(bs));
+                       blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
+                       blade_session_send(bs, res, NULL, NULL);
+                       goto done;
+               }
+
+               size = cJSON_GetArraySize(req_params_channels);
+               for (int index = 0; index < size; ++index) {
+                       cJSON *element = cJSON_GetArrayItem(req_params_channels, index);
+                       if (element->type != cJSON_String) {
+                               ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' element type, expected string\n", blade_session_id_get(bs));
+                               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
+                               blade_session_send(bs, res, NULL, NULL);
+                               goto done;
+                       }
+               }
+       }
+
        ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
 
        blade_mastermgr_controller_add(bh->mastermgr, req_params_protocol, req_params_realm, req_params_requester_nodeid);
 
+       if (req_params_channels) {
+               int size = cJSON_GetArraySize(req_params_channels);
+               for (int index = 0; index < size; ++index) {
+                       cJSON *element = cJSON_GetArrayItem(req_params_channels, index);
+                       blade_mastermgr_channel_add(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
+               }
+       }
+
        // build the actual response finally
        blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
 
@@ -600,11 +643,231 @@ done:
 }
 
 
+// blade.authorize request generator
+KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       blade_session_t *bs = NULL;
+       ks_pool_t *pool = NULL;
+       cJSON *req = NULL;
+       cJSON *req_params = NULL;
+       const char *id = NULL;
+
+       ks_assert(bh);
+       ks_assert(nodeid);
+       ks_assert(protocol);
+       ks_assert(realm);
+       ks_assert(channels);
+
+       // @todo consideration for the Master trying to publish a protocol, with no upstream
+       if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
+               ret = KS_STATUS_DISCONNECTED;
+               goto done;
+       }
+
+       pool = blade_handle_pool_get(bh);
+       ks_assert(pool);
+
+       blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.authorize");
+
+       // fill in the req_params
+       cJSON_AddStringToObject(req_params, "protocol", protocol);
+       cJSON_AddStringToObject(req_params, "realm", realm);
+       if (remove) cJSON_AddTrueToObject(req_params, "remove");
+       cJSON_AddStringToObject(req_params, "authorized-nodeid", nodeid);
+
+       blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &id);
+       ks_assert(id);
+
+       cJSON_AddStringToObject(req_params, "requester-nodeid", id);
+       ks_pool_free(pool, &id);
+
+       blade_upstreammgr_masterid_copy(bh->upstreammgr, pool, &id);
+       ks_assert(id);
+
+       cJSON_AddStringToObject(req_params, "responder-nodeid", id);
+       ks_pool_free(pool, &id);
+
+       cJSON_AddItemToObject(req_params, "channels", cJSON_Duplicate(channels, 1));
+
+       // @todo add a parameter containing a block of json for schema definitions for each of the methods being published
+
+       ks_log(KS_LOG_DEBUG, "Session (%s) authorize request started\n", blade_session_id_get(bs));
+
+       ret = blade_session_send(bs, req, callback, data);
+
+done:
+       if (req) cJSON_Delete(req);
+       if (bs) blade_session_read_unlock(bs);
+
+       return ret;
+}
+
+// blade.authorize request handler
+ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
+{
+       blade_handle_t *bh = NULL;
+       blade_session_t *bs = NULL;
+       cJSON *req = NULL;
+       cJSON *req_params = NULL;
+       cJSON *req_params_channels = NULL;
+       cJSON *req_params_remove = NULL;
+       cJSON *channel = NULL;
+       ks_bool_t remove = KS_FALSE;
+       const char *req_params_protocol = NULL;
+       const char *req_params_realm = NULL;
+       const char *req_params_authorized_nodeid = NULL;
+       const char *req_params_requester_nodeid = NULL;
+       const char *req_params_responder_nodeid = NULL;
+       cJSON *res = NULL;
+       cJSON *res_result = NULL;
+       cJSON *res_result_authorized_channels = NULL;
+       cJSON *res_result_unauthorized_channels = NULL;
+       cJSON *res_result_failed_channels = NULL;
+
+       ks_assert(brpcreq);
+
+       bh = blade_rpc_request_handle_get(brpcreq);
+       ks_assert(bh);
+
+       bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_request_sessionid_get(brpcreq));
+       ks_assert(bs);
+
+       req = blade_rpc_request_message_get(brpcreq);
+       ks_assert(req);
+
+       req_params = cJSON_GetObjectItem(req, "params");
+       if (!req_params) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'params' object\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
+       if (!req_params_protocol) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'protocol'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_realm = cJSON_GetObjectCstr(req_params, "realm");
+       if (!req_params_realm) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'realm'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params realm");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_remove = cJSON_GetObjectItem(req_params, "remove");
+       if (req_params_remove && req_params_remove->type == cJSON_True) remove = KS_TRUE;
+
+       req_params_authorized_nodeid = cJSON_GetObjectCstr(req_params, "authorized-nodeid");
+       if (!req_params_authorized_nodeid) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'authorized-nodeid'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params authorized-nodeid");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       // @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now
+
+       req_params_requester_nodeid = cJSON_GetObjectCstr(req_params, "requester-nodeid");
+       if (!req_params_requester_nodeid) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'requester-nodeid'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params requester-nodeid");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_responder_nodeid = cJSON_GetObjectCstr(req_params, "responder-nodeid");
+       if (!req_params_responder_nodeid) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'responder-nodeid'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params responder-nodeid");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_channels = cJSON_GetObjectItem(req_params, "channels");
+       if (!req_params_channels) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'channels'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channels");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       if (req_params_channels->type != cJSON_Array) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'channels' type, expected array\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       cJSON_ArrayForEach(channel, req_params_channels) {
+               if (channel->type != cJSON_String) {
+                       ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'channels' element type, expected string\n", blade_session_id_get(bs));
+                       blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
+                       blade_session_send(bs, res, NULL, NULL);
+                       goto done;
+               }
+       }
+
+       if (!blade_upstreammgr_masterid_compare(bh->upstreammgr, req_params_responder_nodeid)) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid);
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params responder-nodeid");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       ks_log(KS_LOG_DEBUG, "Session (%s) authorize request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
+
+       // build the actual response finally
+       blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
+
+       cJSON_ArrayForEach(channel, req_params_channels) {
+               if (blade_mastermgr_channel_authorize(bh->mastermgr, remove, req_params_protocol, req_params_realm, channel->valuestring, req_params_requester_nodeid, req_params_authorized_nodeid) == KS_STATUS_SUCCESS) {
+                       if (remove) {
+                               if (!res_result_unauthorized_channels) res_result_unauthorized_channels = cJSON_CreateArray();
+                               cJSON_AddItemToArray(res_result_unauthorized_channels, cJSON_CreateString(channel->valuestring));
+                               // @todo unauthorizing channels should force a subscribe remove request for the target if they are subscribed, to prevent further events from reaching the target
+                               // this will require the master node to invoke the subscription removal as opposed to the target who normally invokes subscribe
+                       } else {
+                               if (!res_result_authorized_channels) res_result_authorized_channels = cJSON_CreateArray();
+                               cJSON_AddItemToArray(res_result_authorized_channels, cJSON_CreateString(channel->valuestring));
+                       }
+               } else {
+                       if (!res_result_failed_channels) res_result_failed_channels = cJSON_CreateArray();
+                       cJSON_AddItemToArray(res_result_failed_channels, cJSON_CreateString(channel->valuestring));
+               }
+       }
+
+       cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
+       cJSON_AddStringToObject(res_result, "realm", req_params_realm);
+       cJSON_AddStringToObject(res_result, "authorized-nodeid", req_params_authorized_nodeid);
+       cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
+       cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
+       if (res_result_authorized_channels)  cJSON_AddItemToObject(res_result, "authorized-channels", res_result_authorized_channels);
+       if (res_result_unauthorized_channels)  cJSON_AddItemToObject(res_result, "unauthorized-channels", res_result_unauthorized_channels);
+       if (res_result_failed_channels)  cJSON_AddItemToObject(res_result, "failed-channels", res_result_failed_channels);
+
+       // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
+       blade_session_send(bs, res, NULL, NULL);
+
+done:
+
+       if (res) cJSON_Delete(res);
+       if (bs) blade_session_read_unlock(bs);
+
+       return KS_FALSE;
+}
+
+
 // blade.locate request generator
 // @todo discuss system to support caching locate results, and internally subscribing to receive event updates related to protocols which have been located
 // to ensure local caches remain synced when protocol controllers change, but this requires additional filters for event propagating to avoid broadcasting
 // every protocol update to everyone which may actually be a better way than an explicit locate request
-KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, cJSON *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -614,7 +877,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *n
        const char *id = NULL;
 
        ks_assert(bh);
-       ks_assert(name);
+       ks_assert(protocol);
        ks_assert(realm);
 
        if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
@@ -628,7 +891,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *n
        blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.locate");
 
        // fill in the req_params
-       cJSON_AddStringToObject(req_params, "protocol", name);
+       cJSON_AddStringToObject(req_params, "protocol", protocol);
        cJSON_AddStringToObject(req_params, "realm", realm);
 
        blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &id);
@@ -655,7 +918,7 @@ done:
 }
 
 // blade.locate request handler
-ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -667,7 +930,7 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da
        const char *req_params_responder_nodeid = NULL;
        cJSON *res = NULL;
        cJSON *res_result = NULL;
-       cJSON *res_result_controllers;
+       cJSON *res_result_controllers = NULL;
        blade_protocol_t *bp = NULL;
 
        ks_assert(brpcreq);
@@ -732,20 +995,8 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da
 
        ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
 
-       res_result_controllers = cJSON_CreateObject();
-
        bp = blade_mastermgr_protocol_lookup(bh->mastermgr, req_params_protocol, req_params_realm);
-       if (bp) {
-               ks_hash_t *controllers = blade_protocol_controllers_get(bp);
-               for (ks_hash_iterator_t *it = ks_hash_first(controllers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-                       const char *key = NULL;
-                       void *value = NULL;
-
-                       ks_hash_this(it, (const void **)&key, NULL, &value);
-
-                       cJSON_AddItemToArray(res_result_controllers, cJSON_CreateString(key));
-               }
-       }
+       if (bp) res_result_controllers = blade_protocol_controllers_pack(bp);
 
 
        // build the actual response finally
@@ -755,7 +1006,7 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da
        cJSON_AddStringToObject(res_result, "realm", req_params_realm);
        cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
        cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
-       cJSON_AddItemToObject(res_result, "controllers", res_result_controllers);
+       if (res_result_controllers) cJSON_AddItemToObject(res_result, "controllers", res_result_controllers);
 
        // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
        blade_session_send(bs, res, NULL, NULL);
@@ -770,7 +1021,7 @@ done:
 
 
 // blade.execute request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -813,12 +1064,6 @@ KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *
 
        ks_log(KS_LOG_DEBUG, "Session (%s) execute request started\n", blade_session_id_get(bs));
 
-       // @todo change what blade_rpc_request_t carries for tracking data, use a tuple instead which makes it
-       // easier to free the tuple and potentially associated data if a request needs to be destroyed without
-       // the callback being called to know that the data is a tuple to destroy, meanwhile a tuple offers a
-       // spare pointer for universally wrapping callback + data pairs in a case like this
-       // in which case do not create the tuple here, just pass 2 data pointers to send and let it store them
-       // in the internal tuple
        ret = blade_session_send(bs, req, callback, data);
        
 done:
@@ -829,7 +1074,7 @@ done:
 }
 
 // blade.execute request handler
-ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
 {
        ks_bool_t ret = KS_FALSE;
        blade_handle_t *bh = NULL;
@@ -1059,19 +1304,20 @@ KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJ
 
 
 // blade.subscribe request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data)
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, cJSON *data, blade_rpc_request_callback_t channel_callback, cJSON *channel_data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
        const char *localid = NULL;
-       ks_bool_t propagate = KS_FALSE;
        blade_subscription_t *bsub = NULL;
+       cJSON *temp_data = NULL;
 
        ks_assert(bh);
-       ks_assert(event);
        ks_assert(protocol);
        ks_assert(realm);
+       ks_assert(subscribe_channels || unsubscribe_channels);
 
+       // @note this is always produced by a subscriber, and sent upstream, master will only use the internal raw call
        if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
                ret = KS_STATUS_DISCONNECTED;
                goto done;
@@ -1080,20 +1326,24 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char
        blade_upstreammgr_localid_copy(bh->upstreammgr, bh->pool, &localid);
        ks_assert(localid);
 
-       if (remove) {
-               propagate = blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, &bsub, event, protocol, realm, localid);
-       } else {
-               propagate = blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, &bsub, event, protocol, realm, localid);
-               ks_assert(event_callback);
+       if (unsubscribe_channels) {
+               cJSON *channel = NULL;
+               cJSON_ArrayForEach(channel, unsubscribe_channels) {
+                       blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, &bsub, protocol, realm, channel->valuestring, localid);
+               }
        }
-       ks_pool_free(bh->pool, &localid);
 
-       if (!remove && bsub) {
-               blade_subscription_callback_set(bsub, event_callback);
-               blade_subscription_callback_data_set(bsub, event_data);
-       }
+       temp_data = cJSON_CreateObject();
+       
+       if (callback) cJSON_AddItemToObject(temp_data, "callback", cJSON_CreatePtr((uintptr_t)callback));
+       if (data) cJSON_AddItemToObject(temp_data, "data", data);
 
-       if (propagate) ret = blade_handle_rpcsubscribe_raw(bh, event, protocol, realm, remove, callback, data);
+       if (channel_callback) cJSON_AddItemToObject(temp_data, "channel-callback", cJSON_CreatePtr((uintptr_t)channel_callback));
+       if (channel_data) cJSON_AddItemToObject(temp_data, "channel-data", channel_data);
+
+       ret = blade_handle_rpcsubscribe_raw(bh, protocol, realm, subscribe_channels, unsubscribe_channels, localid, KS_FALSE, blade_rpcsubscribe_response_handler, temp_data);
+
+       ks_pool_free(bh->pool, &localid);
 
 done:
        if (bs) blade_session_read_unlock(bs);
@@ -1101,7 +1351,7 @@ done:
        return ret;
 }
 
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, cJSON *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -1110,11 +1360,23 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const
        cJSON *req_params = NULL;
 
        ks_assert(bh);
-       ks_assert(event);
        ks_assert(protocol);
        ks_assert(realm);
+       ks_assert(subscribe_channels || unsubscribe_channels);
+       ks_assert(subscriber);
 
-       if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
+       if (downstream) {
+               // @note if a master is sending a downstream update, it may only use unsubscribe_channels, cannot force a subscription without a subscriber callback
+               if (subscribe_channels) {
+                       ret = KS_STATUS_NOT_ALLOWED;
+                       goto done;
+               }
+               if (!(bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), subscriber))) {
+                       ret = KS_STATUS_DISCONNECTED;
+                       goto done;
+               }
+       }
+       else if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
                ret = KS_STATUS_DISCONNECTED;
                goto done;
        }
@@ -1124,14 +1386,17 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const
 
        blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.subscribe");
 
-       cJSON_AddStringToObject(req_params, "event", event);
        cJSON_AddStringToObject(req_params, "protocol", protocol);
        cJSON_AddStringToObject(req_params, "realm", realm);
-       if (remove) cJSON_AddTrueToObject(req_params, "remove");
+       cJSON_AddStringToObject(req_params, "subscriber-nodeid", subscriber);
+       if (downstream) cJSON_AddTrueToObject(req_params, "downstream");
+
+       if (subscribe_channels) cJSON_AddItemToObject(req_params, "subscribe-channels", cJSON_Duplicate(subscribe_channels, 1));
+       if (unsubscribe_channels) cJSON_AddItemToObject(req_params, "unsubscribe-channels", cJSON_Duplicate(unsubscribe_channels, 1));
 
        ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request started\n", blade_session_id_get(bs));
 
-       ret = blade_session_send(bs, req, callback, data);
+       ret = blade_session_send(bs, req, blade_rpcsubscribe_response_handler, data);
 
 done:
        if (req) cJSON_Delete(req);
@@ -1141,21 +1406,25 @@ done:
 }
 
 // blade.subscribe request handler
-ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
        ks_pool_t *pool = NULL;
        cJSON *req = NULL;
        cJSON *req_params = NULL;
-       const char *req_params_event = NULL;
        const char *req_params_protocol = NULL;
        const char *req_params_realm = NULL;
-       cJSON *req_params_remove = NULL;
-       ks_bool_t remove = KS_FALSE;
+       const char *req_params_subscriber_nodeid = NULL;
+       cJSON *req_params_downstream = NULL;
+       ks_bool_t downstream = KS_FALSE;
+       cJSON *req_params_subscribe_channels = NULL;
+       cJSON *req_params_unsubscribe_channels = NULL;
+       ks_bool_t masterlocal = KS_FALSE;
        cJSON *res = NULL;
        cJSON *res_result = NULL;
-       ks_bool_t propagate = KS_FALSE;
+       cJSON *res_result_subscribe_channels = NULL;
+       cJSON *res_result_failed_channels = NULL;
 
        ks_assert(brpcreq);
 
@@ -1179,14 +1448,6 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
                goto done;
        }
 
-       req_params_event = cJSON_GetObjectCstr(req_params, "event");
-       if (!req_params_event) {
-               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'event'\n", blade_session_id_get(bs));
-               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
-               blade_session_send(bs, res, NULL, NULL);
-               goto done;
-       }
-
        req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
        if (!req_params_protocol) {
                ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'protocol'\n", blade_session_id_get(bs));
@@ -1203,30 +1464,82 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
                goto done;
        }
 
-       req_params_remove = cJSON_GetObjectItem(req_params, "remove");
-       remove = req_params_remove && req_params_remove->type == cJSON_True;
+       req_params_subscriber_nodeid = cJSON_GetObjectCstr(req_params, "subscriber-nodeid");
+       if (!req_params_subscriber_nodeid) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'subscriber-nodeid'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params subscriber-nodeid");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       // @todo this may not be required, may be able to assume direction based on the session the message is received from, if came from upstream
+       // then it is heading downstream, otherwise it is heading upstream
+       req_params_downstream = cJSON_GetObjectItem(req_params, "downstream");
+       downstream = req_params_downstream && req_params_downstream->type == cJSON_True;
+
+       req_params_subscribe_channels = cJSON_GetObjectItem(req_params, "subscribe-channels");
+       req_params_unsubscribe_channels = cJSON_GetObjectItem(req_params, "unsubscribe-channels");
+
+       if (!req_params_subscribe_channels && !req_params_unsubscribe_channels) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'subscribe-channels' or 'unsubscribe-channels'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params subscribe-channels or unsubscribe-channels");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
 
        // @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now
 
        ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request processing\n", blade_session_id_get(bs));
 
-       if (remove) {
-               propagate = blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
-       } else {
-               propagate = blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, NULL, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
+       if (req_params_unsubscribe_channels) {
+               cJSON *channel = NULL;
+               cJSON_ArrayForEach(channel, req_params_unsubscribe_channels) {
+                       blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
+               }
        }
 
-       if (propagate) blade_handle_rpcsubscribe_raw(bh, req_params_event, req_params_protocol, req_params_realm, remove, NULL, NULL);
+       masterlocal = blade_upstreammgr_masterlocal(blade_handle_upstreammgr_get(bh));
 
-       // build the actual response finally
-       blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
+       if (masterlocal || blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bh), req_params_subscriber_nodeid)) {
+               blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
 
-       cJSON_AddStringToObject(res_result, "event", req_params_event);
-       cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
-       cJSON_AddStringToObject(res_result, "realm", req_params_realm);
+               cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
+               cJSON_AddStringToObject(res_result, "realm", req_params_realm);
+               cJSON_AddStringToObject(res_result, "subscriber-nodeid", req_params_subscriber_nodeid);
+               if (downstream) cJSON_AddTrueToObject(res_result, "downstream");
 
-       // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
-       blade_session_send(bs, res, NULL, NULL);
+               if (req_params_subscribe_channels) {
+                       // @note this can only be received by the master due to other validation logic in requests which prevents the master from sending a request containing subscribe-channels
+                       cJSON *channel = NULL;
+
+                       cJSON_ArrayForEach(channel, req_params_subscribe_channels) {
+                               if (blade_mastermgr_channel_verify(bh->mastermgr, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid)) {
+                                       blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
+                                       if (!res_result_subscribe_channels) res_result_subscribe_channels = cJSON_CreateArray();
+                                       cJSON_AddItemToArray(res_result_subscribe_channels, cJSON_CreateString(channel->valuestring));
+                               } else {
+                                       if (!res_result_failed_channels) res_result_failed_channels = cJSON_CreateArray();
+                                       cJSON_AddItemToArray(res_result_failed_channels, cJSON_CreateString(channel->valuestring));
+                               }
+                       }
+               }
+
+               if (res_result_subscribe_channels) cJSON_AddItemToObject(res_result, "subscribe-channels", res_result_subscribe_channels);
+               if (res_result_failed_channels) cJSON_AddItemToObject(res_result, "failed-channels", res_result_failed_channels);
+               // @note unsubscribe-channels get handled during the request path, so the response handlers do not need to reiterate them for any forseeable reason, and if they are needed they
+               // could be pulled from the original request associated to the response
+
+               // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
+               blade_session_send(bs, res, NULL, NULL);
+       } else {
+               cJSON *temp_data = cJSON_CreateObject();
+
+               // @note track this so that when this local node gets a response to this propagated request we know what messageid to propagate the response with
+               cJSON_AddStringToObject(temp_data, "messageid", blade_rpc_request_messageid_get(brpcreq));
+
+               blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, req_params_subscribe_channels, req_params_unsubscribe_channels, req_params_subscriber_nodeid, downstream, blade_rpcsubscribe_response_handler, temp_data);
+               cJSON_Delete(temp_data);
+       }
 
 done:
 
@@ -1236,31 +1549,141 @@ done:
        return KS_FALSE;
 }
 
+// blade.subscribe response handler
+ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
+{
+       ks_bool_t ret = KS_FALSE;
+       blade_handle_t *bh = NULL;
+       blade_session_t *bs = NULL;
+       blade_rpc_response_callback_t original_callback = NULL;
+       cJSON *original_data = NULL;
+       blade_rpc_request_callback_t channel_callback = NULL;
+       cJSON *channel_data = NULL;
+       const char *messageid = NULL;
+       cJSON *res = NULL;
+       cJSON *res_result = NULL;
+       const char *res_result_protocol = NULL;
+       const char *res_result_realm = NULL;
+       const char *res_result_subscriber_nodeid = NULL;
+       cJSON *res_result_downstream = NULL;
+       ks_bool_t downstream = KS_FALSE;
+       cJSON *res_result_subscribe_channels = NULL;
+       cJSON *res_result_failed_channels = NULL;
+
+       ks_assert(brpcres);
+       ks_assert(data);
+
+       bh = blade_rpc_response_handle_get(brpcres);
+       ks_assert(bh);
+
+       bs = blade_sessionmgr_session_lookup(bh->sessionmgr, blade_rpc_response_sessionid_get(brpcres));
+       ks_assert(bs);
+
+       original_data = cJSON_GetObjectItem(data, "data");
+       original_callback = (blade_rpc_response_callback_t)(uintptr_t)cJSON_GetObjectPtr(data, "callback");
+       channel_data = cJSON_GetObjectItem(data, "channel-data");
+       channel_callback = (blade_rpc_request_callback_t)(uintptr_t)cJSON_GetObjectPtr(data, "channel-callback");
+
+       // @note when messageid exists, it means this message is only intended to be examined and relayed, the local node is not the subscriber
+       messageid = cJSON_GetObjectCstr(data, "messageid");
+
+       res = blade_rpc_response_message_get(brpcres);
+       ks_assert(res);
+
+       res_result = cJSON_GetObjectItem(res, "result");
+       if (!res_result) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe response missing 'result' object\n", blade_session_id_get(bs));
+               goto done;
+       }
+
+       // @todo the following 3 fields, protocol, realm, and subscriber-nodeid may not be required to carry in the response as they could be
+       // obtained from the original request tied to the response, change this later
+       res_result_protocol = cJSON_GetObjectCstr(res_result, "protocol");
+       if (!res_result_protocol) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe response missing 'protocol'\n", blade_session_id_get(bs));
+               goto done;
+       }
+
+       res_result_realm = cJSON_GetObjectCstr(res_result, "realm");
+       if (!res_result_realm) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe response missing 'realm'\n", blade_session_id_get(bs));
+               goto done;
+       }
+
+       res_result_subscriber_nodeid = cJSON_GetObjectCstr(res_result, "subscriber-nodeid");
+       if (!res_result_subscriber_nodeid) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe response missing 'subscriber-nodeid'\n", blade_session_id_get(bs));
+               goto done;
+       }
+
+       res_result_downstream = cJSON_GetObjectItem(res_result, "downstream");
+       downstream = res_result_downstream && res_result_downstream->type == cJSON_True;
+
+       res_result_subscribe_channels = cJSON_GetObjectItem(res_result, "subscribe-channels");
+       res_result_failed_channels = cJSON_GetObjectItem(res_result, "failed-channels");
+
+       if (res_result_subscribe_channels) {
+               // @note only reach here when the master has responded to authorize subscriptions to channels, so all nodes along the path must
+               // add the subscriber
+               cJSON *channel = NULL;
+               blade_subscription_t *bsub = NULL;
+
+               cJSON_ArrayForEach(channel, res_result_subscribe_channels) {
+                       blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, &bsub, res_result_protocol, res_result_realm, channel->valuestring, res_result_subscriber_nodeid);
+                       // @note these will only get assigned on the last response, received by the subscriber
+                       if (channel_callback) blade_subscription_callback_set(bsub, channel_callback);
+                       if (channel_data) blade_subscription_callback_data_set(bsub, channel_data);
+               }
+       }
+
+       // @note this will only happen on the last response, received by the subscriber
+       if (original_callback) ret = original_callback(brpcres, original_data);
+
+       if (messageid) {
+               blade_session_t *relay = NULL;
+               if (downstream) {
+                       if (!(relay = blade_upstreammgr_session_get(bh->upstreammgr))) {
+                               goto done;
+                       }
+               } else {
+                       if (!(relay = blade_routemgr_route_lookup(bh->routemgr, res_result_subscriber_nodeid))) {
+                               goto done;
+                       }
+               }
+
+               blade_rpc_response_raw_create(&res, &res_result, messageid);
+
+               cJSON_AddStringToObject(res_result, "protocol", res_result_protocol);
+               cJSON_AddStringToObject(res_result, "realm", res_result_realm);
+               cJSON_AddStringToObject(res_result, "subscriber-nodeid", res_result_subscriber_nodeid);
+               if (downstream) cJSON_AddTrueToObject(res_result, "downstream");
+               if (res_result_subscribe_channels) cJSON_AddItemToObject(res_result, "subscribe-channels", cJSON_Duplicate(res_result_subscribe_channels, 1));
+               if (res_result_failed_channels) cJSON_AddItemToObject(res_result, "failed-channels", cJSON_Duplicate(res_result_failed_channels, 1));
+
+               blade_session_send(relay, res, NULL, NULL);
+
+               cJSON_Delete(res);
+
+               blade_session_read_unlock(relay);
+       }
+
+done:
+       blade_session_read_unlock(bs);
+       return ret;
+}
+
 
 // blade.broadcast request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *broadcaster_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
-       ks_pool_t *pool = NULL;
-       const char *localid = NULL;
 
        ks_assert(bh);
        ks_assert(event);
        ks_assert(protocol);
        ks_assert(realm);
 
-       // this will ensure any downstream subscriber sessions, and upstream session if available will be broadcasted to
-       pool = blade_handle_pool_get(bh);
-
-       if (!broadcaster_nodeid) {
-               blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &localid);
-               ks_assert(localid);
-               broadcaster_nodeid = localid;
-       }
-
-       ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, broadcaster_nodeid, NULL, event, protocol, realm, params, callback, data);
-
-       if (localid) ks_pool_free(pool, &localid);
+       ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, NULL,  protocol, realm, channel, event, params, callback, data);
 
        // @todo must check if the local node is also subscribed to receive the event, this is a special edge case which has some extra considerations
        // if the local node is subscribed to receive the event, it should be received here as a special case, otherwise the broadcast request handler
@@ -1270,17 +1693,17 @@ KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char
 }
 
 // blade.broadcast request handler
-ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
 {
        ks_bool_t ret = KS_FALSE;
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
        cJSON *req = NULL;
        cJSON *req_params = NULL;
-       const char *req_params_broadcaster_nodeid = NULL;
-       const char *req_params_event = NULL;
        const char *req_params_protocol = NULL;
        const char *req_params_realm = NULL;
+       const char *req_params_channel = NULL;
+       const char *req_params_event = NULL;
        cJSON *req_params_params = NULL;
        blade_subscription_t *bsub = NULL;
        blade_rpc_request_callback_t callback = NULL;
@@ -1306,22 +1729,6 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
                goto done;
        }
 
-       req_params_broadcaster_nodeid = cJSON_GetObjectCstr(req_params, "broadcaster-nodeid");
-       if (!req_params_broadcaster_nodeid) {
-               ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'broadcaster-nodeid'\n", blade_session_id_get(bs));
-               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params broadcaster-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
-               goto done;
-       }
-
-       req_params_event = cJSON_GetObjectCstr(req_params, "event");
-       if (!req_params_event) {
-               ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
-               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
-               blade_session_send(bs, res, NULL, NULL);
-               goto done;
-       }
-
        req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
        if (!req_params_protocol) {
                ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'protocol'\n", blade_session_id_get(bs));
@@ -1338,11 +1745,27 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
                goto done;
        }
 
+       req_params_channel = cJSON_GetObjectCstr(req_params, "channel");
+       if (!req_params_channel) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'channel'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channel");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_event = cJSON_GetObjectCstr(req_params, "event");
+       if (!req_params_event) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
        req_params_params = cJSON_GetObjectItem(req_params, "params");
 
-       blade_subscriptionmgr_broadcast(bh->subscriptionmgr, req_params_broadcaster_nodeid, blade_session_id_get(bs), req_params_event, req_params_protocol, req_params_realm, req_params_params, NULL, NULL);
+       blade_subscriptionmgr_broadcast(bh->subscriptionmgr, blade_session_id_get(bs), req_params_protocol, req_params_realm, req_params_channel, req_params_event, req_params_params, NULL, NULL);
 
-       bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_event, req_params_protocol, req_params_realm);
+       bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_protocol, req_params_realm, req_params_channel);
        if (bsub) {
                const char *localid = NULL;
                ks_pool_t *pool = NULL;
@@ -1362,10 +1785,11 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
        // build the actual response finally
        blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
 
-       cJSON_AddStringToObject(res_result, "broadcaster-nodeid", req_params_broadcaster_nodeid);
-       cJSON_AddStringToObject(res_result, "event", req_params_event);
+       // @todo this is not neccessary, can obtain this from the original request
        cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
        cJSON_AddStringToObject(res_result, "realm", req_params_realm);
+       cJSON_AddStringToObject(res_result, "channel", req_params_channel);
+       cJSON_AddStringToObject(res_result, "event", req_params_event);
 
        // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
        blade_session_send(bs, res, NULL, NULL);
@@ -1378,23 +1802,6 @@ done:
        return ret;
 }
 
-KS_DECLARE(const char *) blade_rpcbroadcast_request_broadcaster_nodeid_get(blade_rpc_request_t *brpcreq)
-{
-       cJSON *req = NULL;
-       cJSON *req_params = NULL;
-       const char *req_broadcaster_nodeid = NULL;
-
-       ks_assert(brpcreq);
-
-       req = blade_rpc_request_message_get(brpcreq);
-       ks_assert(req);
-
-       req_params = cJSON_GetObjectItem(req, "params");
-       if (req_params) req_broadcaster_nodeid = cJSON_GetObjectCstr(req_params, "broadcaster-nodeid");
-
-       return req_broadcaster_nodeid;
-}
-
 KS_DECLARE(cJSON *) blade_rpcbroadcast_request_params_get(blade_rpc_request_t *brpcreq)
 {
        cJSON *req = NULL;
index 599e7d01c0293e8d9e84ad96c0fba098f4b29851..99d732aec9f20ea6e085d9618600db858f93788d 100644 (file)
@@ -36,9 +36,9 @@
 struct blade_subscription_s {
        ks_pool_t *pool;
 
-       const char *event;
        const char *protocol;
        const char *realm;
+       const char *channel;
        ks_hash_t *subscribers;
 
        blade_rpc_request_callback_t callback;
@@ -56,9 +56,9 @@ static void blade_subscription_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks
        case KS_MPCL_ANNOUNCE:
                break;
        case KS_MPCL_TEARDOWN:
-               if (bsub->event) ks_pool_free(bsub->pool, &bsub->event);
                if (bsub->protocol) ks_pool_free(bsub->pool, &bsub->protocol);
                if (bsub->realm) ks_pool_free(bsub->pool, &bsub->subscribers);
+               if (bsub->channel) ks_pool_free(bsub->pool, &bsub->channel);
                if (bsub->subscribers) ks_hash_destroy(&bsub->subscribers);
                break;
        case KS_MPCL_DESTROY:
@@ -66,21 +66,21 @@ static void blade_subscription_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks
        }
 }
 
-KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *event, const char *protocol, const char *realm)
+KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *protocol, const char *realm, const char *channel)
 {
        blade_subscription_t *bsub = NULL;
 
        ks_assert(bsubP);
        ks_assert(pool);
-       ks_assert(event);
        ks_assert(protocol);
        ks_assert(realm);
+       ks_assert(channel);
 
        bsub = ks_pool_alloc(pool, sizeof(blade_subscription_t));
        bsub->pool = pool;
-       bsub->event = ks_pstrdup(pool, event);
        bsub->protocol = ks_pstrdup(pool, protocol);
        bsub->realm = ks_pstrdup(pool, realm);
+       bsub->channel = ks_pstrdup(pool, channel);
 
        ks_hash_create(&bsub->subscribers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bsub->pool);
        ks_assert(bsub->subscribers);
@@ -106,27 +106,27 @@ KS_DECLARE(ks_status_t) blade_subscription_destroy(blade_subscription_t **bsubP)
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(const char *) blade_subscription_event_get(blade_subscription_t *bsub)
+KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub)
 {
        ks_assert(bsub);
 
-       return bsub->event;
+       return bsub->protocol;
 
 }
 
-KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub)
+KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub)
 {
        ks_assert(bsub);
 
-       return bsub->protocol;
+       return bsub->realm;
 
 }
 
-KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub)
+KS_DECLARE(const char *) blade_subscription_channel_get(blade_subscription_t *bsub)
 {
        ks_assert(bsub);
 
-       return bsub->realm;
+       return bsub->channel;
 
 }
 
index 45dd48cd1567c4ecb313f8fbe1e88af5fa773cec..ea40e5c11418340bde47ae211fa865b680226836 100644 (file)
@@ -128,17 +128,17 @@ KS_DECLARE(blade_handle_t *) blade_subscriptionmgr_handle_get(blade_subscription
 //     return bs;
 //}
 
-KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *event, const char *protocol, const char *realm)
+KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel)
 {
        blade_subscription_t *bsub = NULL;
        char *key = NULL;
 
        ks_assert(bsmgr);
-       ks_assert(event);
        ks_assert(protocol);
        ks_assert(realm);
+       ks_assert(channel);
 
-       key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, event);
+       key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, channel);
 
        bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)key, KS_READLOCKED);
        // @todo if (bsub) blade_subscription_read_lock(bsub);
@@ -149,7 +149,7 @@ KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(bla
        return bsub;
 }
 
-KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *target)
+KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber)
 {
        char *key = NULL;
        blade_subscription_t *bsub = NULL;
@@ -157,40 +157,40 @@ KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr
        ks_bool_t propagate = KS_FALSE;
 
        ks_assert(bsmgr);
-       ks_assert(event);
        ks_assert(protocol);
        ks_assert(realm);
-       ks_assert(target);
+       ks_assert(channel);
+       ks_assert(subscriber);
 
-       key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, event);
+       key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, channel);
 
        ks_hash_write_lock(bsmgr->subscriptions);
 
        bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)key, KS_UNLOCKED);
 
        if (!bsub) {
-               blade_subscription_create(&bsub, bsmgr->pool, event, protocol, realm);
+               blade_subscription_create(&bsub, bsmgr->pool, protocol, realm, channel);
                ks_assert(bsub);
 
                ks_hash_insert(bsmgr->subscriptions, (void *)ks_pstrdup(bsmgr->pool, key), bsub);
                propagate = KS_TRUE;
        }
 
-       bsub_cleanup = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)target, KS_UNLOCKED);
+       bsub_cleanup = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)subscriber, KS_UNLOCKED);
        if (!bsub_cleanup) {
                ks_hash_create(&bsub_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bsmgr->pool);
                ks_assert(bsub_cleanup);
 
                ks_log(KS_LOG_DEBUG, "Subscription Added: %s\n", key);
-               ks_hash_insert(bsmgr->subscriptions_cleanup, (void *)ks_pstrdup(bsmgr->pool, target), (void *)bsub_cleanup);
+               ks_hash_insert(bsmgr->subscriptions_cleanup, (void *)ks_pstrdup(bsmgr->pool, subscriber), (void *)bsub_cleanup);
        }
        ks_hash_insert(bsub_cleanup, (void *)ks_pstrdup(bsmgr->pool, key), (void *)KS_TRUE);
 
-       blade_subscription_subscribers_add(bsub, target);
+       blade_subscription_subscribers_add(bsub, subscriber);
 
        ks_hash_write_unlock(bsmgr->subscriptions);
 
-       ks_log(KS_LOG_DEBUG, "Subscriber Added: %s to %s\n", target, key);
+       ks_log(KS_LOG_DEBUG, "Subscriber Added: %s to %s\n", subscriber, key);
 
        ks_pool_free(bsmgr->pool, &key);
 
@@ -199,7 +199,7 @@ KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr
        return propagate;
 }
 
-KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *target)
+KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber)
 {
        char *key = NULL;
        blade_subscription_t *bsub = NULL;
@@ -207,28 +207,28 @@ KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscription
        ks_bool_t propagate = KS_FALSE;
 
        ks_assert(bsmgr);
-       ks_assert(event);
        ks_assert(protocol);
        ks_assert(realm);
-       ks_assert(target);
+       ks_assert(channel);
+       ks_assert(subscriber);
 
-       key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, event);
+       key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, channel);
 
        ks_hash_write_lock(bsmgr->subscriptions);
 
        bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)key, KS_UNLOCKED);
 
        if (bsub) {
-               bsub_cleanup = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)target, KS_UNLOCKED);
+               bsub_cleanup = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)subscriber, KS_UNLOCKED);
                ks_assert(bsub_cleanup);
                ks_hash_remove(bsub_cleanup, key);
 
                if (ks_hash_count(bsub_cleanup) == 0) {
-                       ks_hash_remove(bsmgr->subscriptions_cleanup, (void *)target);
+                       ks_hash_remove(bsmgr->subscriptions_cleanup, (void *)subscriber);
                }
 
-               ks_log(KS_LOG_DEBUG, "Subscriber Removed: %s from %s\n", target, key);
-               blade_subscription_subscribers_remove(bsub, target);
+               ks_log(KS_LOG_DEBUG, "Subscriber Removed: %s from %s\n", subscriber, key);
+               blade_subscription_subscribers_remove(bsub, subscriber);
 
                if (ks_hash_count(blade_subscription_subscribers_get(bsub)) == 0) {
                        ks_log(KS_LOG_DEBUG, "Subscription Removed: %s\n", key);
@@ -246,7 +246,7 @@ KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscription
        return propagate;
 }
 
-KS_DECLARE(void) blade_subscriptionmgr_subscriber_cleanup(blade_subscriptionmgr_t *bsmgr, const char *target)
+KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target)
 {
        ks_bool_t unsubbed = KS_FALSE;
 
@@ -255,9 +255,9 @@ KS_DECLARE(void) blade_subscriptionmgr_subscriber_cleanup(blade_subscriptionmgr_
 
        while (!unsubbed) {
                ks_hash_t *subscriptions = NULL;
-               const char *event = NULL;
                const char *protocol = NULL;
                const char *realm = NULL;
+               const char *channel = NULL;
 
                ks_hash_read_lock(bsmgr->subscriptions);
                subscriptions = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)target, KS_UNLOCKED);
@@ -276,36 +276,43 @@ KS_DECLARE(void) blade_subscriptionmgr_subscriber_cleanup(blade_subscriptionmgr_
                        ks_assert(bsub);
 
                        // @note allocate these to avoid lifecycle issues when the last subscriber is removed causing the subscription to be removed
-                       event = ks_pstrdup(bsmgr->pool, blade_subscription_event_get(bsub));
                        protocol = ks_pstrdup(bsmgr->pool, blade_subscription_protocol_get(bsub));
                        realm = ks_pstrdup(bsmgr->pool, blade_subscription_realm_get(bsub));
+                       channel = ks_pstrdup(bsmgr->pool, blade_subscription_channel_get(bsub));
                }
                ks_hash_read_unlock(bsmgr->subscriptions);
 
                if (!unsubbed) {
-                       if (blade_subscriptionmgr_subscriber_remove(bsmgr, NULL, event, protocol, realm, target)) {
-                               blade_handle_rpcsubscribe_raw(bsmgr->handle, event, protocol, realm, KS_TRUE, NULL, NULL);
-                       }
-                       ks_pool_free(bsmgr->pool, &event);
+                       blade_subscriptionmgr_subscriber_remove(bsmgr, NULL, protocol, realm, channel, target);
+
                        ks_pool_free(bsmgr->pool, &protocol);
                        ks_pool_free(bsmgr->pool, &realm);
+                       ks_pool_free(bsmgr->pool, &channel);
                }
        }
 }
 
-KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *broadcaster_nodeid, const char *excluded_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
 {
        const char *bsub_key = NULL;
        blade_subscription_t *bsub = NULL;
        blade_session_t *bs = NULL;
+       cJSON *req = NULL;
+       cJSON *req_params = NULL;
 
        ks_assert(bsmgr);
-       ks_assert(broadcaster_nodeid);
-       ks_assert(event);
        ks_assert(protocol);
        ks_assert(realm);
+       ks_assert(channel);
+
+       bsub_key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, channel);
 
-       bsub_key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, event);
+       blade_rpc_request_raw_create(bsmgr->pool, &req, &req_params, NULL, "blade.broadcast");
+       cJSON_AddStringToObject(req_params, "protocol", protocol);
+       cJSON_AddStringToObject(req_params, "realm", realm);
+       cJSON_AddStringToObject(req_params, "channel", channel);
+       cJSON_AddStringToObject(req_params, "event", event);
+       if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
 
        ks_hash_read_lock(bsmgr->subscriptions);
 
@@ -318,32 +325,20 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
                for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
                        void *key = NULL;
                        void *value = NULL;
-                       cJSON *req = NULL;
-                       cJSON *req_params = NULL;
 
                        ks_hash_this(it, (const void **)&key, NULL, &value);
 
                        if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
 
+                       // @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
                        if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), (const char *)key)) continue;
 
-                       bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bsmgr->handle), (const char *)key);
+                       bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key);
                        if (bs) {
-                               ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs));
-
-                               blade_rpc_request_raw_create(bsmgr->pool, &req, &req_params, NULL, "blade.broadcast");
-
-                               cJSON_AddStringToObject(req_params, "broadcaster-nodeid", broadcaster_nodeid);
-                               cJSON_AddStringToObject(req_params, "event", event);
-                               cJSON_AddStringToObject(req_params, "protocol", protocol);
-                               cJSON_AddStringToObject(req_params, "realm", realm);
-
-                               if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
+                               ks_log(KS_LOG_DEBUG, "Broadcasting: %s through %s\n", bsub_key, blade_session_id_get(bs));
 
                                blade_session_send(bs, req, callback, data);
 
-                               cJSON_Delete(req);
-
                                blade_session_read_unlock(bs);
                        }
                }
@@ -351,33 +346,21 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
 
        ks_hash_read_unlock(bsmgr->subscriptions);
 
-       ks_pool_free(bsmgr->pool, &bsub_key);
-
        bs = blade_upstreammgr_session_get(blade_handle_upstreammgr_get(bsmgr->handle));
        if (bs) {
                if (!excluded_nodeid || ks_safe_strcasecmp(blade_session_id_get(bs), excluded_nodeid)) {
-                       cJSON *req = NULL;
-                       cJSON *req_params = NULL;
-
-                       ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs));
-
-                       blade_rpc_request_raw_create(bsmgr->pool, &req, &req_params, NULL, "blade.broadcast");
-
-                       cJSON_AddStringToObject(req_params, "broadcaster-nodeid", broadcaster_nodeid);
-                       cJSON_AddStringToObject(req_params, "event", event);
-                       cJSON_AddStringToObject(req_params, "protocol", protocol);
-                       cJSON_AddStringToObject(req_params, "realm", realm);
-
-                       if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
+                       ks_log(KS_LOG_DEBUG, "Broadcasting: %s through %s\n", bsub_key, blade_session_id_get(bs));
 
                        blade_session_send(bs, req, callback, data);
-
-                       cJSON_Delete(req);
                }
 
                blade_session_read_unlock(bs);
        }
 
+       cJSON_Delete(req);
+
+       ks_pool_free(bsmgr->pool, &bsub_key);
+
        return KS_STATUS_SUCCESS;
 }
 
index 394933eb09ece93e89cd624044f8f5a1102c6a22..2860b21e1b02da7d94db17ce5688866044c16130 100644 (file)
@@ -250,6 +250,21 @@ KS_DECLARE(ks_status_t) blade_upstreammgr_masterid_copy(blade_upstreammgr_t *bum
        return KS_STATUS_SUCCESS;
 }
 
+KS_DECLARE(ks_bool_t) blade_upstreammgr_masterlocal(blade_upstreammgr_t *bumgr)
+{
+       ks_bool_t ret = KS_FALSE;
+
+       ks_assert(bumgr);
+
+       ks_rwl_read_lock(bumgr->masterid_rwl);
+       ks_rwl_read_lock(bumgr->localid_rwl);
+       ret = bumgr->masterid && bumgr->localid && !ks_safe_strcasecmp(bumgr->masterid, bumgr->localid);
+       ks_rwl_read_unlock(bumgr->localid_rwl);
+       ks_rwl_read_unlock(bumgr->masterid_rwl);
+
+       return ret;
+}
+
 KS_DECLARE(ks_status_t) blade_upstreammgr_realm_add(blade_upstreammgr_t *bumgr, const char *realm)
 {
        char *key = NULL;
index 6d5e668c71a46bb1764a510ee697f7c82fbc3bac..db1d993b85052445ccd8b19b7f8586eed4bf0842 100644 (file)
@@ -39,9 +39,13 @@ KS_BEGIN_EXTERN_C
 KS_DECLARE(ks_status_t) blade_mastermgr_create(blade_mastermgr_t **bmmgrP, blade_handle_t *bh);
 KS_DECLARE(ks_status_t) blade_mastermgr_destroy(blade_mastermgr_t **bmmgrP);
 KS_DECLARE(blade_handle_t *) blade_mastermgr_handle_get(blade_mastermgr_t *bmmgr);
+KS_DECLARE(ks_status_t) blade_mastermgr_purge(blade_mastermgr_t *bmmgr, const char *nodeid);
 KS_DECLARE(blade_protocol_t *) blade_mastermgr_protocol_lookup(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm);
 KS_DECLARE(ks_status_t) blade_mastermgr_controller_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *controller);
-KS_DECLARE(ks_status_t) blade_mastermgr_controller_remove(blade_mastermgr_t *bmmgr, const char *controller);
+KS_DECLARE(ks_status_t) blade_mastermgr_channel_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel);
+KS_DECLARE(ks_status_t) blade_mastermgr_channel_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel);
+KS_DECLARE(ks_status_t) blade_mastermgr_channel_authorize(blade_mastermgr_t *bmmgr, ks_bool_t remove, const char *protocol, const char *realm, const char *channel, const char *controller, const char *target);
+KS_DECLARE(ks_bool_t) blade_mastermgr_channel_verify(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel, const char *target);
 KS_END_EXTERN_C
 
 #endif
index 7fc81d865d1850e9779c14cc2312f9916c5047ce..7e9e5242babc9540aa383382e6ae034fb728a014 100644 (file)
 KS_BEGIN_EXTERN_C
 KS_DECLARE(ks_status_t) blade_protocol_create(blade_protocol_t **bpP, ks_pool_t *pool, const char *name, const char *realm);
 KS_DECLARE(ks_status_t) blade_protocol_destroy(blade_protocol_t **bpP);
-KS_DECLARE(ks_hash_t *) blade_protocol_controllers_get(blade_protocol_t *bp);
+KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid);
 KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, const char *nodeid);
-KS_DECLARE(ks_status_t) blade_protocol_controllers_remove(blade_protocol_t *bp, const char *nodeid);
+KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp);
+KS_DECLARE(ks_status_t) blade_protocol_channel_add(blade_protocol_t *bp, const char *name);
+KS_DECLARE(ks_status_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name);
+KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, ks_bool_t remove, const char *channel, const char *controller, const char *target);
+KS_DECLARE(ks_bool_t) blade_protocol_channel_verify(blade_protocol_t *bp, const char *channel, const char *target);
 KS_END_EXTERN_C
 
 #endif
index e98dcb8117fad0fef347d04012cd7f48ac4651f8..f68b09f8c1328e8bcf0d63540f5782dd521fd36f 100644 (file)
 #include <blade.h>
 
 KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, cJSON *data);
 KS_DECLARE(ks_status_t) blade_rpc_destroy(blade_rpc_t **brpcP);
 KS_DECLARE(blade_handle_t *) blade_rpc_handle_get(blade_rpc_t *brpc);
 KS_DECLARE(const char *) blade_rpc_method_get(blade_rpc_t *brpc);
 KS_DECLARE(const char *) blade_rpc_protocol_get(blade_rpc_t *brpc);
 KS_DECLARE(const char *) blade_rpc_realm_get(blade_rpc_t *brpc);
 KS_DECLARE(blade_rpc_request_callback_t) blade_rpc_callback_get(blade_rpc_t *brpc);
-KS_DECLARE(void *) blade_rpc_data_get(blade_rpc_t *brpc);
+KS_DECLARE(cJSON *) blade_rpc_data_get(blade_rpc_t *brpc);
 
 KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP,
                                                                                                         blade_handle_t *bh,
@@ -51,14 +51,15 @@ KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP,
                                                                                                         const char *session_id,
                                                                                                         cJSON *json,
                                                                                                         blade_rpc_response_callback_t callback,
-                                                                                                        void *data);
+                                                                                                        cJSON *data);
 KS_DECLARE(ks_status_t) blade_rpc_request_destroy(blade_rpc_request_t **brpcreqP);
+KS_DECLARE(ks_status_t) blade_rpc_request_duplicate(blade_rpc_request_t **brpcreqP, blade_rpc_request_t *brpcreq);
 KS_DECLARE(blade_handle_t *) blade_rpc_request_handle_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(const char *) blade_rpc_request_sessionid_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(cJSON *) blade_rpc_request_message_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq);
-KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq);
+KS_DECLARE(cJSON *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq);
 
 KS_DECLARE(ks_status_t) blade_rpc_request_raw_create(ks_pool_t *pool, cJSON **json, cJSON **params, const char **id, const char *method);
 
index b84ac257ad0495adaff748abe8aed7a8b63018eb..710464e8598d54daf594b4f2ed00245bcd39c37e 100644 (file)
@@ -62,7 +62,7 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs);
 KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs);
 KS_DECLARE(const char *) blade_session_connection_get(blade_session_t *bs);
 KS_DECLARE(ks_status_t) blade_session_connection_set(blade_session_t *bs, const char *id);
-KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, cJSON *data);
 KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json);
 KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json);
 KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json);
index 267c96efb773423f1ac0817d9630fbd3fea74f4f..80f6671053e48786b3e208f3afb4b17cfa42e2b2 100644 (file)
@@ -59,24 +59,25 @@ KS_DECLARE(blade_sessionmgr_t *) blade_handle_sessionmgr_get(blade_handle_t *bh)
 
 KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, cJSON *data);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data);
 
-KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, cJSON *data);
+
+KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
 KS_DECLARE(const char *) blade_rpcexecute_request_requester_nodeid_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(const char *) blade_rpcexecute_request_responder_nodeid_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(cJSON *) blade_rpcexecute_request_params_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(cJSON *) blade_rpcexecute_response_result_get(blade_rpc_response_t *brpcres);
 KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJSON *result);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data);
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, cJSON *data, blade_rpc_request_callback_t channel_callback, cJSON *channel_data);
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, cJSON *data);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *broadcaster_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
-KS_DECLARE(const char *) blade_rpcbroadcast_request_broadcaster_nodeid_get(blade_rpc_request_t *brpcreq);
+KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
 KS_DECLARE(cJSON *) blade_rpcbroadcast_request_params_get(blade_rpc_request_t *brpcreq);
 
 KS_END_EXTERN_C
index a0057e84721c96375f69aca52b2d62dd8d0b7869..96052a744e1b449958be6e39cc877a3be5b5381c 100644 (file)
 #include <blade.h>
 
 KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *event, const char *protocol, const char *realm);
+KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *protocol, const char *realm, const char *channel);
 KS_DECLARE(ks_status_t) blade_subscription_destroy(blade_subscription_t **bsubP);
-KS_DECLARE(const char *) blade_subscription_event_get(blade_subscription_t *bsub);
 KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub);
 KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub);
+KS_DECLARE(const char *) blade_subscription_channel_get(blade_subscription_t *bsub);
 KS_DECLARE(ks_hash_t *) blade_subscription_subscribers_get(blade_subscription_t *bsub);
 KS_DECLARE(ks_status_t) blade_subscription_subscribers_add(blade_subscription_t *bsub, const char *nodeid);
 KS_DECLARE(ks_status_t) blade_subscription_subscribers_remove(blade_subscription_t *bsub, const char *nodeid);
index f586baaa5d33adb6c6092e590cc492e69fdc0c8a..9b4b0bda2c7d5bd4d8efd1526019e338dc3ecfb1 100644 (file)
@@ -39,11 +39,11 @@ KS_BEGIN_EXTERN_C
 KS_DECLARE(ks_status_t) blade_subscriptionmgr_create(blade_subscriptionmgr_t **bsmgrP, blade_handle_t *bh);
 KS_DECLARE(ks_status_t) blade_subscriptionmgr_destroy(blade_subscriptionmgr_t **bsmgrP);
 KS_DECLARE(blade_handle_t *) blade_subscriptionmgr_handle_get(blade_subscriptionmgr_t *bsmgr);
-KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *event, const char *protocol, const char *realm);
-KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *target);
-KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *target);
-KS_DECLARE(void) blade_subscriptionmgr_subscriber_cleanup(blade_subscriptionmgr_t *bsmgr, const char *target);
-KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *broadcaster_nodeid, const char *excluded_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel);
+KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
+KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
+KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target);
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
 KS_END_EXTERN_C
 
 #endif
index 2c1eee0396e64453a023c6fe48f68deddf370d63..df7f7ae8c120bceb019316bbad0f50cd2781cf58 100644 (file)
@@ -62,8 +62,8 @@ typedef struct blade_connectionmgr_s blade_connectionmgr_t;
 typedef struct blade_sessionmgr_s blade_sessionmgr_t;
 typedef struct blade_session_callback_data_s blade_session_callback_data_t;
 
-typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, void *data);
-typedef ks_bool_t (*blade_rpc_response_callback_t)(blade_rpc_response_t *brpcres, void *data);
+typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, cJSON *data);
+typedef ks_bool_t (*blade_rpc_response_callback_t)(blade_rpc_response_t *brpcres, cJSON *data);
 
 
 typedef enum {
index d5212a8f81c2ab1096cd66e5c90b6ce7659cb84b..a44dfe735a1c14898844635041e9daca9bfcf0ba 100644 (file)
@@ -47,6 +47,7 @@ KS_DECLARE(blade_session_t *) blade_upstreammgr_session_get(blade_upstreammgr_t
 KS_DECLARE(ks_status_t) blade_upstreammgr_masterid_set(blade_upstreammgr_t *bumgr, const char *id);
 KS_DECLARE(ks_bool_t) blade_upstreammgr_masterid_compare(blade_upstreammgr_t *bumgr, const char *id);
 KS_DECLARE(ks_status_t) blade_upstreammgr_masterid_copy(blade_upstreammgr_t *bumgr, ks_pool_t *pool, const char **id);
+KS_DECLARE(ks_bool_t) blade_upstreammgr_masterlocal(blade_upstreammgr_t *bumgr);
 //KS_DECLARE(ks_hash_t *) blade_upstreammgr_realm_lookup(blade_handle_t *bh);
 KS_DECLARE(ks_status_t) blade_upstreammgr_realm_add(blade_upstreammgr_t *bumgr, const char *realm);
 KS_DECLARE(ks_status_t) blade_upstreammgr_realm_remove(blade_upstreammgr_t *bumgr, const char *realm);
index 302524b210f121e10e7e9efe7346530688fc7cf0..351b6c935f0e7a72fdb31b81cbaa868580600bff 100644 (file)
@@ -290,10 +290,15 @@ void command_execute(blade_handle_t *bh, char *args)
 
 void command_subscribe(blade_handle_t *bh, char *args)
 {
+       cJSON *channels = NULL;
+
        ks_assert(bh);
        ks_assert(args);
 
-       blade_handle_rpcsubscribe(bh, "test.event", "test", "mydomain.com", KS_FALSE, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
+       channels = cJSON_CreateArray();
+       cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
+       blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
+       cJSON_Delete(channels);
 }
 
 /* For Emacs:
index c8b7351c77e3b60018b1f49aa839ad88979915da..d3d149d868f9257a697b30fc4147e6eb607caf3a 100644 (file)
@@ -81,11 +81,12 @@ ks_bool_t test_echo_request_handler(blade_rpc_request_t *brpcreq, void *data)
        cJSON_AddStringToObject(result, "text", text);
 
        blade_rpcexecute_response_send(brpcreq, result);
+       cJSON_Delete(result);
 
        return KS_FALSE;
 }
 
-ks_bool_t test_event_response_handler(blade_rpc_response_t *brpcres, void *data)
+ks_bool_t test_broadcast_response_handler(blade_rpc_response_t *brpcres, void *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -98,7 +99,7 @@ ks_bool_t test_event_response_handler(blade_rpc_response_t *brpcres, void *data)
        bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_response_sessionid_get(brpcres));
        ks_assert(bs);
 
-       ks_log(KS_LOG_DEBUG, "Session (%s) test.event response processing\n", blade_session_id_get(bs));
+       ks_log(KS_LOG_DEBUG, "Session (%s) test broadcast response processing\n", blade_session_id_get(bs));
 
        blade_session_read_unlock(bs);
 
@@ -243,7 +244,7 @@ void command_publish(blade_handle_t *bh, char *args)
        blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
 
        // @todo build up json-based method schema for each protocolrpc registered above, and pass into blade_handle_rpcpublish() to attach to the request, to be stored in the blade_protocol_t tracked by the master node
-       blade_handle_rpcpublish(bh, "test", "mydomain.com", blade_publish_response_handler, NULL);
+       blade_handle_rpcpublish(bh, "test", "mydomain.com", NULL, blade_publish_response_handler, NULL);
 }
 
 void command_broadcast(blade_handle_t *bh, char *args)
@@ -251,7 +252,7 @@ void command_broadcast(blade_handle_t *bh, char *args)
        ks_assert(bh);
        ks_assert(args);
 
-       blade_handle_rpcbroadcast(bh, NULL, "test.event", "test", "mydomain.com", NULL, test_event_response_handler, NULL);
+       blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "event", NULL, test_broadcast_response_handler, NULL);
 }
 
 
index 2ad6ff229eceab31c8c766b9661a53c8c3faee62..be09279ee76a91bde53148f8ac513101230c131c 100644 (file)
@@ -33,7 +33,7 @@ static const struct command_def_s command_defs[] = {
 
 const char *g_testcon_nodeid = NULL;
 
-ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, void *data)
+ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -87,7 +87,7 @@ ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, void *data
        return KS_FALSE;
 }
 
-ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, void *data)
+ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -111,7 +111,7 @@ ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, void *data)
        return KS_FALSE;
 }
 
-ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, void *data)
+ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -135,7 +135,7 @@ ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, void *data)
        return KS_FALSE;
 }
 
-ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, void *data)
+ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -159,11 +159,10 @@ ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, void *data)
        return KS_FALSE;
 }
 
-ks_bool_t test_join_broadcast_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t test_broadcast_handler(blade_rpc_request_t *brpcreq, cJSON *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
-       const char *broadcaster_nodeid = NULL;
        cJSON *params = NULL;
        //cJSON *result = NULL;
 
@@ -178,70 +177,7 @@ ks_bool_t test_join_broadcast_handler(blade_rpc_request_t *brpcreq, void *data)
        params = blade_rpcbroadcast_request_params_get(brpcreq);
        ks_assert(params);
 
-       broadcaster_nodeid = blade_rpcbroadcast_request_broadcaster_nodeid_get(brpcreq);
-       ks_assert(broadcaster_nodeid);
-
-       ks_log(KS_LOG_DEBUG, "Session (%s) test.join (%s) broadcast processing\n", blade_session_id_get(bs), broadcaster_nodeid);
-
-       blade_session_read_unlock(bs);
-
-       return KS_FALSE;
-}
-
-ks_bool_t test_leave_broadcast_handler(blade_rpc_request_t *brpcreq, void *data)
-{
-       blade_handle_t *bh = NULL;
-       blade_session_t *bs = NULL;
-       const char *broadcaster_nodeid = NULL;
-       cJSON *params = NULL;
-       //cJSON *result = NULL;
-
-       ks_assert(brpcreq);
-
-       bh = blade_rpc_request_handle_get(brpcreq);
-       ks_assert(bh);
-
-       bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_request_sessionid_get(brpcreq));
-       ks_assert(bs);
-
-       params = blade_rpcbroadcast_request_params_get(brpcreq);
-       ks_assert(params);
-
-       broadcaster_nodeid = blade_rpcbroadcast_request_broadcaster_nodeid_get(brpcreq);
-       ks_assert(broadcaster_nodeid);
-
-       ks_log(KS_LOG_DEBUG, "Session (%s) test.leave (%s) broadcast processing\n", blade_session_id_get(bs), broadcaster_nodeid);
-
-       blade_session_read_unlock(bs);
-
-       return KS_FALSE;
-}
-
-ks_bool_t test_talk_broadcast_handler(blade_rpc_request_t *brpcreq, void *data)
-{
-       blade_handle_t *bh = NULL;
-       blade_session_t *bs = NULL;
-       const char *broadcaster_nodeid = NULL;
-       cJSON *params = NULL;
-       //cJSON *result = NULL;
-
-       ks_assert(brpcreq);
-
-       bh = blade_rpc_request_handle_get(brpcreq);
-       ks_assert(bh);
-
-       bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_request_sessionid_get(brpcreq));
-       ks_assert(bs);
-
-       broadcaster_nodeid = blade_rpcbroadcast_request_broadcaster_nodeid_get(brpcreq);
-       ks_assert(broadcaster_nodeid);
-
-       params = blade_rpcbroadcast_request_params_get(brpcreq);
-       ks_assert(params);
-
-       // @todo pull out text from params
-
-       ks_log(KS_LOG_DEBUG, "Session (%s) test.talk (%s) broadcast processing\n", blade_session_id_get(bs), broadcaster_nodeid);
+       ks_log(KS_LOG_DEBUG, "Session (%s) test broadcast processing\n", blade_session_id_get(bs));
 
        blade_session_read_unlock(bs);
 
@@ -391,6 +327,7 @@ void command_locate(blade_handle_t *bh, char *args)
 void command_join(blade_handle_t *bh, char *args)
 {
        cJSON *params = NULL;
+       cJSON *channels = NULL;
 
        ks_assert(bh);
        ks_assert(args);
@@ -402,17 +339,19 @@ void command_join(blade_handle_t *bh, char *args)
 
 
        params = cJSON_CreateObject();
-
        blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.join", "test", "mydomain.com", params, test_join_response_handler, NULL);
+       cJSON_Delete(params);
 
-       blade_handle_rpcsubscribe(bh, "test.join", "test", "mydomain.com", KS_FALSE, NULL, NULL, test_join_broadcast_handler, NULL);
-       blade_handle_rpcsubscribe(bh, "test.leave", "test", "mydomain.com", KS_FALSE, NULL, NULL, test_leave_broadcast_handler, NULL);
-       blade_handle_rpcsubscribe(bh, "test.talk", "test", "mydomain.com", KS_FALSE, NULL, NULL, test_talk_broadcast_handler, NULL);
+       channels = cJSON_CreateArray();
+       cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
+       blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, NULL, NULL, test_broadcast_handler, NULL);
+       cJSON_Delete(channels);
 }
 
 void command_leave(blade_handle_t *bh, char *args)
 {
        cJSON *params = NULL;
+       cJSON *channels = NULL;
 
        ks_assert(bh);
        ks_assert(args);
@@ -423,12 +362,13 @@ void command_leave(blade_handle_t *bh, char *args)
        }
 
        params = cJSON_CreateObject();
-
        blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.leave", "test", "mydomain.com", params, test_leave_response_handler, NULL);
+       cJSON_Delete(params);
 
-       blade_handle_rpcsubscribe(bh, "test.join", "test", "mydomain.com", KS_TRUE, NULL, NULL, NULL, NULL);
-       blade_handle_rpcsubscribe(bh, "test.leave", "test", "mydomain.com", KS_TRUE, NULL, NULL, NULL, NULL);
-       blade_handle_rpcsubscribe(bh, "test.talk", "test", "mydomain.com", KS_TRUE, NULL, NULL, NULL, NULL);
+       channels = cJSON_CreateArray();
+       cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
+       blade_handle_rpcsubscribe(bh, "test", "mydomain.com", NULL, channels, NULL, NULL, NULL, NULL);
+       cJSON_Delete(channels);
 }
 
 void command_talk(blade_handle_t *bh, char *args)
@@ -448,10 +388,9 @@ void command_talk(blade_handle_t *bh, char *args)
        }
 
        params = cJSON_CreateObject();
-
        cJSON_AddStringToObject(params, "text", args);
-
        blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.talk", "test", "mydomain.com", params, test_talk_response_handler, NULL);
+       cJSON_Delete(params);
 }
 
 /* For Emacs:
index 19ebe6d30a28d38b31987f8a80f061666b8ee8a3..52eab4069602ebf7a47650679c5fda852c0318d6 100644 (file)
@@ -88,7 +88,7 @@ ks_status_t testproto_destroy(testproto_t **testP)
        return KS_STATUS_SUCCESS;
 }
 
-ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, void *data)
+ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
 {
        //testproto_t *test = NULL;
        blade_handle_t *bh = NULL;
@@ -97,7 +97,7 @@ ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, void *dat
        ks_assert(brpcres);
        ks_assert(data);
 
-       //test = (testproto_t *)data;
+       //test = (testproto_t *)cJSON_GetPtrValue(data);
 
        bh = blade_rpc_response_handle_get(brpcres);
        ks_assert(bh);
@@ -112,7 +112,7 @@ ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, void *dat
        return KS_FALSE;
 }
 
-ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
 {
        testproto_t *test = NULL;
        blade_handle_t *bh = NULL;
@@ -120,48 +120,70 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data)
        const char *requester_nodeid = NULL;
        const char *key = NULL;
        cJSON *params = NULL;
+       cJSON *channels = NULL;
        cJSON *result = NULL;
 
        ks_assert(brpcreq);
        ks_assert(data);
 
-       test = (testproto_t *)data;
+       test = (testproto_t *)cJSON_GetPtrValue(data);
 
        bh = blade_rpc_request_handle_get(brpcreq);
        ks_assert(bh);
 
+       // session for execute response
        bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_request_sessionid_get(brpcreq));
        ks_assert(bs);
 
        requester_nodeid = blade_rpcexecute_request_requester_nodeid_get(brpcreq);
        ks_assert(requester_nodeid);
 
+       // inner rpcexecute parameters
        params = blade_rpcexecute_request_params_get(brpcreq);
        ks_assert(params);
 
        ks_log(KS_LOG_DEBUG, "Session (%s) test.join request processing\n", blade_session_id_get(bs));
 
+       // add to participants
        key = ks_pstrdup(test->pool, requester_nodeid);
        ks_assert(key);
 
+       // @todo to properly maintain protocol details tied to a specific node like this participants list requires a way to know if a specific node of interest goes offline to cleanup associated details
+       // refer back to work notes on ideas about this
        ks_hash_write_lock(test->participants);
        ks_hash_insert(test->participants, (void *)key, (void *)KS_TRUE);
        ks_hash_write_unlock(test->participants);
 
-       blade_session_read_unlock(bs);
+       // authorize channels with the master for the requester
+       channels = cJSON_CreateArray();
+       cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
+
+       blade_handle_rpcauthorize(bh, requester_nodeid, KS_FALSE, "test", "mydomain.com", channels, NULL, NULL);
+
+       cJSON_Delete(channels);
 
+       // send rpcexecute response to the requester
        result = cJSON_CreateObject();
 
        blade_rpcexecute_response_send(brpcreq, result);
 
+       cJSON_Delete(result);
+
+       blade_session_read_unlock(bs);
+
+       // broadcast to authorized nodes that have subscribed, that the requester has joined
        params = cJSON_CreateObject();
 
-       blade_handle_rpcbroadcast(bh, requester_nodeid, "test.join", "test", "mydomain.com", params, NULL, NULL);
+       cJSON_AddStringToObject(params, "joiner-nodeid", requester_nodeid);
+
+       blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "join", params, NULL, NULL);
+
+       cJSON_Delete(params);
 
        return KS_FALSE;
 }
 
-ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
 {
        testproto_t *test = NULL;
        blade_handle_t *bh = NULL;
@@ -174,7 +196,7 @@ ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, void *data)
        ks_assert(brpcreq);
        ks_assert(data);
 
-       test = (testproto_t *)data;
+       test = (testproto_t *)cJSON_GetPtrValue(data);
 
        bh = blade_rpc_request_handle_get(brpcreq);
        ks_assert(bh);
@@ -202,12 +224,14 @@ ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, void *data)
 
        params = cJSON_CreateObject();
 
-       blade_handle_rpcbroadcast(bh, requester_nodeid, "test.leave", "test", "mydomain.com", params, NULL, NULL);
+       cJSON_AddStringToObject(params, "leaver-nodeid", requester_nodeid);
+
+       blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "leave", params, NULL, NULL);
 
        return KS_FALSE;
 }
 
-ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, void *data)
+ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
 {
        //testproto_t *test = NULL;
        blade_handle_t *bh = NULL;
@@ -220,7 +244,7 @@ ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, void *data)
        ks_assert(brpcreq);
        ks_assert(data);
 
-       //test = (testproto_t *)data;
+       //test = (testproto_t *)cJSON_GetPtrValue(data);
 
        bh = blade_rpc_request_handle_get(brpcreq);
        ks_assert(bh);
@@ -249,7 +273,9 @@ ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, void *data)
 
        cJSON_AddStringToObject(params, "text", text);
 
-       blade_handle_rpcbroadcast(bh, requester_nodeid, "test.talk", "test", "mydomain.com", params, NULL, NULL);
+       cJSON_AddStringToObject(params, "talker-nodeid", requester_nodeid);
+
+       blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "talk", params, NULL, NULL);
 
        return KS_FALSE;
 }
@@ -314,19 +340,24 @@ int main(int argc, char **argv)
                blade_identity_destroy(&target);
 
                if (connected) {
+                       cJSON *channels = NULL;
+
                        // @todo use session state change callback to know when the session is ready and the realm(s) available from blade.connect, this hack temporarily ensures it's ready before trying to publish upstream
                        ks_sleep_ms(3000);
 
-                       blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, test);
+                       blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, cJSON_CreatePtr((uintptr_t)test));
                        blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
 
-                       blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, test);
+                       blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, cJSON_CreatePtr((uintptr_t)test));
                        blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
 
-                       blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, test);
+                       blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, cJSON_CreatePtr((uintptr_t)test));
                        blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
 
-                       blade_handle_rpcpublish(bh, "test", "mydomain.com", test_publish_response_handler, test);
+                       channels = cJSON_CreateArray();
+                       cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
+
+                       blade_handle_rpcpublish(bh, "test", "mydomain.com", channels, test_publish_response_handler, cJSON_CreatePtr((uintptr_t)test));
                }
        }
 
index a90400906f4fdb23d92ddf5470338f6dbf2c2764..09ffaf5796ca5fd9ee04ef29253ec516fb46436e 100644 (file)
@@ -45,6 +45,9 @@ extern "C"
        
 KS_DECLARE(cJSON *) cJSON_CreateStringPrintf(const char *fmt, ...);
 KS_DECLARE(const char *)cJSON_GetObjectCstr(const cJSON *object, const char *string);
+KS_DECLARE(cJSON *) cJSON_CreatePtr(uintptr_t pointer);
+KS_DECLARE(uintptr_t) cJSON_GetPtrValue(const cJSON *object);
+KS_DECLARE(uintptr_t) cJSON_GetObjectPtr(const cJSON *object, const char *string);
 
 static inline cJSON *ks_json_add_child_obj(cJSON *json, const char *name, cJSON *obj)
 {
index 44f8bafab08a677b06674c757c04703ba2ee13a1..d80dc9d7e7524ca05f08cba21e64af78db177487 100644 (file)
@@ -19,7 +19,7 @@ KS_DECLARE(cJSON *) cJSON_CreateStringPrintf(const char *fmt, ...)
        return item;
 }
 
-KS_DECLARE(const char *)cJSON_GetObjectCstr(const cJSON *object, const char *string)
+KS_DECLARE(const char *) cJSON_GetObjectCstr(const cJSON *object, const char *string)
 {
        cJSON *cj = cJSON_GetObjectItem(object, string);
 
@@ -28,6 +28,25 @@ KS_DECLARE(const char *)cJSON_GetObjectCstr(const cJSON *object, const char *str
           return cj->valuestring;
 }
 
+KS_DECLARE(cJSON *) cJSON_CreatePtr(uintptr_t pointer)
+{
+       // @todo check for 32bit and use integer storage instead
+       return cJSON_CreateStringPrintf("%p", (void *)pointer);
+}
+
+KS_DECLARE(uintptr_t) cJSON_GetPtrValue(const cJSON *object)
+{
+       // @todo check for 32bit and use integer storage instead
+       void *pointer = NULL;
+       if (object && object->type == cJSON_String) sscanf_s(object->valuestring, "%p", &pointer);
+       return (uintptr_t)pointer;
+}
+
+KS_DECLARE(uintptr_t) cJSON_GetObjectPtr(const cJSON *object, const char *string)
+{
+       return cJSON_GetPtrValue(cJSON_GetObjectItem(object, string));
+}
+
 /* For Emacs:
  * Local Variables:
  * mode:c