]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-10167: More work on the event channel workflow, switched callback data back to...
authorShane Bryldt <astaelan@gmail.com>
Tue, 1 Aug 2017 22:30:25 +0000 (16:30 -0600)
committerShane Bryldt <astaelan@gmail.com>
Tue, 1 Aug 2017 22:30:25 +0000 (16:30 -0600)
libs/libblade/src/blade_rpc.c
libs/libblade/src/blade_session.c
libs/libblade/src/blade_stack.c
libs/libblade/src/blade_subscriptionmgr.c
libs/libblade/src/include/blade_rpc.h
libs/libblade/src/include/blade_session.h
libs/libblade/src/include/blade_stack.h
libs/libblade/src/include/blade_subscriptionmgr.h
libs/libblade/src/include/blade_types.h
libs/libblade/test/testcli.c
libs/libblade/test/testcon.c

index ca1a3712cc80d5f16f36634b92de862b9425ba6c..29510344f13af0d933b81a0a705edf5eb648578e 100644 (file)
@@ -42,7 +42,7 @@ struct blade_rpc_s {
        const char *realm;
 
        blade_rpc_request_callback_t callback;
-       cJSON *data;
+       void *data;
 };
 
 struct blade_rpc_request_s {
@@ -54,7 +54,7 @@ struct blade_rpc_request_s {
        cJSON *message;
        const char *message_id; // pulled from message for easier keying
        blade_rpc_response_callback_t callback;
-       cJSON *data;
+       void *data;
        // @todo ttl to wait for response before injecting an error response locally
 };
 
@@ -72,22 +72,22 @@ struct blade_rpc_response_s {
 
 static void blade_rpc_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
 {
-       blade_rpc_t *brpc = (blade_rpc_t *)ptr;
+       //blade_rpc_t *brpc = (blade_rpc_t *)ptr;
 
-       ks_assert(brpc);
+       //ks_assert(brpc);
 
        switch (action) {
        case KS_MPCL_ANNOUNCE:
                break;
        case KS_MPCL_TEARDOWN:
-               if (brpc->data) cJSON_Delete(brpc->data);
+               // @todo delete data if present, requires update to ks_pool for self tracking the pool in allocation header
                break;
        case KS_MPCL_DESTROY:
                break;
        }
 }
 
-KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, cJSON *data)
+KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, void *data)
 {
        blade_rpc_t *brpc = NULL;
        ks_pool_t *pool = NULL;
@@ -170,7 +170,7 @@ KS_DECLARE(blade_rpc_request_callback_t) blade_rpc_callback_get(blade_rpc_t *brp
        return brpc->callback;
 }
 
-KS_DECLARE(cJSON *) blade_rpc_data_get(blade_rpc_t *brpc)
+KS_DECLARE(void *) blade_rpc_data_get(blade_rpc_t *brpc)
 {
        ks_assert(brpc);
 
@@ -190,7 +190,7 @@ static void blade_rpc_request_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_
        case KS_MPCL_TEARDOWN:
                ks_pool_free(brpcreq->pool, (void **)&brpcreq->session_id);
                cJSON_Delete(brpcreq->message);
-               if (brpcreq->data) cJSON_Delete(brpcreq->data);
+               // @todo delete data if present, requires update to ks_pool for self tracking the pool in allocation header
                break;
        case KS_MPCL_DESTROY:
                break;
@@ -203,7 +203,7 @@ KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP,
                                                                                                         const char *session_id,
                                                                                                         cJSON *json,
                                                                                                         blade_rpc_response_callback_t callback,
-                                                                                                        cJSON *data)
+                                                                                                        void *data)
 {
        blade_rpc_request_t *brpcreq = NULL;
 
@@ -278,7 +278,7 @@ KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_r
        return brpcreq->callback;
 }
 
-KS_DECLARE(cJSON *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq)
+KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq)
 {
        ks_assert(brpcreq);
        return brpcreq->data;
index aecd52c273d144d167daf9cd2dc20015bf16ae7f..92fb4144b142bcb71f8155f2ba96fd5ec1bf7ff9 100644 (file)
@@ -582,7 +582,7 @@ ks_status_t blade_session_onstate_run(blade_session_t *bs)
 }
 
 
-KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, cJSON *data)
+KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data)
 {
        blade_rpc_request_t *brpcreq = NULL;
        const char *method = NULL;
index 0f153e951f5ed49a1c8e7940954af9b72df1296d..c2538b5470471df3838c7133edac376a3410d296 100644 (file)
@@ -47,14 +47,14 @@ struct blade_handle_s {
        blade_sessionmgr_t *sessionmgr;
 };
 
-ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
-ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
-ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
-ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
-ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
-ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
-ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJSON *data);
-ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
+ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *data);
+ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *data);
+ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void *data);
+ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *data);
+ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *data);
+ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void *data);
+ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, void *data);
+ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data);
 
 
 static void blade_handle_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
@@ -352,8 +352,20 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio
 // @todo all higher level errors should be handled by each of the calls internally so that a normal result response can be sent with an error block inside the result
 // which is important for implementation of blade.execute where errors can be relayed back to the requester properly
 
+typedef struct blade_rpcsubscribe_data_s blade_rpcsubscribe_data_t;
+struct blade_rpcsubscribe_data_s {
+       ks_pool_t *pool;
+       blade_rpc_response_callback_t original_callback;
+       void *original_data;
+       blade_rpc_request_callback_t channel_callback;
+       void *channel_data;
+       const char *relayed_messageid;
+};
+
+ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data);
+
 // blade.register request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, cJSON *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -390,7 +402,7 @@ done:
 }
 
 // blade.register request handler
-ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
+ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -454,7 +466,7 @@ done:
 
 
 // blade.publish request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -514,7 +526,7 @@ done:
 }
 
 // blade.publish request handler
-ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
+ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -644,7 +656,7 @@ done:
 
 
 // blade.authorize request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -704,7 +716,7 @@ done:
 }
 
 // blade.authorize request handler
-ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
+ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -830,8 +842,6 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON
                        if (remove) {
                                if (!res_result_unauthorized_channels) res_result_unauthorized_channels = cJSON_CreateArray();
                                cJSON_AddItemToArray(res_result_unauthorized_channels, cJSON_CreateString(channel->valuestring));
-                               // @todo unauthorizing channels should force a subscribe remove request for the target if they are subscribed, to prevent further events from reaching the target
-                               // this will require the master node to invoke the subscription removal as opposed to the target who normally invokes subscribe
                        } else {
                                if (!res_result_authorized_channels) res_result_authorized_channels = cJSON_CreateArray();
                                cJSON_AddItemToArray(res_result_authorized_channels, cJSON_CreateString(channel->valuestring));
@@ -854,6 +864,10 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON
        // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
        blade_session_send(bs, res, NULL, NULL);
 
+       if (res_result_unauthorized_channels) {
+               blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, NULL, res_result_unauthorized_channels, req_params_authorized_nodeid, KS_TRUE, NULL, NULL);
+       }
+
 done:
 
        if (res) cJSON_Delete(res);
@@ -867,7 +881,7 @@ done:
 // @todo discuss system to support caching locate results, and internally subscribing to receive event updates related to protocols which have been located
 // to ensure local caches remain synced when protocol controllers change, but this requires additional filters for event propagating to avoid broadcasting
 // every protocol update to everyone which may actually be a better way than an explicit locate request
-KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, cJSON *data)
+KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, void *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -918,7 +932,7 @@ done:
 }
 
 // blade.locate request handler
-ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
+ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -1021,7 +1035,7 @@ done:
 
 
 // blade.execute request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -1074,7 +1088,7 @@ done:
 }
 
 // blade.execute request handler
-ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
+ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *data)
 {
        ks_bool_t ret = KS_FALSE;
        blade_handle_t *bh = NULL;
@@ -1303,20 +1317,40 @@ KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJ
 }
 
 
+static void blade_rpcsubscribe_data_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
+{
+       blade_rpcsubscribe_data_t *brpcsd = (blade_rpcsubscribe_data_t *)ptr;
+
+       ks_assert(brpcsd);
+
+       switch (action) {
+       case KS_MPCL_ANNOUNCE:
+               break;
+       case KS_MPCL_TEARDOWN:
+               if (brpcsd->relayed_messageid) ks_pool_free(brpcsd->pool, &brpcsd->relayed_messageid);
+               break;
+       case KS_MPCL_DESTROY:
+               break;
+       }
+}
+
 // blade.subscribe request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, cJSON *data, blade_rpc_request_callback_t channel_callback, cJSON *channel_data)
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
+       ks_pool_t *pool = NULL;
        blade_session_t *bs = NULL;
        const char *localid = NULL;
-       blade_subscription_t *bsub = NULL;
-       cJSON *temp_data = NULL;
+       blade_rpcsubscribe_data_t *temp_data = NULL;
 
        ks_assert(bh);
        ks_assert(protocol);
        ks_assert(realm);
        ks_assert(subscribe_channels || unsubscribe_channels);
 
+       pool = blade_handle_pool_get(bh);
+       ks_assert(pool);
+
        // @note this is always produced by a subscriber, and sent upstream, master will only use the internal raw call
        if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
                ret = KS_STATUS_DISCONNECTED;
@@ -1326,21 +1360,16 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char
        blade_upstreammgr_localid_copy(bh->upstreammgr, bh->pool, &localid);
        ks_assert(localid);
 
-       if (unsubscribe_channels) {
-               cJSON *channel = NULL;
-               cJSON_ArrayForEach(channel, unsubscribe_channels) {
-                       blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, &bsub, protocol, realm, channel->valuestring, localid);
-               }
-       }
-
-       temp_data = cJSON_CreateObject();
+       // @note since this is allocated in the handle's pool, if the handle is shutdown during a pending request, then the data
+       // memory will be cleaned up with the handle, otherwise should be cleaned up in the response callback
+       temp_data = (blade_rpcsubscribe_data_t *)ks_pool_alloc(pool, sizeof(blade_rpcsubscribe_data_t));
+       temp_data->pool = pool;
+       temp_data->original_callback = callback;
+       temp_data->original_data = data;
+       temp_data->channel_callback = channel_callback;
+       temp_data->channel_data = channel_data;
+       ks_pool_set_cleanup(pool, temp_data, NULL, blade_rpcsubscribe_data_cleanup);
        
-       if (callback) cJSON_AddItemToObject(temp_data, "callback", cJSON_CreatePtr((uintptr_t)callback));
-       if (data) cJSON_AddItemToObject(temp_data, "data", data);
-
-       if (channel_callback) cJSON_AddItemToObject(temp_data, "channel-callback", cJSON_CreatePtr((uintptr_t)channel_callback));
-       if (channel_data) cJSON_AddItemToObject(temp_data, "channel-data", channel_data);
-
        ret = blade_handle_rpcsubscribe_raw(bh, protocol, realm, subscribe_channels, unsubscribe_channels, localid, KS_FALSE, blade_rpcsubscribe_response_handler, temp_data);
 
        ks_pool_free(bh->pool, &localid);
@@ -1351,7 +1380,7 @@ done:
        return ret;
 }
 
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, cJSON *data)
+ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -1384,6 +1413,13 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const
        pool = blade_handle_pool_get(bh);
        ks_assert(pool);
 
+       if (unsubscribe_channels) {
+               cJSON *channel = NULL;
+               cJSON_ArrayForEach(channel, unsubscribe_channels) {
+                       blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, protocol, realm, channel->valuestring, subscriber);
+               }
+       }
+
        blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.subscribe");
 
        cJSON_AddStringToObject(req_params, "protocol", protocol);
@@ -1406,7 +1442,7 @@ done:
 }
 
 // blade.subscribe request handler
-ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
+ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -1491,16 +1527,18 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON
 
        ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request processing\n", blade_session_id_get(bs));
 
-       if (req_params_unsubscribe_channels) {
-               cJSON *channel = NULL;
-               cJSON_ArrayForEach(channel, req_params_unsubscribe_channels) {
-                       blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
-               }
-       }
-
        masterlocal = blade_upstreammgr_masterlocal(blade_handle_upstreammgr_get(bh));
 
        if (masterlocal || blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bh), req_params_subscriber_nodeid)) {
+               // @note This is normally handled by blade_handle_rpcsubscribe_raw() to ensure authorization removals are processed during the request path
+               // including on the node they start on, whether that is the master or the subscriber
+               if (req_params_unsubscribe_channels) {
+                       cJSON *channel = NULL;
+                       cJSON_ArrayForEach(channel, req_params_unsubscribe_channels) {
+                               blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
+                       }
+               }
+
                blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
 
                cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
@@ -1532,13 +1570,12 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON
                // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
                blade_session_send(bs, res, NULL, NULL);
        } else {
-               cJSON *temp_data = cJSON_CreateObject();
-
-               // @note track this so that when this local node gets a response to this propagated request we know what messageid to propagate the response with
-               cJSON_AddStringToObject(temp_data, "messageid", blade_rpc_request_messageid_get(brpcreq));
+               blade_rpcsubscribe_data_t *temp_data = (blade_rpcsubscribe_data_t *)ks_pool_alloc(pool, sizeof(blade_rpcsubscribe_data_t));
+               temp_data->pool = pool;
+               temp_data->relayed_messageid = ks_pstrdup(pool, blade_rpc_request_messageid_get(brpcreq));
+               ks_pool_set_cleanup(pool, temp_data, NULL, blade_rpcsubscribe_data_cleanup);
 
                blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, req_params_subscribe_channels, req_params_unsubscribe_channels, req_params_subscriber_nodeid, downstream, blade_rpcsubscribe_response_handler, temp_data);
-               cJSON_Delete(temp_data);
        }
 
 done:
@@ -1550,16 +1587,13 @@ done:
 }
 
 // blade.subscribe response handler
-ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
+ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, void *data)
 {
        ks_bool_t ret = KS_FALSE;
+       blade_rpc_request_t *brpcreq = NULL;
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
-       blade_rpc_response_callback_t original_callback = NULL;
-       cJSON *original_data = NULL;
-       blade_rpc_request_callback_t channel_callback = NULL;
-       cJSON *channel_data = NULL;
-       const char *messageid = NULL;
+       blade_rpcsubscribe_data_t *temp_data = NULL;
        cJSON *res = NULL;
        cJSON *res_result = NULL;
        const char *res_result_protocol = NULL;
@@ -1573,19 +1607,15 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJS
        ks_assert(brpcres);
        ks_assert(data);
 
+       brpcreq = blade_rpc_response_request_get(brpcres);
+
        bh = blade_rpc_response_handle_get(brpcres);
        ks_assert(bh);
 
        bs = blade_sessionmgr_session_lookup(bh->sessionmgr, blade_rpc_response_sessionid_get(brpcres));
        ks_assert(bs);
 
-       original_data = cJSON_GetObjectItem(data, "data");
-       original_callback = (blade_rpc_response_callback_t)(uintptr_t)cJSON_GetObjectPtr(data, "callback");
-       channel_data = cJSON_GetObjectItem(data, "channel-data");
-       channel_callback = (blade_rpc_request_callback_t)(uintptr_t)cJSON_GetObjectPtr(data, "channel-callback");
-
-       // @note when messageid exists, it means this message is only intended to be examined and relayed, the local node is not the subscriber
-       messageid = cJSON_GetObjectCstr(data, "messageid");
+       temp_data = (blade_rpcsubscribe_data_t *)data;
 
        res = blade_rpc_response_message_get(brpcres);
        ks_assert(res);
@@ -1631,15 +1661,15 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJS
                cJSON_ArrayForEach(channel, res_result_subscribe_channels) {
                        blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, &bsub, res_result_protocol, res_result_realm, channel->valuestring, res_result_subscriber_nodeid);
                        // @note these will only get assigned on the last response, received by the subscriber
-                       if (channel_callback) blade_subscription_callback_set(bsub, channel_callback);
-                       if (channel_data) blade_subscription_callback_data_set(bsub, channel_data);
+                       if (temp_data && temp_data->channel_callback) blade_subscription_callback_set(bsub, temp_data->channel_callback);
+                       if (temp_data && temp_data->channel_data) blade_subscription_callback_data_set(bsub, temp_data->channel_data);
                }
        }
 
        // @note this will only happen on the last response, received by the subscriber
-       if (original_callback) ret = original_callback(brpcres, original_data);
+       if (temp_data && temp_data->original_callback) ret = temp_data->original_callback(brpcres, temp_data->original_data);
 
-       if (messageid) {
+       if (temp_data && temp_data->relayed_messageid) {
                blade_session_t *relay = NULL;
                if (downstream) {
                        if (!(relay = blade_upstreammgr_session_get(bh->upstreammgr))) {
@@ -1651,7 +1681,7 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJS
                        }
                }
 
-               blade_rpc_response_raw_create(&res, &res_result, messageid);
+               blade_rpc_response_raw_create(&res, &res_result, temp_data->relayed_messageid);
 
                cJSON_AddStringToObject(res_result, "protocol", res_result_protocol);
                cJSON_AddStringToObject(res_result, "realm", res_result_realm);
@@ -1668,13 +1698,14 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJS
        }
 
 done:
+       if (temp_data) ks_pool_free(temp_data->pool, &temp_data);
        blade_session_read_unlock(bs);
        return ret;
 }
 
 
 // blade.broadcast request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
 
@@ -1693,7 +1724,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char
 }
 
 // blade.broadcast request handler
-ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
+ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data)
 {
        ks_bool_t ret = KS_FALSE;
        blade_handle_t *bh = NULL;
index ea40e5c11418340bde47ae211fa865b680226836..fc34b32d71a922a742f26e9cb2b61d02f787b5da 100644 (file)
@@ -292,7 +292,7 @@ KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, con
        }
 }
 
-KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
 {
        const char *bsub_key = NULL;
        blade_subscription_t *bsub = NULL;
index f68b09f8c1328e8bcf0d63540f5782dd521fd36f..a966088af4cc4d21a921dc1d6a0aa7e10d5b819b 100644 (file)
 #include <blade.h>
 
 KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, cJSON *data);
+KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, void *data);
 KS_DECLARE(ks_status_t) blade_rpc_destroy(blade_rpc_t **brpcP);
 KS_DECLARE(blade_handle_t *) blade_rpc_handle_get(blade_rpc_t *brpc);
 KS_DECLARE(const char *) blade_rpc_method_get(blade_rpc_t *brpc);
 KS_DECLARE(const char *) blade_rpc_protocol_get(blade_rpc_t *brpc);
 KS_DECLARE(const char *) blade_rpc_realm_get(blade_rpc_t *brpc);
 KS_DECLARE(blade_rpc_request_callback_t) blade_rpc_callback_get(blade_rpc_t *brpc);
-KS_DECLARE(cJSON *) blade_rpc_data_get(blade_rpc_t *brpc);
+KS_DECLARE(void *) blade_rpc_data_get(blade_rpc_t *brpc);
 
 KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP,
                                                                                                         blade_handle_t *bh,
@@ -51,7 +51,7 @@ KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP,
                                                                                                         const char *session_id,
                                                                                                         cJSON *json,
                                                                                                         blade_rpc_response_callback_t callback,
-                                                                                                        cJSON *data);
+                                                                                                        void *data);
 KS_DECLARE(ks_status_t) blade_rpc_request_destroy(blade_rpc_request_t **brpcreqP);
 KS_DECLARE(ks_status_t) blade_rpc_request_duplicate(blade_rpc_request_t **brpcreqP, blade_rpc_request_t *brpcreq);
 KS_DECLARE(blade_handle_t *) blade_rpc_request_handle_get(blade_rpc_request_t *brpcreq);
@@ -59,7 +59,7 @@ KS_DECLARE(const char *) blade_rpc_request_sessionid_get(blade_rpc_request_t *br
 KS_DECLARE(cJSON *) blade_rpc_request_message_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq);
-KS_DECLARE(cJSON *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq);
+KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq);
 
 KS_DECLARE(ks_status_t) blade_rpc_request_raw_create(ks_pool_t *pool, cJSON **json, cJSON **params, const char **id, const char *method);
 
index 710464e8598d54daf594b4f2ed00245bcd39c37e..b84ac257ad0495adaff748abe8aed7a8b63018eb 100644 (file)
@@ -62,7 +62,7 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs);
 KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs);
 KS_DECLARE(const char *) blade_session_connection_get(blade_session_t *bs);
 KS_DECLARE(ks_status_t) blade_session_connection_set(blade_session_t *bs, const char *id);
-KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, cJSON *data);
+KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data);
 KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json);
 KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json);
 KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json);
index 80f6671053e48786b3e208f3afb4b17cfa42e2b2..211a2d1d8ae84ee8334023d3f4b3007e0c01de0e 100644 (file)
@@ -59,25 +59,24 @@ KS_DECLARE(blade_sessionmgr_t *) blade_handle_sessionmgr_get(blade_handle_t *bh)
 
 KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, cJSON *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data);
 
-KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, cJSON *data);
+KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, void *data);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
 KS_DECLARE(const char *) blade_rpcexecute_request_requester_nodeid_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(const char *) blade_rpcexecute_request_responder_nodeid_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(cJSON *) blade_rpcexecute_request_params_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(cJSON *) blade_rpcexecute_response_result_get(blade_rpc_response_t *brpcres);
 KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJSON *result);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, cJSON *data, blade_rpc_request_callback_t channel_callback, cJSON *channel_data);
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, cJSON *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
 KS_DECLARE(cJSON *) blade_rpcbroadcast_request_params_get(blade_rpc_request_t *brpcreq);
 
 KS_END_EXTERN_C
index 9b4b0bda2c7d5bd4d8efd1526019e338dc3ecfb1..f903d23012c6778f36c814da1fce3179761f6f71 100644 (file)
@@ -43,7 +43,7 @@ KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(bla
 KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
 KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
 KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target);
-KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
 KS_END_EXTERN_C
 
 #endif
index df7f7ae8c120bceb019316bbad0f50cd2781cf58..2c1eee0396e64453a023c6fe48f68deddf370d63 100644 (file)
@@ -62,8 +62,8 @@ typedef struct blade_connectionmgr_s blade_connectionmgr_t;
 typedef struct blade_sessionmgr_s blade_sessionmgr_t;
 typedef struct blade_session_callback_data_s blade_session_callback_data_t;
 
-typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, cJSON *data);
-typedef ks_bool_t (*blade_rpc_response_callback_t)(blade_rpc_response_t *brpcres, cJSON *data);
+typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, void *data);
+typedef ks_bool_t (*blade_rpc_response_callback_t)(blade_rpc_response_t *brpcres, void *data);
 
 
 typedef enum {
index be09279ee76a91bde53148f8ac513101230c131c..d618b8c823f21e61805ebaa940bf1a87be4abf0f 100644 (file)
@@ -18,6 +18,8 @@ struct command_def_s {
 void command_quit(blade_handle_t *bh, char *args);
 void command_locate(blade_handle_t *bh, char *args);
 void command_join(blade_handle_t *bh, char *args);
+void command_subscribe(blade_handle_t *bh, char *args);
+void command_unsubscribe(blade_handle_t *bh, char *args);
 void command_leave(blade_handle_t *bh, char *args);
 void command_talk(blade_handle_t *bh, char *args);
 
@@ -25,6 +27,8 @@ static const struct command_def_s command_defs[] = {
        { "quit", command_quit },
        { "locate", command_locate },
        { "join", command_join },
+       { "subscribe", command_subscribe },
+       { "unsubscribe", command_unsubscribe },
        { "leave", command_leave },
        { "talk", command_talk },
 
@@ -33,7 +37,7 @@ static const struct command_def_s command_defs[] = {
 
 const char *g_testcon_nodeid = NULL;
 
-ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
+ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, void *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -87,7 +91,7 @@ ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, cJSON *dat
        return KS_FALSE;
 }
 
-ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
+ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, void *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -111,7 +115,7 @@ ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
        return KS_FALSE;
 }
 
-ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
+ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, void *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -135,7 +139,7 @@ ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, cJSON *data
        return KS_FALSE;
 }
 
-ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
+ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, void *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -159,7 +163,35 @@ ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
        return KS_FALSE;
 }
 
-ks_bool_t test_broadcast_handler(blade_rpc_request_t *brpcreq, cJSON *data)
+ks_bool_t test_subscribe_response_handler(blade_rpc_response_t *brpcres, void *data)
+{
+       blade_handle_t *bh = NULL;
+       blade_session_t *bs = NULL;
+       cJSON *res = NULL;
+       cJSON *res_result = NULL;
+
+       ks_assert(brpcres);
+
+       bh = blade_rpc_response_handle_get(brpcres);
+       ks_assert(bh);
+
+       bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_response_sessionid_get(brpcres));
+       ks_assert(bs);
+
+       res = blade_rpc_response_message_get(brpcres);
+       ks_assert(res);
+
+       res_result = cJSON_GetObjectItem(res, "result");
+       ks_assert(res_result);
+
+       ks_log(KS_LOG_DEBUG, "Session (%s) blade.subscribe response processing\n", blade_session_id_get(bs));
+
+       blade_session_read_unlock(bs);
+
+       return KS_FALSE;
+}
+
+ks_bool_t test_channel_handler(blade_rpc_request_t *brpcreq, void *data)
 {
        blade_handle_t *bh = NULL;
        blade_session_t *bs = NULL;
@@ -177,7 +209,7 @@ ks_bool_t test_broadcast_handler(blade_rpc_request_t *brpcreq, cJSON *data)
        params = blade_rpcbroadcast_request_params_get(brpcreq);
        ks_assert(params);
 
-       ks_log(KS_LOG_DEBUG, "Session (%s) test broadcast processing\n", blade_session_id_get(bs));
+       ks_log(KS_LOG_DEBUG, "Session (%s) test channel event processing\n", blade_session_id_get(bs));
 
        blade_session_read_unlock(bs);
 
@@ -327,7 +359,6 @@ void command_locate(blade_handle_t *bh, char *args)
 void command_join(blade_handle_t *bh, char *args)
 {
        cJSON *params = NULL;
-       cJSON *channels = NULL;
 
        ks_assert(bh);
        ks_assert(args);
@@ -337,14 +368,38 @@ void command_join(blade_handle_t *bh, char *args)
                return;
        }
 
-
        params = cJSON_CreateObject();
        blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.join", "test", "mydomain.com", params, test_join_response_handler, NULL);
        cJSON_Delete(params);
+}
+
+void command_subscribe(blade_handle_t *bh, char *args)
+{
+       cJSON *channels = NULL;
+
+       if (!g_testcon_nodeid) {
+               ks_log(KS_LOG_DEBUG, "Protocol controller has not been located\n");
+               return;
+       }
+
+       channels = cJSON_CreateArray();
+       cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
+       blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, NULL, NULL, test_channel_handler, NULL);
+       cJSON_Delete(channels);
+}
+
+void command_unsubscribe(blade_handle_t *bh, char *args)
+{
+       cJSON *channels = NULL;
+
+       if (!g_testcon_nodeid) {
+               ks_log(KS_LOG_DEBUG, "Protocol controller has not been located\n");
+               return;
+       }
 
        channels = cJSON_CreateArray();
-       cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
-       blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, NULL, NULL, test_broadcast_handler, NULL);
+       cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
+       blade_handle_rpcsubscribe(bh, "test", "mydomain.com", NULL, channels, test_subscribe_response_handler, NULL, test_channel_handler, NULL);
        cJSON_Delete(channels);
 }
 
@@ -364,11 +419,6 @@ void command_leave(blade_handle_t *bh, char *args)
        params = cJSON_CreateObject();
        blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.leave", "test", "mydomain.com", params, test_leave_response_handler, NULL);
        cJSON_Delete(params);
-
-       channels = cJSON_CreateArray();
-       cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
-       blade_handle_rpcsubscribe(bh, "test", "mydomain.com", NULL, channels, NULL, NULL, NULL, NULL);
-       cJSON_Delete(channels);
 }
 
 void command_talk(blade_handle_t *bh, char *args)
index 52eab4069602ebf7a47650679c5fda852c0318d6..ec3a50c70599def29619d3d246ea6aea7fe5debe 100644 (file)
@@ -88,7 +88,7 @@ ks_status_t testproto_destroy(testproto_t **testP)
        return KS_STATUS_SUCCESS;
 }
 
-ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
+ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, void *data)
 {
        //testproto_t *test = NULL;
        blade_handle_t *bh = NULL;
@@ -97,7 +97,7 @@ ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, cJSON *da
        ks_assert(brpcres);
        ks_assert(data);
 
-       //test = (testproto_t *)cJSON_GetPtrValue(data);
+       //test = (testproto_t *)data;
 
        bh = blade_rpc_response_handle_get(brpcres);
        ks_assert(bh);
@@ -112,7 +112,7 @@ ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, cJSON *da
        return KS_FALSE;
 }
 
-ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
+ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data)
 {
        testproto_t *test = NULL;
        blade_handle_t *bh = NULL;
@@ -126,7 +126,7 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
        ks_assert(brpcreq);
        ks_assert(data);
 
-       test = (testproto_t *)cJSON_GetPtrValue(data);
+       test = (testproto_t *)data;
 
        bh = blade_rpc_request_handle_get(brpcreq);
        ks_assert(bh);
@@ -183,7 +183,7 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
        return KS_FALSE;
 }
 
-ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
+ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, void *data)
 {
        testproto_t *test = NULL;
        blade_handle_t *bh = NULL;
@@ -191,12 +191,13 @@ ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
        const char *requester_nodeid = NULL;
        //const char *key = NULL;
        cJSON *params = NULL;
+       cJSON *channels = NULL;
        cJSON *result = NULL;
 
        ks_assert(brpcreq);
        ks_assert(data);
 
-       test = (testproto_t *)cJSON_GetPtrValue(data);
+       test = (testproto_t *)data;
 
        bh = blade_rpc_request_handle_get(brpcreq);
        ks_assert(bh);
@@ -216,22 +217,36 @@ ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
        ks_hash_remove(test->participants, (void *)requester_nodeid);
        ks_hash_write_unlock(test->participants);
 
-       blade_session_read_unlock(bs);
+       // deauthorize channels with the master for the requester
+       channels = cJSON_CreateArray();
+       cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
+
+       blade_handle_rpcauthorize(bh, requester_nodeid, KS_TRUE, "test", "mydomain.com", channels, NULL, NULL);
+
+       cJSON_Delete(channels);
 
+       // send rpcexecute response to the requester
        result = cJSON_CreateObject();
 
        blade_rpcexecute_response_send(brpcreq, result);
 
+       cJSON_Delete(result);
+
+       blade_session_read_unlock(bs);
+
+       // broadcast to authorized nodes that have subscribed, that the requester has left
        params = cJSON_CreateObject();
 
        cJSON_AddStringToObject(params, "leaver-nodeid", requester_nodeid);
 
        blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "leave", params, NULL, NULL);
 
+       cJSON_Delete(params);
+
        return KS_FALSE;
 }
 
-ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
+ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, void *data)
 {
        //testproto_t *test = NULL;
        blade_handle_t *bh = NULL;
@@ -244,7 +259,7 @@ ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
        ks_assert(brpcreq);
        ks_assert(data);
 
-       //test = (testproto_t *)cJSON_GetPtrValue(data);
+       //test = (testproto_t *)data;
 
        bh = blade_rpc_request_handle_get(brpcreq);
        ks_assert(bh);
@@ -263,12 +278,16 @@ ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
 
        ks_log(KS_LOG_DEBUG, "Session (%s) test.talk (%s) request processing\n", blade_session_id_get(bs), requester_nodeid);
 
-       blade_session_read_unlock(bs);
-
+       // send rpcexecute response to the requester
        result = cJSON_CreateObject();
 
        blade_rpcexecute_response_send(brpcreq, result);
 
+       cJSON_Delete(result);
+
+       blade_session_read_unlock(bs);
+
+       // broadcast to authorized nodes that have subscribed, that the requester has said something
        params = cJSON_CreateObject();
 
        cJSON_AddStringToObject(params, "text", text);
@@ -277,6 +296,8 @@ ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
 
        blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "talk", params, NULL, NULL);
 
+       cJSON_Delete(params);
+
        return KS_FALSE;
 }
 
@@ -345,19 +366,21 @@ int main(int argc, char **argv)
                        // @todo use session state change callback to know when the session is ready and the realm(s) available from blade.connect, this hack temporarily ensures it's ready before trying to publish upstream
                        ks_sleep_ms(3000);
 
-                       blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, cJSON_CreatePtr((uintptr_t)test));
+                       blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, (void *)test);
                        blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
 
-                       blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, cJSON_CreatePtr((uintptr_t)test));
+                       blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, (void *)test);
                        blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
 
-                       blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, cJSON_CreatePtr((uintptr_t)test));
+                       blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, (void *)test);
                        blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
 
                        channels = cJSON_CreateArray();
                        cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
 
-                       blade_handle_rpcpublish(bh, "test", "mydomain.com", channels, test_publish_response_handler, cJSON_CreatePtr((uintptr_t)test));
+                       blade_handle_rpcpublish(bh, "test", "mydomain.com", channels, test_publish_response_handler, (void *)test);
+
+                       cJSON_Delete(channels);
                }
        }