]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-10167: Updated publish and broadcast to support adding and removing channels at...
authorShane Bryldt <astaelan@gmail.com>
Fri, 18 Aug 2017 22:30:08 +0000 (16:30 -0600)
committerShane Bryldt <astaelan@gmail.com>
Fri, 18 Aug 2017 22:30:08 +0000 (16:30 -0600)
15 files changed:
libs/libblade/src/blade_mastermgr.c
libs/libblade/src/blade_protocol.c
libs/libblade/src/blade_stack.c
libs/libblade/src/blade_subscriptionmgr.c
libs/libblade/src/include/blade_mastermgr.h
libs/libblade/src/include/blade_protocol.h
libs/libblade/src/include/blade_stack.h
libs/libblade/src/include/blade_subscriptionmgr.h
libs/libblade/src/include/blade_types.h
libs/libblade/test/bladec.c
libs/libblade/test/blades.c
libs/libblade/test/testcli.c
libs/libblade/test/testcon.c
libs/libks/src/include/ks_pool.h
libs/libks/src/ks_pool.c

index ec0c4fce6502f20b9c20ab917c9f3e889910feba..9693806b57693961bbdeb7ed007a92ccae14acfe 100644 (file)
@@ -122,8 +122,12 @@ KS_DECLARE(ks_status_t) blade_mastermgr_purge(blade_mastermgr_t *bmmgr, const ch
                ks_hash_this(it, (const void **)&key, NULL, (void **)&bp);
 
                if (blade_protocol_purge(bp, nodeid)) {
-                       if (!cleanup) ks_hash_create(&cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
-                       ks_hash_insert(cleanup, (void *)key, bp);
+                       if (!blade_protocol_controller_available(bp)) {
+                               if (!cleanup) ks_hash_create(&cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
+                               ks_hash_insert(cleanup, (void *)key, bp);
+                       } else {
+                               // @todo not the last controller, may need to propagate that the controller is no longer available?
+                       }
                }
        }
        if (cleanup) {
@@ -133,6 +137,8 @@ KS_DECLARE(ks_status_t) blade_mastermgr_purge(blade_mastermgr_t *bmmgr, const ch
 
                        ks_hash_this(it, (const void **)&key, NULL, (void **)&bp);
 
+                       blade_subscriptionmgr_broadcast(blade_handle_subscriptionmgr_get(bmmgr->handle), BLADE_RPCBROADCAST_COMMAND_PROTOCOL_REMOVE, NULL, blade_protocol_name_get(bp), blade_protocol_realm_get(bp), NULL, NULL, NULL, NULL, NULL);
+
                        ks_log(KS_LOG_DEBUG, "Protocol Removed: %s\n", key);
                        ks_hash_remove(bmmgr->protocols, (void *)key);
                }
@@ -192,12 +198,49 @@ KS_DECLARE(ks_status_t) blade_mastermgr_controller_add(blade_mastermgr_t *bmmgr,
                ks_hash_insert(bmmgr->protocols, (void *)ks_pstrdup(pool, key), bp);
        }
 
-       blade_protocol_controllers_add(bp, controller);
+       blade_protocol_controller_add(bp, controller);
+
+       ks_hash_write_unlock(bmmgr->protocols);
 
        ks_pool_free(&key);
 
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_mastermgr_controller_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *controller)
+{
+       ks_pool_t *pool = NULL;
+       blade_protocol_t *bp = NULL;
+       char *key = NULL;
+
+       ks_assert(bmmgr);
+       ks_assert(protocol);
+       ks_assert(realm);
+       ks_assert(controller);
+
+       pool = ks_pool_get(bmmgr);
+
+       key = ks_psprintf(pool, "%s@%s", protocol, realm);
+
+       ks_hash_write_lock(bmmgr->protocols);
+
+       bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_UNLOCKED);
+       if (bp) {
+               if (blade_protocol_controller_remove(bp, controller)) {
+                       if (!blade_protocol_controller_available(bp)) {
+                               // @todo broadcast protocol removal to remove all channel subscriptions
+                               ks_log(KS_LOG_DEBUG, "Protocol Removed: %s\n", key);
+                               ks_hash_remove(bmmgr->protocols, (void *)key);
+                       } else {
+                               // @todo not the last controller, may need to propagate when a specific controller becomes unavailable though
+                       }
+               }
+       }
+
        ks_hash_write_unlock(bmmgr->protocols);
 
+       ks_pool_free(&key);
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -249,7 +292,9 @@ KS_DECLARE(ks_status_t) blade_mastermgr_channel_remove(blade_mastermgr_t *bmmgr,
                goto done;
        }
 
-       blade_protocol_channel_remove(bp, channel);
+       if (blade_protocol_channel_remove(bp, channel)) {
+               blade_subscriptionmgr_broadcast(blade_handle_subscriptionmgr_get(bmmgr->handle), BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE, NULL, blade_protocol_name_get(bp), blade_protocol_realm_get(bp), channel, NULL, NULL, NULL, NULL);
+       }
 
 done:
        ks_pool_free(&key);
index 836642be6e4904d491d2c657425d6c5dcbdfc074..e3de68a77f57b6ed3c1885242c0ceb0d22950315 100644 (file)
@@ -98,10 +98,20 @@ KS_DECLARE(ks_status_t) blade_protocol_destroy(blade_protocol_t **bpP)
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid)
+KS_DECLARE(const char *) blade_protocol_name_get(blade_protocol_t *bp)
 {
-       ks_bool_t ret = KS_FALSE;
+       ks_assert(bp);
+       return bp->name;
+}
+
+KS_DECLARE(const char *) blade_protocol_realm_get(blade_protocol_t *bp)
+{
+       ks_assert(bp);
+       return bp->realm;
+}
 
+KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid)
+{
        ks_assert(bp);
        ks_assert(nodeid);
 
@@ -119,22 +129,16 @@ KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nod
        }
        ks_hash_write_unlock(bp->channels);
 
-       ks_hash_write_lock(bp->controllers);
-       if (ks_hash_remove(bp->controllers, (void *)nodeid)) {
-               ks_log(KS_LOG_DEBUG, "Protocol Controller Removed: %s from %s@%s\n", nodeid, bp->name, bp->realm);
-       }
-       ret = ks_hash_count(bp->controllers) == 0;
-       ks_hash_write_unlock(bp->controllers);
-
-       return ret;
+       return blade_protocol_controller_remove(bp, nodeid);
 }
 
-KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp)
+KS_DECLARE(cJSON *) blade_protocol_controller_pack(blade_protocol_t *bp)
 {
        cJSON *controllers = cJSON_CreateObject();
 
        ks_assert(bp);
 
+       ks_hash_read_lock(bp->controllers);
        for (ks_hash_iterator_t *it = ks_hash_first(bp->controllers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
                const char *key = NULL;
                void *value = NULL;
@@ -143,11 +147,12 @@ KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp)
 
                cJSON_AddItemToArray(controllers, cJSON_CreateString(key));
        }
+       ks_hash_read_unlock(bp->controllers);
 
        return controllers;
 }
 
-KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, const char *nodeid)
+KS_DECLARE(ks_status_t) blade_protocol_controller_add(blade_protocol_t *bp, const char *nodeid)
 {
        char *key = NULL;
 
@@ -162,6 +167,29 @@ KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, con
        return KS_STATUS_SUCCESS;
 }
 
+KS_DECLARE(ks_bool_t) blade_protocol_controller_remove(blade_protocol_t *bp, const char *nodeid)
+{
+       ks_bool_t ret  = KS_FALSE;
+
+       ks_assert(bp);
+       ks_assert(nodeid);
+
+       ks_hash_write_lock(bp->controllers);
+       if (ks_hash_remove(bp->controllers, (void *)nodeid)) {
+               ret = KS_TRUE;
+               ks_log(KS_LOG_DEBUG, "Protocol Controller Removed: %s from %s@%s\n", nodeid, bp->name, bp->realm);
+       }
+       ks_hash_write_unlock(bp->controllers);
+
+       return ret;
+}
+
+KS_DECLARE(ks_bool_t) blade_protocol_controller_available(blade_protocol_t *bp)
+{
+       ks_assert(bp);
+       return ks_hash_count(bp->controllers) > 0;
+}
+
 KS_DECLARE(ks_status_t) blade_protocol_channel_add(blade_protocol_t *bp, const char *name)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
@@ -195,16 +223,23 @@ done:
        return ret;
 }
 
-KS_DECLARE(ks_status_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name)
+KS_DECLARE(ks_bool_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name)
 {
+       ks_bool_t ret = KS_FALSE;
+       ks_hash_t *authorized = NULL;
+
        ks_assert(bp);
        ks_assert(name);
 
-       ks_hash_remove(bp->channels, (void *)name);
-
-       ks_log(KS_LOG_DEBUG, "Protocol Channel Removed: %s from %s@%s\n", name, bp->name, bp->realm);
+       ks_hash_write_lock(bp->channels);
+       if ((authorized = ks_hash_remove(bp->channels, (void *)name))) {
+               ret = KS_TRUE;
+               ks_log(KS_LOG_DEBUG, "Protocol Channel Removed: %s from %s@%s\n", name, bp->name, bp->realm);
+               ks_hash_destroy(&authorized);
+       }
+       ks_hash_write_unlock(bp->channels);
 
-       return KS_STATUS_SUCCESS;
+       return ret;
 }
 
 KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, ks_bool_t remove, const char *channel, const char *controller, const char *target)
@@ -233,8 +268,7 @@ KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, k
                        if (ks_hash_remove(authorizations, (void *)target)) {
                                ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Removed: %s from %s@%s/%s\n", target, bp->name, bp->realm, channel);
                        } else ret = KS_STATUS_NOT_FOUND;
-               }
-               else {
+               } else {
                        ks_hash_insert(authorizations, (void *)ks_pstrdup(ks_pool_get(bp), target), (void *)KS_TRUE);
                        ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Added: %s to %s@%s/%s\n", target, bp->name, bp->realm, channel);
                }
index c5f6c7d00647c634b486356f86950a85ff31876e..315c4f72150717012b9883dd0a85a44e09199534 100644 (file)
@@ -352,7 +352,7 @@ struct blade_rpcsubscribe_data_s {
        const char *relayed_messageid;
 };
 
-ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data);
+ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, blade_rpcsubscribe_command_t command, const char *protocol, const char *realm, cJSON *channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data);
 
 // blade.register request generator
 KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
@@ -456,7 +456,7 @@ done:
 
 
 // blade.publish request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, blade_rpcpublish_command_t command, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -478,9 +478,29 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *
        pool = ks_pool_get(bh);
        ks_assert(pool);
 
+       // @todo validate command and parameters
+       switch (command) {
+       case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD:
+               break;
+       case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_REMOVE:
+               break;
+       case BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD:
+               if (!channels || cJSON_GetArraySize(channels) <= 0) {
+                       ret = KS_STATUS_ARG_NULL;
+                       goto done;
+               }
+               break;
+       case BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE:
+               break;
+       default:
+               ret = KS_STATUS_ARG_INVALID;
+               goto done;
+       }
+
+       // create the response
        blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.publish");
 
-       // fill in the req_params
+       cJSON_AddNumberToObject(req_params, "command", command);
        cJSON_AddStringToObject(req_params, "protocol", protocol);
        cJSON_AddStringToObject(req_params, "realm", realm);
 
@@ -496,10 +516,6 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *
        cJSON_AddStringToObject(req_params, "responder-nodeid", id);
        ks_pool_free(&id);
 
-       // @todo may want to switch this system to use a blade_rpcpublish_args_t with validation on the contents on this list internally
-       // and to produce the entire json block internally in case the channel args change to include additional information like an encryption key,
-       // however if passing encryption keys then they should be asymetrically encrypted using a public key provided by the master so that
-       // the channel keys can be transmitted without intermediate nodes being able to snoop them
        if (channels) cJSON_AddItemToObject(req_params, "channels", cJSON_Duplicate(channels, 1));
 
        // @todo add a parameter containing a block of json for schema definitions for each of the methods being published
@@ -523,6 +539,8 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
        cJSON *req = NULL;
        cJSON *req_params = NULL;
        cJSON *req_params_channels = NULL;
+       cJSON *req_params_command = NULL;
+       blade_rpcpublish_command_t command = BLADE_RPCPUBLISH_COMMAND_NONE;
        const char *req_params_protocol = NULL;
        const char *req_params_realm = NULL;
        const char *req_params_requester_nodeid = NULL;
@@ -549,6 +567,22 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
                goto done;
        }
 
+       req_params_command = cJSON_GetObjectItem(req_params, "command");
+       if (!req_params_command) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'command'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+       command = (blade_rpcpublish_command_t)req_params_command->valueint;
+       switch (command) {
+       case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD:
+       case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_REMOVE:
+       case BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD:
+       case BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE: break;
+       default: goto done;
+       }
+
        req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
        if (!req_params_protocol) {
                ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'protocol'\n", blade_session_id_get(bs));
@@ -590,9 +624,12 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
                goto done;
        }
 
+       // @todo get enumeration parameter to represent a publish command, including add_protocol, remove_protocol, and update_channels
+       // @todo switch channels to separate add_channels and remove_channels
+
        req_params_channels = cJSON_GetObjectItem(req_params, "channels");
        if (req_params_channels) {
-               int size = 0;
+               cJSON *element = NULL;
 
                if (req_params_channels->type != cJSON_Array) {
                        ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' type, expected array\n", blade_session_id_get(bs));
@@ -601,9 +638,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
                        goto done;
                }
 
-               size = cJSON_GetArraySize(req_params_channels);
-               for (int index = 0; index < size; ++index) {
-                       cJSON *element = cJSON_GetArrayItem(req_params_channels, index);
+               cJSON_ArrayForEach(element, req_params_channels) {
                        if (element->type != cJSON_String) {
                                ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' element type, expected string\n", blade_session_id_get(bs));
                                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
@@ -615,16 +650,41 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
 
        ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
 
-       blade_mastermgr_controller_add(bh->mastermgr, req_params_protocol, req_params_realm, req_params_requester_nodeid);
-
-       if (req_params_channels) {
-               int size = cJSON_GetArraySize(req_params_channels);
-               for (int index = 0; index < size; ++index) {
-                       cJSON *element = cJSON_GetArrayItem(req_params_channels, index);
-                       blade_mastermgr_channel_add(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
+       // @todo switch on publish command, make the following code for add_protocol
+       switch (command) {
+       case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD:
+               blade_mastermgr_controller_add(bh->mastermgr, req_params_protocol, req_params_realm, req_params_requester_nodeid);
+               if (req_params_channels) {
+                       cJSON *element = NULL;
+                       cJSON_ArrayForEach(element, req_params_channels) {
+                               blade_mastermgr_channel_add(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
+                       }
                }
+               break;
+       case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_REMOVE:
+               blade_mastermgr_controller_remove(bh->mastermgr, req_params_protocol, req_params_realm, req_params_requester_nodeid);
+               break;
+       case BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD:
+               if (req_params_channels) {
+                       cJSON *element = NULL;
+                       cJSON_ArrayForEach(element, req_params_channels) {
+                               blade_mastermgr_channel_add(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
+                       }
+               }
+               break;
+       case BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE:
+               if (req_params_channels) {
+                       cJSON *element = NULL;
+                       cJSON_ArrayForEach(element, req_params_channels) {
+                               blade_mastermgr_channel_remove(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
+                       }
+               }
+               break;
+       default:
+               goto done;
        }
 
+
        // build the actual response finally
        blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
 
@@ -632,6 +692,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
        cJSON_AddStringToObject(res_result, "realm", req_params_realm);
        cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
        cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
+       // @todo include a list of channels that failed to be added or removed if applicable?
 
        // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
        blade_session_send(bs, res, NULL, NULL);
@@ -855,7 +916,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void
        blade_session_send(bs, res, NULL, NULL);
 
        if (res_result_unauthorized_channels) {
-               blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, NULL, res_result_unauthorized_channels, req_params_authorized_nodeid, KS_TRUE, NULL, NULL);
+               blade_handle_rpcsubscribe_raw(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE, req_params_protocol, req_params_realm, res_result_unauthorized_channels, req_params_authorized_nodeid, KS_TRUE, NULL, NULL);
        }
 
 done:
@@ -1000,7 +1061,7 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da
        ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
 
        bp = blade_mastermgr_protocol_lookup(bh->mastermgr, req_params_protocol, req_params_realm);
-       if (bp) res_result_controllers = blade_protocol_controllers_pack(bp);
+       if (bp) res_result_controllers = blade_protocol_controller_pack(bp);
 
 
        // build the actual response finally
@@ -1325,7 +1386,15 @@ static void blade_rpcsubscribe_data_cleanup(void *ptr, void *arg, ks_pool_cleanu
 }
 
 // blade.subscribe request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data)
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh,
+                                                                                                 blade_rpcsubscribe_command_t command,
+                                                                                                 const char *protocol,
+                                                                                                 const char *realm,
+                                                                                                 cJSON *channels,
+                                                                                                 blade_rpc_response_callback_t callback,
+                                                                                                 void *data,
+                                                                                                 blade_rpc_request_callback_t channel_callback,
+                                                                                                 void *channel_data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        ks_pool_t *pool = NULL;
@@ -1336,7 +1405,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char
        ks_assert(bh);
        ks_assert(protocol);
        ks_assert(realm);
-       ks_assert(subscribe_channels || unsubscribe_channels);
+       ks_assert(channels);
 
        pool = ks_pool_get(bh);
        ks_assert(pool);
@@ -1359,7 +1428,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char
        temp_data->channel_data = channel_data;
        ks_pool_set_cleanup(temp_data, NULL, blade_rpcsubscribe_data_cleanup);
 
-       ret = blade_handle_rpcsubscribe_raw(bh, protocol, realm, subscribe_channels, unsubscribe_channels, localid, KS_FALSE, blade_rpcsubscribe_response_handler, temp_data);
+       ret = blade_handle_rpcsubscribe_raw(bh, command, protocol, realm, channels, localid, KS_FALSE, blade_rpcsubscribe_response_handler, temp_data);
 
        ks_pool_free(&localid);
 
@@ -1369,7 +1438,15 @@ done:
        return ret;
 }
 
-ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data)
+ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh,
+                                                                                 blade_rpcsubscribe_command_t command,
+                                                                                 const char *protocol,
+                                                                                 const char *realm,
+                                                                                 cJSON *channels,
+                                                                                 const char *subscriber,
+                                                                                 ks_bool_t downstream,
+                                                                                 blade_rpc_response_callback_t callback,
+                                                                                 blade_rpcsubscribe_data_t *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -1380,12 +1457,12 @@ ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protoc
        ks_assert(bh);
        ks_assert(protocol);
        ks_assert(realm);
-       ks_assert(subscribe_channels || unsubscribe_channels);
+       ks_assert(channels);
        ks_assert(subscriber);
 
        if (downstream) {
-               // @note if a master is sending a downstream update, it may only use unsubscribe_channels, cannot force a subscription without a subscriber callback
-               if (subscribe_channels) {
+               // @note if a master is sending a downstream update, it may only remove subscriptions, it cannot force a subscription without the subscriber providing the callback
+               if (command != BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE) {
                        ret = KS_STATUS_NOT_ALLOWED;
                        goto done;
                }
@@ -1402,22 +1479,21 @@ ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protoc
        pool = ks_pool_get(bh);
        ks_assert(pool);
 
-       if (unsubscribe_channels) {
+       if (command == BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE) {
                cJSON *channel = NULL;
-               cJSON_ArrayForEach(channel, unsubscribe_channels) {
+               cJSON_ArrayForEach(channel, channels) {
                        blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, protocol, realm, channel->valuestring, subscriber);
                }
        }
 
        blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.subscribe");
 
+       cJSON_AddNumberToObject(req_params, "command", command);
        cJSON_AddStringToObject(req_params, "protocol", protocol);
        cJSON_AddStringToObject(req_params, "realm", realm);
        cJSON_AddStringToObject(req_params, "subscriber-nodeid", subscriber);
        if (downstream) cJSON_AddTrueToObject(req_params, "downstream");
-
-       if (subscribe_channels) cJSON_AddItemToObject(req_params, "subscribe-channels", cJSON_Duplicate(subscribe_channels, 1));
-       if (unsubscribe_channels) cJSON_AddItemToObject(req_params, "unsubscribe-channels", cJSON_Duplicate(unsubscribe_channels, 1));
+       cJSON_AddItemToObject(req_params, "channels", cJSON_Duplicate(channels, 1));
 
        ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request started\n", blade_session_id_get(bs));
 
@@ -1438,13 +1514,14 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
        ks_pool_t *pool = NULL;
        cJSON *req = NULL;
        cJSON *req_params = NULL;
+       cJSON *req_params_command = NULL;
+       blade_rpcsubscribe_command_t command = BLADE_RPCSUBSCRIBE_COMMAND_NONE;
        const char *req_params_protocol = NULL;
        const char *req_params_realm = NULL;
        const char *req_params_subscriber_nodeid = NULL;
        cJSON *req_params_downstream = NULL;
        ks_bool_t downstream = KS_FALSE;
-       cJSON *req_params_subscribe_channels = NULL;
-       cJSON *req_params_unsubscribe_channels = NULL;
+       cJSON *req_params_channels = NULL;
        ks_bool_t masterlocal = KS_FALSE;
        cJSON *res = NULL;
        cJSON *res_result = NULL;
@@ -1473,6 +1550,20 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
                goto done;
        }
 
+       req_params_command = cJSON_GetObjectItem(req_params, "command");
+       if (!req_params_command) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'command'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+       command = (blade_rpcsubscribe_command_t)req_params_command->valueint;
+       switch (command) {
+       case BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD:
+       case BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE: break;
+       default: goto done;
+       }
+
        req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
        if (!req_params_protocol) {
                ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'protocol'\n", blade_session_id_get(bs));
@@ -1502,12 +1593,11 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
        req_params_downstream = cJSON_GetObjectItem(req_params, "downstream");
        downstream = req_params_downstream && req_params_downstream->type == cJSON_True;
 
-       req_params_subscribe_channels = cJSON_GetObjectItem(req_params, "subscribe-channels");
-       req_params_unsubscribe_channels = cJSON_GetObjectItem(req_params, "unsubscribe-channels");
+       req_params_channels = cJSON_GetObjectItem(req_params, "channels");
 
-       if (!req_params_subscribe_channels && !req_params_unsubscribe_channels) {
-               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'subscribe-channels' or 'unsubscribe-channels'\n", blade_session_id_get(bs));
-               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params subscribe-channels or unsubscribe-channels");
+       if (!req_params_channels) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'channels'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channels");
                blade_session_send(bs, res, NULL, NULL);
                goto done;
        }
@@ -1521,9 +1611,9 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
        if (masterlocal || blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bh), req_params_subscriber_nodeid)) {
                // @note This is normally handled by blade_handle_rpcsubscribe_raw() to ensure authorization removals are processed during the request path
                // including on the node they start on, whether that is the master or the subscriber
-               if (req_params_unsubscribe_channels) {
+               if (command == BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE) {
                        cJSON *channel = NULL;
-                       cJSON_ArrayForEach(channel, req_params_unsubscribe_channels) {
+                       cJSON_ArrayForEach(channel, req_params_channels) {
                                blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
                        }
                }
@@ -1535,11 +1625,11 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
                cJSON_AddStringToObject(res_result, "subscriber-nodeid", req_params_subscriber_nodeid);
                if (downstream) cJSON_AddTrueToObject(res_result, "downstream");
 
-               if (req_params_subscribe_channels) {
-                       // @note this can only be received by the master due to other validation logic in requests which prevents the master from sending a request containing subscribe-channels
+               if (command == BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD) {
+                       // @note this can only be received by the master due to other validation logic in requests which prevents the master from sending a request to add a subscriber
                        cJSON *channel = NULL;
 
-                       cJSON_ArrayForEach(channel, req_params_subscribe_channels) {
+                       cJSON_ArrayForEach(channel, req_params_channels) {
                                if (blade_mastermgr_channel_verify(bh->mastermgr, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid)) {
                                        blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
                                        if (!res_result_subscribe_channels) res_result_subscribe_channels = cJSON_CreateArray();
@@ -1563,7 +1653,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
                temp_data->relayed_messageid = ks_pstrdup(pool, blade_rpc_request_messageid_get(brpcreq));
                ks_pool_set_cleanup(temp_data, NULL, blade_rpcsubscribe_data_cleanup);
 
-               blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, req_params_subscribe_channels, req_params_unsubscribe_channels, req_params_subscriber_nodeid, downstream, blade_rpcsubscribe_response_handler, temp_data);
+               blade_handle_rpcsubscribe_raw(bh, command, req_params_protocol, req_params_realm, req_params_channels, req_params_subscriber_nodeid, downstream, blade_rpcsubscribe_response_handler, temp_data);
        }
 
 done:
@@ -1694,11 +1784,10 @@ KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(bh);
-       ks_assert(event);
        ks_assert(protocol);
        ks_assert(realm);
 
-       ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, NULL,  protocol, realm, channel, event, params, callback, data);
+       ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, BLADE_RPCBROADCAST_COMMAND_EVENT, NULL,  protocol, realm, channel, event, params, callback, data);
 
        // @todo must check if the local node is also subscribed to receive the event, this is a special edge case which has some extra considerations
        // if the local node is subscribed to receive the event, it should be received here as a special case, otherwise the broadcast request handler
@@ -1707,6 +1796,8 @@ KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char
        return ret;
 }
 
+// @todo blade_handle_rpcbroadcast_raw() to encapsulate adding subcommands to broadcast to support protocol removal, protocol channel removal, and normal event broadcast
+
 // blade.broadcast request handler
 ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data)
 {
@@ -1715,6 +1806,8 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
        blade_session_t *bs = NULL;
        cJSON *req = NULL;
        cJSON *req_params = NULL;
+       cJSON *req_params_command = NULL;
+       blade_rpcbroadcast_command_t command = BLADE_RPCBROADCAST_COMMAND_NONE;
        const char *req_params_protocol = NULL;
        const char *req_params_realm = NULL;
        const char *req_params_channel = NULL;
@@ -1760,41 +1853,57 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
                goto done;
        }
 
-       req_params_channel = cJSON_GetObjectCstr(req_params, "channel");
-       if (!req_params_channel) {
-               ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'channel'\n", blade_session_id_get(bs));
-               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channel");
+       req_params_command = cJSON_GetObjectItem(req_params, "command");
+       if (!req_params_command) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'command'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command");
                blade_session_send(bs, res, NULL, NULL);
                goto done;
        }
-
-       req_params_event = cJSON_GetObjectCstr(req_params, "event");
-       if (!req_params_event) {
-               ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
-               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
-               blade_session_send(bs, res, NULL, NULL);
-               goto done;
+       command = (blade_rpcbroadcast_command_t)req_params_command->valueint;
+       switch (command) {
+       case BLADE_RPCBROADCAST_COMMAND_EVENT:
+               req_params_event = cJSON_GetObjectCstr(req_params, "event");
+               if (!req_params_event) {
+                       ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
+                       blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
+                       blade_session_send(bs, res, NULL, NULL);
+                       goto done;
+               }
+       case BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE:
+               req_params_channel = cJSON_GetObjectCstr(req_params, "channel");
+               if (!req_params_channel) {
+                       ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'channel'\n", blade_session_id_get(bs));
+                       blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channel");
+                       blade_session_send(bs, res, NULL, NULL);
+                       goto done;
+               }
+               break;
+       case BLADE_RPCBROADCAST_COMMAND_PROTOCOL_REMOVE: break;
+       default: goto done;
        }
 
        req_params_params = cJSON_GetObjectItem(req_params, "params");
 
-       blade_subscriptionmgr_broadcast(bh->subscriptionmgr, blade_session_id_get(bs), req_params_protocol, req_params_realm, req_params_channel, req_params_event, req_params_params, NULL, NULL);
+       blade_subscriptionmgr_broadcast(bh->subscriptionmgr, command, blade_session_id_get(bs), req_params_protocol, req_params_realm, req_params_channel, req_params_event, req_params_params, NULL, NULL);
 
-       bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_protocol, req_params_realm, req_params_channel);
-       if (bsub) {
-               const char *localid = NULL;
-               ks_pool_t *pool = NULL;
+       if (command == BLADE_RPCBROADCAST_COMMAND_EVENT) {
+               bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_protocol, req_params_realm, req_params_channel);
+               if (bsub) {
+                       const char *localid = NULL;
+                       ks_pool_t *pool = NULL;
 
-               pool = ks_pool_get(bh);
+                       pool = ks_pool_get(bh);
 
-               blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &localid);
-               ks_assert(localid);
+                       blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &localid);
+                       ks_assert(localid);
 
-               if (ks_hash_search(blade_subscription_subscribers_get(bsub), (void *)localid, KS_UNLOCKED)) {
-                       callback = blade_subscription_callback_get(bsub);
-                       if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub));
+                       if (ks_hash_search(blade_subscription_subscribers_get(bsub), (void *)localid, KS_UNLOCKED)) {
+                               callback = blade_subscription_callback_get(bsub);
+                               if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub));
+                       }
+                       ks_pool_free(&localid);
                }
-               ks_pool_free(&localid);
        }
 
        // build the actual response finally
@@ -1803,8 +1912,8 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
        // @todo this is not neccessary, can obtain this from the original request
        cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
        cJSON_AddStringToObject(res_result, "realm", req_params_realm);
-       cJSON_AddStringToObject(res_result, "channel", req_params_channel);
-       cJSON_AddStringToObject(res_result, "event", req_params_event);
+       if (req_params_channel) cJSON_AddStringToObject(res_result, "channel", req_params_channel);
+       if (req_params_event) cJSON_AddStringToObject(res_result, "event", req_params_event);
 
        // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
        blade_session_send(bs, res, NULL, NULL);
index d2f96e31ad1002d69a63ae04c8b0790d02e749d6..75c11eef61b34406ac2558ba0780fa11e290f483 100644 (file)
@@ -145,6 +145,56 @@ KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(bla
        return bsub;
 }
 
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_subscription_remove(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel)
+{
+       ks_pool_t *pool = NULL;
+       char *bsub_key = NULL;
+       blade_subscription_t *bsub = NULL;
+       ks_hash_t *subscribers = NULL;
+       ks_hash_t *subscriptions = NULL;
+
+       ks_assert(bsmgr);
+       ks_assert(protocol);
+       ks_assert(realm);
+       ks_assert(channel);
+
+       pool = ks_pool_get(bsmgr);
+
+       bsub_key = ks_psprintf(pool, "%s@%s/%s", protocol, realm, channel);
+
+       ks_hash_write_lock(bsmgr->subscriptions);
+
+       bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)bsub_key, KS_UNLOCKED);
+
+       subscribers = blade_subscription_subscribers_get(bsub);
+       ks_assert(subscribers);
+
+       for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+               void *key = NULL;
+               void *value = NULL;
+
+               ks_hash_this(it, (const void **)&key, NULL, &value);
+
+               subscriptions = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, key, KS_UNLOCKED);
+
+               ks_log(KS_LOG_DEBUG, "Subscriber Removed: %s from %s\n", key, bsub_key);
+               ks_hash_remove(subscriptions, bsub_key);
+
+               if (ks_hash_count(subscriptions) == 0) {
+                       ks_hash_remove(bsmgr->subscriptions_cleanup, key);
+               }
+       }
+
+       ks_log(KS_LOG_DEBUG, "Subscription Removed: %s\n", bsub_key);
+       ks_hash_remove(bsmgr->subscriptions, (void *)bsub_key);
+
+       ks_hash_write_unlock(bsmgr->subscriptions);
+
+       ks_pool_free(&bsub_key);
+
+       return KS_STATUS_SUCCESS;
+}
+
 KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber)
 {
        ks_pool_t *pool = NULL;
@@ -295,7 +345,7 @@ KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, con
        }
 }
 
-KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
 {
        ks_pool_t *pool = NULL;
        const char *bsub_key = NULL;
@@ -303,67 +353,152 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
        blade_session_t *bs = NULL;
        cJSON *req = NULL;
        cJSON *req_params = NULL;
+       ks_hash_t *routers = NULL;
+       ks_hash_t *channels = NULL;
 
        ks_assert(bsmgr);
        ks_assert(protocol);
        ks_assert(realm);
-       ks_assert(channel);
 
        pool = ks_pool_get(bsmgr);
 
-       bsub_key = ks_psprintf(pool, "%s@%s/%s", protocol, realm, channel);
+       switch (command) {
+       case BLADE_RPCBROADCAST_COMMAND_EVENT:
+               ks_assert(event);
+       case BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE:
+               ks_assert(channel);
+               bsub_key = ks_psprintf(pool, "%s@%s/%s", protocol, realm, channel);
 
-       blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.broadcast");
-       cJSON_AddStringToObject(req_params, "protocol", protocol);
-       cJSON_AddStringToObject(req_params, "realm", realm);
-       cJSON_AddStringToObject(req_params, "channel", channel);
-       cJSON_AddStringToObject(req_params, "event", event);
-       if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
+               ks_hash_read_lock(bsmgr->subscriptions);
 
-       ks_hash_read_lock(bsmgr->subscriptions);
+               bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)bsub_key, KS_UNLOCKED);
+               if (bsub) {
+                       ks_hash_t *subscribers = blade_subscription_subscribers_get(bsub);
 
-       bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)bsub_key, KS_UNLOCKED);
-       if (bsub) {
-               ks_hash_t *subscribers = blade_subscription_subscribers_get(bsub);
+                       ks_assert(subscribers);
+
+                       for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+                               void *key = NULL;
+                               void *value = NULL;
+
+                               ks_hash_this(it, (const void **)&key, NULL, &value);
 
-               ks_assert(subscribers);
+                               if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
 
-               for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+                               // @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
+                               if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), (const char *)key)) continue;
+
+                               bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key);
+                               if (bs) {
+                                       if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
+                                       if (!ks_hash_search(routers, blade_session_id_get(bs), KS_UNLOCKED)) ks_hash_insert(routers, blade_session_id_get(bs), bs);
+                                       else blade_session_read_unlock(bs);
+                               }
+                       }
+                       if (command == BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE) {
+                               if (!channels) ks_hash_create(&channels, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
+                               ks_hash_insert(channels, channel, (void *)KS_TRUE);
+                       }
+               }
+
+               ks_hash_read_unlock(bsmgr->subscriptions);
+               break;
+       case BLADE_RPCBROADCAST_COMMAND_PROTOCOL_REMOVE:
+               bsub_key = ks_psprintf(pool, "%s@%s", protocol, realm);
+
+               ks_hash_read_lock(bsmgr->subscriptions);
+
+               for (ks_hash_iterator_t *it = ks_hash_first(bsmgr->subscriptions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
                        void *key = NULL;
                        void *value = NULL;
 
                        ks_hash_this(it, (const void **)&key, NULL, &value);
 
-                       if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
+                       bsub = (blade_subscription_t *)value;
 
-                       // @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
-                       if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), (const char *)key)) continue;
+                       if (ks_stristr(bsub_key, (const char *)key) == (const char *)key) {
+                               ks_hash_t *subscribers = blade_subscription_subscribers_get(bsub);
 
-                       bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key);
-                       if (bs) {
-                               ks_log(KS_LOG_DEBUG, "Broadcasting: %s through %s\n", bsub_key, blade_session_id_get(bs));
+                               ks_assert(subscribers);
 
-                               blade_session_send(bs, req, callback, data);
+                               for (ks_hash_iterator_t *it2 = ks_hash_first(subscribers, KS_UNLOCKED); it2; it2 = ks_hash_next(&it2)) {
+                                       void *key2 = NULL;
+                                       void *value2 = NULL;
 
-                               blade_session_read_unlock(bs);
+                                       ks_hash_this(it2, (const void **)&key2, NULL, &value2);
+
+                                       if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key2)) continue;
+
+                                       // @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
+                                       if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), (const char *)key2)) continue;
+
+                                       bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key2);
+                                       if (bs) {
+                                               if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
+                                               if (!ks_hash_search(routers, blade_session_id_get(bs), KS_UNLOCKED)) ks_hash_insert(routers, blade_session_id_get(bs), bs);
+                                               else blade_session_read_unlock(bs);
+                                       }
+                               }
+
+                               if (!channels) ks_hash_create(&channels, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
+                               ks_hash_insert(channels, blade_subscription_channel_get(bsub), (void *)KS_TRUE);
                        }
                }
+
+               ks_hash_read_unlock(bsmgr->subscriptions);
+               break;
+       default: return KS_STATUS_ARG_INVALID;
        }
 
-       ks_hash_read_unlock(bsmgr->subscriptions);
 
        bs = blade_upstreammgr_session_get(blade_handle_upstreammgr_get(bsmgr->handle));
        if (bs) {
                if (!excluded_nodeid || ks_safe_strcasecmp(blade_session_id_get(bs), excluded_nodeid)) {
+                       if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
+                       ks_hash_insert(routers, blade_session_id_get(bs), bs);
+               }
+               else blade_session_read_unlock(bs);
+       }
+
+       if (channels) {
+               for (ks_hash_iterator_t *it = ks_hash_first(channels, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+                       void *key = NULL;
+                       void *value = NULL;
+
+                       ks_hash_this(it, (const void **)&key, NULL, &value);
+
+                       blade_subscriptionmgr_subscription_remove(bsmgr, protocol, realm, (const char *)key);
+               }
+               ks_hash_destroy(&channels);
+       }
+
+       if (routers) {
+               for (ks_hash_iterator_t *it = ks_hash_first(routers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+                       void *key = NULL;
+                       void *value = NULL;
+
+                       ks_hash_this(it, (const void **)&key, NULL, &value);
+
+                       bs = (blade_session_t *)value;
+
+                       blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.broadcast");
+                       cJSON_AddNumberToObject(req_params, "command", command);
+                       cJSON_AddStringToObject(req_params, "protocol", protocol);
+                       cJSON_AddStringToObject(req_params, "realm", realm);
+                       if (channel) cJSON_AddStringToObject(req_params, "channel", channel);
+                       if (event) cJSON_AddStringToObject(req_params, "event", event);
+                       if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
+
                        ks_log(KS_LOG_DEBUG, "Broadcasting: %s through %s\n", bsub_key, blade_session_id_get(bs));
 
                        blade_session_send(bs, req, callback, data);
-               }
 
-               blade_session_read_unlock(bs);
-       }
+                       cJSON_Delete(req);
 
-       cJSON_Delete(req);
+                       blade_session_read_unlock(bs);
+               }
+               ks_hash_destroy(&routers);
+       }
 
        ks_pool_free(&bsub_key);
 
index db1d993b85052445ccd8b19b7f8586eed4bf0842..50fd67cae2a3967d7a4d93a7e5c0aa8e8989737e 100644 (file)
@@ -42,6 +42,7 @@ KS_DECLARE(blade_handle_t *) blade_mastermgr_handle_get(blade_mastermgr_t *bmmgr
 KS_DECLARE(ks_status_t) blade_mastermgr_purge(blade_mastermgr_t *bmmgr, const char *nodeid);
 KS_DECLARE(blade_protocol_t *) blade_mastermgr_protocol_lookup(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm);
 KS_DECLARE(ks_status_t) blade_mastermgr_controller_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *controller);
+KS_DECLARE(ks_status_t) blade_mastermgr_controller_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *controller);
 KS_DECLARE(ks_status_t) blade_mastermgr_channel_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel);
 KS_DECLARE(ks_status_t) blade_mastermgr_channel_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel);
 KS_DECLARE(ks_status_t) blade_mastermgr_channel_authorize(blade_mastermgr_t *bmmgr, ks_bool_t remove, const char *protocol, const char *realm, const char *channel, const char *controller, const char *target);
index 7e9e5242babc9540aa383382e6ae034fb728a014..9a31b335b58f0f989fcdb9d8ab7f67e0911b4b62 100644 (file)
 KS_BEGIN_EXTERN_C
 KS_DECLARE(ks_status_t) blade_protocol_create(blade_protocol_t **bpP, ks_pool_t *pool, const char *name, const char *realm);
 KS_DECLARE(ks_status_t) blade_protocol_destroy(blade_protocol_t **bpP);
+KS_DECLARE(const char *) blade_protocol_name_get(blade_protocol_t *bp);
+KS_DECLARE(const char *) blade_protocol_realm_get(blade_protocol_t *bp);
 KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid);
-KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, const char *nodeid);
-KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp);
+KS_DECLARE(cJSON *) blade_protocol_controller_pack(blade_protocol_t *bp);
+KS_DECLARE(ks_status_t) blade_protocol_controller_add(blade_protocol_t *bp, const char *nodeid);
+KS_DECLARE(ks_bool_t) blade_protocol_controller_remove(blade_protocol_t *bp, const char *nodeid);
+KS_DECLARE(ks_bool_t) blade_protocol_controller_available(blade_protocol_t *bp);
 KS_DECLARE(ks_status_t) blade_protocol_channel_add(blade_protocol_t *bp, const char *name);
-KS_DECLARE(ks_status_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name);
+KS_DECLARE(ks_bool_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name);
 KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, ks_bool_t remove, const char *channel, const char *controller, const char *target);
 KS_DECLARE(ks_bool_t) blade_protocol_channel_verify(blade_protocol_t *bp, const char *channel, const char *target);
 KS_END_EXTERN_C
index c08fd52090ec0b380b2c5174829008683fc3953d..f64326ffde3285624d5e36eff88d84e0b2c539ac 100644 (file)
@@ -60,7 +60,7 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio
 
 KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, blade_rpcpublish_command_t command, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data);
 
 KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data);
 
@@ -73,7 +73,7 @@ KS_DECLARE(cJSON *) blade_rpcexecute_request_params_get(blade_rpc_request_t *brp
 KS_DECLARE(cJSON *) blade_rpcexecute_response_result_get(blade_rpc_response_t *brpcres);
 KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJSON *result);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data);
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, blade_rpcsubscribe_command_t command, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data);
 
 KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
 KS_DECLARE(cJSON *) blade_rpcbroadcast_request_params_get(blade_rpc_request_t *brpcreq);
index f903d23012c6778f36c814da1fce3179761f6f71..045130c2e507f6e416edf704695a8df575c85fa2 100644 (file)
@@ -40,10 +40,11 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_create(blade_subscriptionmgr_t **b
 KS_DECLARE(ks_status_t) blade_subscriptionmgr_destroy(blade_subscriptionmgr_t **bsmgrP);
 KS_DECLARE(blade_handle_t *) blade_subscriptionmgr_handle_get(blade_subscriptionmgr_t *bsmgr);
 KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel);
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_subscription_remove(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel);
 KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
 KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
 KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target);
-KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
 KS_END_EXTERN_C
 
 #endif
index 2c1eee0396e64453a023c6fe48f68deddf370d63..d6bf895ff50850f8adcb2205997d40ba6a605c50 100644 (file)
@@ -134,6 +134,28 @@ struct blade_transport_callbacks_s {
 typedef void (*blade_session_callback_t)(blade_session_t *bs, blade_session_state_condition_t condition, void *data);
 
 
+typedef enum {
+       BLADE_RPCPUBLISH_COMMAND_NONE,
+       BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD,
+       BLADE_RPCPUBLISH_COMMAND_CONTROLLER_REMOVE,
+       BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD,
+       BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE,
+} blade_rpcpublish_command_t;
+
+typedef enum {
+       BLADE_RPCSUBSCRIBE_COMMAND_NONE,
+       BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD,
+       BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE,
+} blade_rpcsubscribe_command_t;
+
+typedef enum {
+       BLADE_RPCBROADCAST_COMMAND_NONE,
+       BLADE_RPCBROADCAST_COMMAND_EVENT,
+       BLADE_RPCBROADCAST_COMMAND_PROTOCOL_REMOVE,
+       BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE,
+} blade_rpcbroadcast_command_t;
+
+
 KS_END_EXTERN_C
 
 #endif
index ae46df9258524a35571dfd13e1ea48a110c81834..cb6a2fb4bcc03fe2aa83a7482cd6155c3d3316ea 100644 (file)
@@ -297,7 +297,7 @@ void command_subscribe(blade_handle_t *bh, char *args)
 
        channels = cJSON_CreateArray();
        cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
-       blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
+       blade_handle_rpcsubscribe(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD, "test", "mydomain.com", channels, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
        cJSON_Delete(channels);
 }
 
index 7453f5b8ef5fd971027028147110dc4e6ed5ff39..668d6ad2b77b1885c77fd03f10bcaaa95bbf9691 100644 (file)
@@ -244,7 +244,7 @@ void command_publish(blade_handle_t *bh, char *args)
        blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
 
        // @todo build up json-based method schema for each protocolrpc registered above, and pass into blade_handle_rpcpublish() to attach to the request, to be stored in the blade_protocol_t tracked by the master node
-       blade_handle_rpcpublish(bh, "test", "mydomain.com", NULL, blade_publish_response_handler, NULL);
+       blade_handle_rpcpublish(bh, BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD, "test", "mydomain.com", NULL, blade_publish_response_handler, NULL);
 }
 
 void command_broadcast(blade_handle_t *bh, char *args)
index 1ae23841687ad8e07b92370697511f83b2ba7f8f..213c1593863a3eaee0bf640403d09f831cb7f61a 100644 (file)
@@ -70,22 +70,25 @@ ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, void *data
        ks_assert(res_result_realm);
 
        res_result_controllers = cJSON_GetObjectItem(res_result, "controllers");
-       ks_assert(res_result_controllers);
 
        ks_log(KS_LOG_DEBUG, "Session (%s) locate (%s@%s) response processing\n", blade_session_id_get(bs), res_result_protocol, res_result_realm);
 
-       for (int index = 0; index < cJSON_GetArraySize(res_result_controllers); ++index) {
-               cJSON *elem = cJSON_GetArrayItem(res_result_controllers, index);
-               if (elem->type == cJSON_String) {
-                       nodeid = elem->valuestring;
+       if (res_result_controllers) {
+               for (int index = 0; index < cJSON_GetArraySize(res_result_controllers); ++index) {
+                       cJSON *elem = cJSON_GetArrayItem(res_result_controllers, index);
+                       if (elem->type == cJSON_String) {
+                               nodeid = elem->valuestring;
+                       }
                }
        }
 
        blade_session_read_unlock(bs);
 
        if (nodeid) {
+               if (g_testcon_nodeid) ks_pool_free(&g_testcon_nodeid);
                g_testcon_nodeid = ks_pstrdup(ks_pool_get(bh), nodeid);
        }
+
        ks_log(KS_LOG_DEBUG, "Session (%s) locate (%s@%s) provider (%s)\n", blade_session_id_get(bs), res_result_protocol, res_result_realm, g_testcon_nodeid);
 
        return KS_FALSE;
@@ -384,7 +387,8 @@ void command_subscribe(blade_handle_t *bh, char *args)
 
        channels = cJSON_CreateArray();
        cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
-       blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, NULL, NULL, test_channel_handler, NULL);
+       if (args && args[0]) cJSON_AddItemToArray(channels, cJSON_CreateString(args));
+       blade_handle_rpcsubscribe(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD, "test", "mydomain.com", channels, NULL, NULL, test_channel_handler, NULL);
        cJSON_Delete(channels);
 }
 
@@ -399,7 +403,8 @@ void command_unsubscribe(blade_handle_t *bh, char *args)
 
        channels = cJSON_CreateArray();
        cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
-       blade_handle_rpcsubscribe(bh, "test", "mydomain.com", NULL, channels, test_subscribe_response_handler, NULL, test_channel_handler, NULL);
+       if (args && args[0]) cJSON_AddItemToArray(channels, cJSON_CreateString(args));
+       blade_handle_rpcsubscribe(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE, "test", "mydomain.com", channels, test_subscribe_response_handler, NULL, test_channel_handler, NULL);
        cJSON_Delete(channels);
 }
 
index 1955093e08e0539c5fd79e5928b04c466bd79fec..3a583ce05f39f33779dde9bdbd3e0df796b1ade1 100644 (file)
@@ -16,9 +16,13 @@ struct command_def_s {
 };
 
 void command_quit(blade_handle_t *bh, char *args);
+void command_channeladd(blade_handle_t *bh, char *args);
+void command_channelremove(blade_handle_t *bh, char *args);
 
 static const struct command_def_s command_defs[] = {
        { "quit", command_quit },
+       { "channeladd", command_channeladd },
+       { "channelremove", command_channelremove },
 
        { NULL, NULL }
 };
@@ -27,9 +31,12 @@ struct testproto_s {
        blade_handle_t *handle;
        ks_pool_t *pool;
        ks_hash_t *participants;
+       ks_hash_t *channels;
 };
 typedef struct testproto_s testproto_t;
 
+testproto_t *g_test = NULL;
+
 static void testproto_cleanup(void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
 {
        //testproto_t *test = (testproto_t *)ptr;
@@ -62,6 +69,7 @@ ks_status_t testproto_create(testproto_t **testP, blade_handle_t *bh)
        test->pool = pool;
 
        ks_hash_create(&test->participants, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, pool);
+       ks_hash_create(&test->channels, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, pool);
 
        ks_pool_set_cleanup(test, NULL, testproto_cleanup);
 
@@ -157,6 +165,13 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data)
        // authorize channels with the master for the requester
        channels = cJSON_CreateArray();
        cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
+       for (ks_hash_iterator_t *it = ks_hash_first(test->channels, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+               void *key = NULL;
+               void *value = NULL;
+
+               ks_hash_this(it, &key, NULL, &value);
+               cJSON_AddItemToArray(channels, cJSON_CreateString((const char *)key));
+       }
 
        blade_handle_rpcauthorize(bh, requester_nodeid, KS_FALSE, "test", "mydomain.com", channels, NULL, NULL);
 
@@ -220,6 +235,13 @@ ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, void *data)
        // deauthorize channels with the master for the requester
        channels = cJSON_CreateArray();
        cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
+       for (ks_hash_iterator_t *it = ks_hash_first(test->channels, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+               void *key = NULL;
+               void *value = NULL;
+
+               ks_hash_this(it, &key, NULL, &value);
+               cJSON_AddItemToArray(channels, cJSON_CreateString((const char *)key));
+       }
 
        blade_handle_rpcauthorize(bh, requester_nodeid, KS_TRUE, "test", "mydomain.com", channels, NULL, NULL);
 
@@ -310,7 +332,6 @@ int main(int argc, char **argv)
        config_setting_t *config_blade = NULL;
        const char *cfgpath = "testcon.cfg";
        const char *autoconnect = NULL;
-       testproto_t *test = NULL;
 
        ks_global_set_default_logger(KS_LOG_LEVEL_DEBUG);
 
@@ -346,7 +367,7 @@ int main(int argc, char **argv)
                return EXIT_FAILURE;
        }
 
-       testproto_create(&test, bh);
+       testproto_create(&g_test, bh);
 
        if (autoconnect) {
                blade_connection_t *bc = NULL;
@@ -366,19 +387,19 @@ int main(int argc, char **argv)
                        // @todo use session state change callback to know when the session is ready and the realm(s) available from blade.connect, this hack temporarily ensures it's ready before trying to publish upstream
                        ks_sleep_ms(3000);
 
-                       blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, (void *)test);
+                       blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, (void *)g_test);
                        blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
 
-                       blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, (void *)test);
+                       blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, (void *)g_test);
                        blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
 
-                       blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, (void *)test);
+                       blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, (void *)g_test);
                        blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
 
                        channels = cJSON_CreateArray();
                        cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
 
-                       blade_handle_rpcpublish(bh, "test", "mydomain.com", channels, test_publish_response_handler, (void *)test);
+                       blade_handle_rpcpublish(bh, BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD, "test", "mydomain.com", channels, test_publish_response_handler, (void *)g_test);
 
                        cJSON_Delete(channels);
                }
@@ -388,7 +409,7 @@ int main(int argc, char **argv)
 
        blade_handle_destroy(&bh);
 
-       testproto_destroy(&test);
+       testproto_destroy(&g_test);
 
        config_destroy(&config);
 
@@ -459,6 +480,51 @@ void command_quit(blade_handle_t *bh, char *args)
        g_shutdown = KS_TRUE;
 }
 
+void command_channeladd(blade_handle_t *bh, char *args)
+{
+       cJSON *channels = NULL;
+
+       ks_assert(bh);
+       ks_assert(args);
+
+       if (!args[0]) {
+               ks_log(KS_LOG_INFO, "Requires channel argument");
+               return;
+       }
+
+       ks_hash_insert(g_test->channels, (void *)ks_pstrdup(g_test->pool, args), (void *)KS_TRUE);
+
+       channels = cJSON_CreateArray();
+       cJSON_AddItemToArray(channels, cJSON_CreateString(args));
+
+       blade_handle_rpcpublish(bh, BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD, "test", "mydomain.com", channels, test_publish_response_handler, (void *)g_test);
+
+       cJSON_Delete(channels);
+}
+
+void command_channelremove(blade_handle_t *bh, char *args)
+{
+       cJSON *channels = NULL;
+
+       ks_assert(bh);
+       ks_assert(args);
+
+       if (!args[0]) {
+               ks_log(KS_LOG_INFO, "Requires channel argument");
+               return;
+       }
+
+
+       if (ks_hash_remove(g_test->channels, (void *)args)) {
+               channels = cJSON_CreateArray();
+               cJSON_AddItemToArray(channels, cJSON_CreateString(args));
+
+               blade_handle_rpcpublish(bh, BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE, "test", "mydomain.com", channels, test_publish_response_handler, (void *)g_test);
+
+               cJSON_Delete(channels);
+       }
+}
+
 /* For Emacs:
 * Local Variables:
 * mode:c
index de8a12ccf1e13ed98427c3434436259a61965925..5b03a20f9b45ac5c0f294596c361f122f61747a4 100644 (file)
@@ -151,7 +151,7 @@ KS_DECLARE(ks_status_t) ks_pool_clear(ks_pool_t *pool);
 KS_DECLARE(ks_bool_t) ks_pool_verify(void *addr);
 
 // @todo fill in documentation
-KS_DECLARE(ks_pool_t *) ks_pool_get(void *addr);
+inline KS_DECLARE(ks_pool_t *) ks_pool_get(void *addr);
 
 /*
  * void *ks_pool_alloc
index 77da9d3bbefa71245fd28c1e6ef40ab10cf1be86..76346aa2e953ac40c7b964383dcd90f23a3c6900 100644 (file)
@@ -596,31 +596,26 @@ done:
 // @todo fill in documentation
 KS_DECLARE(ks_bool_t) ks_pool_verify(void *addr)
 {
-       ks_pool_prefix_t *prefix = NULL;
        if (!addr) return KS_FALSE;
-       prefix = (ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE);
-       if (check_prefix(prefix) != KS_STATUS_SUCCESS) return KS_FALSE;
+       if (check_prefix((ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE)) != KS_STATUS_SUCCESS) return KS_FALSE;
        return KS_TRUE;
 }
 
 // @todo fill in documentation
-KS_DECLARE(ks_pool_t *) ks_pool_get(void *addr)
+inline KS_DECLARE(ks_pool_t *) ks_pool_get(void *addr)
 {
-       ks_pool_prefix_t *prefix = NULL;
+       ks_assert(addr);
+#ifdef DEBUG
+       ks_pool_prefix_t *prefix = (ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE);
        ks_status_t ret = KS_STATUS_SUCCESS;
-       ks_pool_t *pool = NULL;
-
-       if (!addr) goto done;
 
-       prefix = (ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE);
-       if (check_prefix(prefix) != KS_STATUS_SUCCESS) goto done;
-
-       if ((ret = check_pool(prefix->pool)) == KS_STATUS_SUCCESS) pool = prefix->pool;
-
-done:
+       ret = check_prefix(prefix);
        ks_assert(ret == KS_STATUS_SUCCESS);
 
-       return pool;
+       ret = check_pool(prefix->pool);
+       ks_assert(ret == KS_STATUS_SUCCESS);
+#endif
+       return ((ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE))->pool;
 }
 
 /*