]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-10739: [libblade] Added TTL to request which now produces an error response when...
authorShane Bryldt <astaelan@gmail.com>
Thu, 19 Oct 2017 14:44:31 +0000 (08:44 -0600)
committerShane Bryldt <astaelan@gmail.com>
Thu, 19 Oct 2017 14:44:31 +0000 (08:44 -0600)
12 files changed:
libs/libblade/src/blade_rpc.c
libs/libblade/src/blade_rpcmgr.c
libs/libblade/src/blade_session.c
libs/libblade/src/blade_sessionmgr.c
libs/libblade/src/blade_stack.c
libs/libblade/src/blade_subscriptionmgr.c
libs/libblade/src/include/blade_rpc.h
libs/libblade/src/include/blade_rpcmgr.h
libs/libblade/src/include/blade_session.h
libs/libblade/src/include/blade_stack.h
libs/libblade/test/testcli.c
libs/libblade/test/testcon.c

index 9e92fbc01617b4f9cfcddb7285ecadd2ed4de6e3..cfc0e78ce508cc1e586eca0fae4b3b549921670b 100644 (file)
@@ -50,9 +50,11 @@ struct blade_rpc_request_s {
 
        cJSON *message;
        const char *message_id; // pulled from message for easier keying
+
+       ks_time_t ttl;
+
        blade_rpc_response_callback_t callback;
        void *data;
-       // @todo ttl to wait for response before injecting an error response locally
 };
 
 struct blade_rpc_response_s {
@@ -253,6 +255,22 @@ KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *br
        return brpcreq->message_id;
 }
 
+KS_DECLARE(ks_status_t) blade_rpc_request_ttl_set(blade_rpc_request_t *brpcreq, ks_time_t ttl)
+{
+       ks_assert(brpcreq);
+
+       brpcreq->ttl = ks_time_now() + (ttl * KS_USEC_PER_SEC);
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_bool_t) blade_rpc_request_expired(blade_rpc_request_t *brpcreq)
+{
+       ks_assert(brpcreq);
+
+       return brpcreq->ttl <= ks_time_now();
+}
+
 KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq)
 {
        ks_assert(brpcreq);
index a8014388beca31450e1a8a1656d2ddb81e8bb9b5..21b91c3335519936861c305e0e6d6edadfdd98b3 100644 (file)
@@ -39,7 +39,9 @@ struct blade_rpcmgr_s {
        ks_hash_t *corerpcs; // method, blade_rpc_t*
        ks_hash_t *protocolrpcs; // method, blade_rpc_t*
 
-       ks_hash_t *requests; // id, KS_TRUE
+       ks_hash_t *requests; // id, blade_rpc_request_t*
+       ks_time_t requests_ttlcheck;
+       ks_mutex_t* requests_ttlcheck_lock;
 };
 
 
@@ -100,6 +102,9 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_create(blade_rpcmgr_t **brpcmgrP, blade_han
        ks_hash_create(&brpcmgr->requests, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, pool);
        ks_assert(brpcmgr->requests);
 
+       ks_mutex_create(&brpcmgr->requests_ttlcheck_lock, KS_MUTEX_FLAG_NON_RECURSIVE, pool);
+       ks_assert(brpcmgr->requests_ttlcheck_lock);
+
        ks_pool_set_cleanup(brpcmgr, NULL, blade_rpcmgr_cleanup);
 
        *brpcmgrP = brpcmgr;
@@ -218,7 +223,7 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_protocolrpc_add(blade_rpcmgr_t *brpcmgr, bl
 
        ks_log(KS_LOG_DEBUG, "ProtocolRPC Added: %s\n", key);
 
-       return KS_STATUS_SUCCESS;
+return KS_STATUS_SUCCESS;
 
 }
 
@@ -294,6 +299,59 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_request_remove(blade_rpcmgr_t *brpcmgr, bla
        return KS_STATUS_SUCCESS;
 }
 
+KS_DECLARE(ks_status_t) blade_rpcmgr_request_timeouts(blade_rpcmgr_t *brpcmgr)
+{
+       blade_session_t *loopback = NULL;
+
+       ks_assert(brpcmgr);
+
+       // All this stuff is to ensure that the requests hash is not locked up when it does not need to be,
+       // and this will also ensure that sessions will not lock up if any other session is trying to deal
+       // with timeouts already
+       if (ks_mutex_trylock(brpcmgr->requests_ttlcheck_lock) != KS_STATUS_SUCCESS) return KS_STATUS_SUCCESS;
+
+       if (brpcmgr->requests_ttlcheck > ks_time_now()) {
+               ks_mutex_unlock(brpcmgr->requests_ttlcheck_lock);
+               return KS_STATUS_SUCCESS;
+       }
+
+       ks_hash_write_lock(brpcmgr->requests);
+
+       // Give a one second delay between timeout checking
+       brpcmgr->requests_ttlcheck = ks_time_now() + KS_USEC_PER_SEC;
+
+       ks_mutex_unlock(brpcmgr->requests_ttlcheck_lock);
+
+       // Now find all the expired requests and send out loopback error responses, which will invoke request removal when received and processed
+       for (ks_hash_iterator_t *it = ks_hash_first(brpcmgr->requests, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+               const char *key = NULL;
+               blade_rpc_request_t *value = NULL;
+               cJSON *res = NULL;
+
+               ks_hash_this(it, (const void **)&key, NULL, (void **)&value);
+
+               if (!blade_rpc_request_expired(value)) continue;
+
+               if (!loopback) loopback = blade_sessionmgr_loopback_lookup(blade_handle_sessionmgr_get(brpcmgr->handle));
+
+               ks_log(KS_LOG_DEBUG, "Request (%s) TTL timeout\n", key);
+
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(value), -32000, "Request timed out");
+               // @todo may want to include requester-nodeid and responder-nodeid into the error block when they are present in the original request
+               // even though a response or error without them is treated as locally processed, it may be useful to know who the request was attempted with
+               // when multiple options are available such as a blade.execute where the protocol has multiple possible controllers to pick from
+               blade_session_send(loopback, res, 0, NULL, NULL);
+
+               cJSON_Delete(res);
+       }
+
+       if (loopback) blade_session_read_unlock(loopback);
+
+       ks_hash_write_unlock(brpcmgr->requests);
+
+       return KS_STATUS_SUCCESS;
+}
+
 /* For Emacs:
  * Local Variables:
  * mode:c
index d449ff725b81069e154fa90b2388a6d0bf9d610c..d3dd6be117e9f36764f3e160ed3b7173cb06317b 100644 (file)
@@ -412,7 +412,8 @@ KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *j
     ks_assert(bs);
     ks_assert(json);
 
-       if ((bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) == BLADE_SESSION_FLAGS_LOOPBACK) ret = blade_session_receiving_push(bs, json);
+       if ((bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) == BLADE_SESSION_FLAGS_LOOPBACK)
+               ret = blade_session_receiving_push(bs, json);
        else {
                json_copy = cJSON_Duplicate(json, 1);
                if ((ret = ks_q_push(bs->sending, json_copy)) == KS_STATUS_SUCCESS) ks_cond_try_signal(bs->cond);
@@ -477,8 +478,6 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
                                // we can start stuffing any messages queued for output on the session straight to the connection right away, may need to only
                                // do this when in session ready state but there may be implications of other states sending messages through the session
                                while (blade_session_sending_pop(bs, &json) == KS_STATUS_SUCCESS && json) {
-                                       // @todo short-circuit with blade_session_receiving_push on the same session if the message has responder-nodeid == requester-nodeid == local_nodeid
-                                       // which would allow a system to send messages to itself, such as calling a protocolrpc immediately without bouncing upstream first
                                        blade_connection_sending_push(bc, json);
                                        cJSON_Delete(json);
                                }
@@ -561,11 +560,13 @@ ks_status_t blade_session_onstate_run(blade_session_t *bs)
                cJSON_Delete(json);
        }
 
+       blade_rpcmgr_request_timeouts(blade_handle_rpcmgr_get(blade_session_handle_get(bs)));
+
        return KS_STATUS_SUCCESS;
 }
 
 
-KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, ks_time_t ttl, blade_rpc_response_callback_t callback, void *data)
 {
        blade_rpc_request_t *brpcreq = NULL;
        const char *method = NULL;
@@ -580,22 +581,19 @@ KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, bla
        ks_assert(id);
 
        if (method) {
-               // @note This is scenario 1
-               // 1) Sending a request (client: method caller or consumer)
                ks_log(KS_LOG_DEBUG, "Session (%s) sending request (%s) for %s\n", bs->id, id, method);
 
                blade_rpc_request_create(&brpcreq, bs->handle, ks_pool_get(bs->handle), bs->id, json, callback, data);
                ks_assert(brpcreq);
 
-               // @todo set request TTL and figure out when requests are checked for expiration (separate thread in the handle?)
+               if (ttl <= 0) ttl = 10;
+               blade_rpc_request_ttl_set(brpcreq, ttl);
+
                blade_rpcmgr_request_add(blade_handle_rpcmgr_get(bs->handle), brpcreq);
        } else {
-               // @note This is scenario 3
-               // 3) Sending a response or error (server: method callee or provider)
                ks_log(KS_LOG_DEBUG, "Session (%s) sending response (%s)\n", bs->id, id);
        }
 
-       //blade_session_sending_push(bs, json);
        if (!bs->connection) {
                blade_session_sending_push(bs, json);
        } else {
@@ -653,13 +651,15 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
                blade_rpc_t *brpc = NULL;
                blade_rpc_request_callback_t callback = NULL;
                cJSON *params = NULL;
+               const char *params_requester_nodeid = NULL;
+               const char *params_responder_nodeid = NULL;
 
                ks_log(KS_LOG_DEBUG, "Session (%s) receiving request (%s) for %s\n", bs->id, id, method);
 
                params = cJSON_GetObjectItem(json, "params");
                if (params) {
-                       const char *params_requester_nodeid = cJSON_GetObjectCstr(params, "requester-nodeid");
-                       const char *params_responder_nodeid = cJSON_GetObjectCstr(params, "responder-nodeid");
+                       params_requester_nodeid = cJSON_GetObjectCstr(params, "requester-nodeid");
+                       params_responder_nodeid = cJSON_GetObjectCstr(params, "responder-nodeid");
                        if (params_requester_nodeid && params_responder_nodeid && !blade_routemgr_local_check(blade_handle_routemgr_get(bh), params_responder_nodeid)) {
                                // not meant for local processing, continue with standard unicast routing for requests
                                blade_session_t *bs_router = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), params_responder_nodeid);
@@ -677,7 +677,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
                                                cJSON_AddStringToObject(res_error, "requester-nodeid", params_requester_nodeid);
                                                cJSON_AddStringToObject(res_error, "responder-nodeid", params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded
 
-                                               blade_session_send(bs, res, NULL, NULL);
+                                               blade_session_send(bs, res, 0, NULL, NULL);
                                                return KS_STATUS_DISCONNECTED;
                                        }
                                }
@@ -687,7 +687,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
                                }
                                
                                ks_log(KS_LOG_DEBUG, "Session (%s) request (%s => %s) routing (%s)\n", blade_session_id_get(bs), params_requester_nodeid, params_responder_nodeid, blade_session_id_get(bs_router));
-                               blade_session_send(bs_router, json, NULL, NULL);
+                               blade_session_send(bs_router, json, 0, NULL, NULL);
                                blade_session_read_unlock(bs_router);
 
                                // @todo if this is a subscribe request to remove subscriptions, it must carry a field unsubscribed-channels for which
@@ -704,8 +704,18 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
                brpc = blade_rpcmgr_corerpc_lookup(blade_handle_rpcmgr_get(bs->handle), method);
 
                if (!brpc) {
+                       cJSON *res = NULL;
+                       cJSON *res_error = NULL;
+
                        ks_log(KS_LOG_DEBUG, "Received unknown rpc method %s\n", method);
-                       // @todo send error response, code = -32601 (method not found)
+                       blade_rpc_error_raw_create(&res, &res_error, id, -32601, "RPC method not found");
+
+                       // needed in case this error must propagate further than the session which sent it
+                       if (params_requester_nodeid) cJSON_AddStringToObject(res_error, "requester-nodeid", params_requester_nodeid);
+                       if (params_responder_nodeid) cJSON_AddStringToObject(res_error, "responder-nodeid", params_responder_nodeid);
+
+                       blade_session_send(bs, res, 0, NULL, NULL);
+
                        return KS_STATUS_FAIL;
                }
                callback = blade_rpc_callback_get(brpc);
@@ -750,7 +760,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
                                }
 
                                ks_log(KS_LOG_DEBUG, "Session (%s) response (%s <= %s) routing (%s)\n", blade_session_id_get(bs), object_requester_nodeid, object_responder_nodeid, blade_session_id_get(bs_router));
-                               blade_session_send(bs_router, json, NULL, NULL);
+                               blade_session_send(bs_router, json, 0, NULL, NULL);
                                blade_session_read_unlock(bs_router);
 
                                // @todo if this is a subscribe response to add a subscriber with the master as the responder-nodeid, it must have a
index c4b2eefadbb5518ea6d49abce95cdd936ba18e04..6faa01e5ebe123537f028c7fbd7617f2ce29b2e4 100644 (file)
@@ -149,6 +149,10 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_startup(blade_sessionmgr_t *bsmgr, conf
 
        ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bsmgr->loopback));
 
+       blade_sessionmgr_session_add(bsmgr, bsmgr->loopback);
+
+       blade_session_state_set(bsmgr->loopback, BLADE_SESSION_STATE_STARTUP);
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -158,10 +162,10 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_shutdown(blade_sessionmgr_t *bsmgr)
 
        ks_assert(bsmgr);
 
-       if (bsmgr->loopback) {
-               blade_session_hangup(bsmgr->loopback);
-               ks_sleep_ms(100);
-       }
+       //if (bsmgr->loopback) {
+       //      blade_session_hangup(bsmgr->loopback);
+       //      ks_sleep_ms(100);
+       //}
 
        ks_hash_read_lock(bsmgr->sessions);
        for (it = ks_hash_first(bsmgr->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
index 70e1943deb7d74803254f26991c807677ce197d3..334e24a3af1125ed8aac451bde7b873e40cc82db 100644 (file)
@@ -380,7 +380,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcroute(blade_handle_t *bh, const char *no
 
        ks_log(KS_LOG_DEBUG, "Session (%s) route request (%s %s) started\n", blade_session_id_get(bs), remove ? "removing" : "adding", nodeid);
 
-       ret = blade_session_send(bs, req, callback, data);
+       ret = blade_session_send(bs, req, 0, callback, data);
 
 done:
        if (req) cJSON_Delete(req);
@@ -417,7 +417,7 @@ ks_bool_t blade_rpcroute_request_handler(blade_rpc_request_t *brpcreq, void *dat
        if (!req_params) {
                ks_log(KS_LOG_DEBUG, "Session (%s) route request missing 'params' object\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -425,7 +425,7 @@ ks_bool_t blade_rpcroute_request_handler(blade_rpc_request_t *brpcreq, void *dat
        if (!req_params_nodeid) {
                ks_log(KS_LOG_DEBUG, "Session (%s) route request missing 'nodeid'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
        req_params_remove = cJSON_GetObjectItem(req_params, "remove");
@@ -442,7 +442,7 @@ ks_bool_t blade_rpcroute_request_handler(blade_rpc_request_t *brpcreq, void *dat
        }
 
        blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
-       blade_session_send(bs, res, NULL, NULL);
+       blade_session_send(bs, res, 0, NULL, NULL);
 
 done:
 
@@ -530,7 +530,7 @@ ks_status_t blade_handle_rpcregister_raw(blade_handle_t *bh, const char *identit
        cJSON_AddStringToObject(req_params, "identity", identity);
        cJSON_AddStringToObject(req_params, "nodeid", nodeid);
 
-       ret = blade_session_send(bs, req, callback, data);
+       ret = blade_session_send(bs, req, 0, callback, data);
 
 done:
        if (req) cJSON_Delete(req);
@@ -571,7 +571,7 @@ ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *
        if (!req_params) {
                ks_log(KS_LOG_DEBUG, "Session (%s) register request missing 'params' object\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -579,7 +579,7 @@ ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *
        if (!req_params_identity) {
                ks_log(KS_LOG_DEBUG, "Session (%s) register request missing 'identity'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params identity");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -587,7 +587,7 @@ ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *
        if (blade_identity_parse(identity, req_params_identity) != KS_STATUS_SUCCESS) {
                ks_log(KS_LOG_DEBUG, "Session (%s) register request invalid 'identity'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params identity");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -595,7 +595,7 @@ ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *
        if (!req_params_nodeid) {
                ks_log(KS_LOG_DEBUG, "Session (%s) register request missing 'nodeid'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -612,7 +612,7 @@ ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *
                cJSON_AddStringToObject(res_result, "nodeid", req_params_nodeid);
 
                // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
        } else {
                blade_rpcregister_data_t *temp_data = (blade_rpcregister_data_t *)ks_pool_alloc(pool, sizeof(blade_rpcregister_data_t));
                temp_data->original_requestid = ks_pstrdup(pool, blade_rpc_request_messageid_get(brpcreq));
@@ -696,7 +696,7 @@ ks_bool_t blade_rpcregister_response_handler(blade_rpc_response_t *brpcres, void
                cJSON_AddStringToObject(res_result, "identity", res_result_identity);
                cJSON_AddStringToObject(res_result, "nodeid", res_result_nodeid);
 
-               blade_session_send(relay, res, NULL, NULL);
+               blade_session_send(relay, res, 0, NULL, NULL);
 
                cJSON_Delete(res);
 
@@ -765,7 +765,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, blade_rpcpub
 
        ks_log(KS_LOG_DEBUG, "Session (%s) publish request started\n", blade_session_id_get(bs));
 
-       ret = blade_session_send(bs, req, callback, data);
+       ret = blade_session_send(bs, req, 0, callback, data);
 
 done:
        if (req) cJSON_Delete(req);
@@ -804,11 +804,14 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
        req = blade_rpc_request_message_get(brpcreq);
        ks_assert(req);
 
+       // @todo error messages in here SHOULD include requester-nodeid and responder-nodeid in the error responses to ensure
+       // proper return routing of the error message
+
        req_params = cJSON_GetObjectItem(req, "params");
        if (!req_params) {
                ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'params' object\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -816,7 +819,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
        if (!req_params_command) {
                ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'command'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
        command = (blade_rpcpublish_command_t)req_params_command->valueint;
@@ -832,7 +835,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
        if (!req_params_protocol) {
                ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'protocol'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -840,7 +843,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
        if (!req_params_requester_nodeid) {
                ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'requester-nodeid'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params requester-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -848,14 +851,14 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
        if (!req_params_responder_nodeid) {
                ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'responder-nodeid'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params responder-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
        if (!blade_routemgr_master_check(bh->routemgr, req_params_responder_nodeid)) {
                ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid);
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params responder-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -867,7 +870,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
                if (req_params_channels->type != cJSON_Array) {
                        ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' type, expected array\n", blade_session_id_get(bs));
                        blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
-                       blade_session_send(bs, res, NULL, NULL);
+                       blade_session_send(bs, res, 0, NULL, NULL);
                        goto done;
                }
 
@@ -875,7 +878,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
                        if (req_params_channels_element->type != cJSON_Object) {
                                ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' element type, expected object\n", blade_session_id_get(bs));
                                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
-                               blade_session_send(bs, res, NULL, NULL);
+                               blade_session_send(bs, res, 0, NULL, NULL);
                                goto done;
                        }
 
@@ -883,7 +886,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
                        if (!req_params_channels_element_name) {
                                ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'channels' element name\n", blade_session_id_get(bs));
                                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channels element name");
-                               blade_session_send(bs, res, NULL, NULL);
+                               blade_session_send(bs, res, 0, NULL, NULL);
                                goto done;
                        }
 
@@ -891,7 +894,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
                        if (req_params_channels_element_flags && req_params_channels_element_flags->type != cJSON_Number) {
                                ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' element flags type, expected number\n", blade_session_id_get(bs));
                                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels element flags type");
-                               blade_session_send(bs, res, NULL, NULL);
+                               blade_session_send(bs, res, 0, NULL, NULL);
                                goto done;
                        }
                }
@@ -946,7 +949,7 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
        // @todo include a list of channels that failed to be added or removed if applicable?
 
        // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
-       blade_session_send(bs, res, NULL, NULL);
+       blade_session_send(bs, res, 0, NULL, NULL);
 
 done:
 
@@ -996,7 +999,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char
 
        ks_log(KS_LOG_DEBUG, "Session (%s) authorize request started\n", blade_session_id_get(bs));
 
-       ret = blade_session_send(bs, req, callback, data);
+       ret = blade_session_send(bs, req, 0, callback, data);
 
 done:
        if (req) cJSON_Delete(req);
@@ -1037,11 +1040,14 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void
        req = blade_rpc_request_message_get(brpcreq);
        ks_assert(req);
 
+       // @todo error messages in here need to include requester-nodeid and responder-nodeid in the error responses to ensure
+       // proper return routing of the error message
+
        req_params = cJSON_GetObjectItem(req, "params");
        if (!req_params) {
                ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'params' object\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1049,7 +1055,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void
        if (!req_params_protocol) {
                ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'protocol'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1060,7 +1066,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void
        if (!req_params_authorized_nodeid) {
                ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'authorized-nodeid'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params authorized-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1068,7 +1074,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void
        if (!req_params_requester_nodeid) {
                ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'requester-nodeid'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params requester-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1076,7 +1082,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void
        if (!req_params_responder_nodeid) {
                ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'responder-nodeid'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params responder-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1084,14 +1090,14 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void
        if (!req_params_channels) {
                ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'channels'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channels");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
        if (req_params_channels->type != cJSON_Array) {
                ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'channels' type, expected array\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1099,7 +1105,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void
                if (channel->type != cJSON_String) {
                        ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'channels' element type, expected string\n", blade_session_id_get(bs));
                        blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
-                       blade_session_send(bs, res, NULL, NULL);
+                       blade_session_send(bs, res, 0, NULL, NULL);
                        goto done;
                }
        }
@@ -1107,7 +1113,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void
        if (!blade_routemgr_master_check(bh->routemgr, req_params_responder_nodeid)) {
                ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid);
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params responder-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1140,7 +1146,7 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void
        if (res_result_failed_channels)  cJSON_AddItemToObject(res_result, "failed-channels", res_result_failed_channels);
 
        // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
-       blade_session_send(bs, res, NULL, NULL);
+       blade_session_send(bs, res, 0, NULL, NULL);
 
        if (res_result_unauthorized_channels) {
                blade_handle_rpcsubscribe_raw(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE, req_params_protocol, res_result_unauthorized_channels, req_params_authorized_nodeid, KS_TRUE, NULL, NULL);
@@ -1188,7 +1194,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *p
 
        ks_log(KS_LOG_DEBUG, "Session (%s) locate request started\n", blade_session_id_get(bs));
 
-       ret = blade_session_send(bs, req, callback, data);
+       ret = blade_session_send(bs, req, 0, callback, data);
 
 done:
        if (req) cJSON_Delete(req);
@@ -1223,11 +1229,14 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da
        req = blade_rpc_request_message_get(brpcreq);
        ks_assert(req);
 
+       // @todo error messages in here need to include requester-nodeid and responder-nodeid in the error responses to ensure
+       // proper return routing of the error message
+
        req_params = cJSON_GetObjectItem(req, "params");
        if (!req_params) {
                ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'params' object\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1235,7 +1244,7 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da
        if (!req_params_protocol) {
                ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'protocol'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1243,7 +1252,7 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da
        if (!req_params_requester_nodeid) {
                ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'requester-nodeid'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params requester-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1251,14 +1260,14 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da
        if (!req_params_responder_nodeid) {
                ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'responder-nodeid'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params responder-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
        if (!blade_routemgr_master_check(bh->routemgr, req_params_responder_nodeid)) {
                ks_log(KS_LOG_DEBUG, "Session (%s) locate request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid);
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params responder-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1280,7 +1289,7 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da
        if (res_result_controllers) cJSON_AddItemToObject(res_result, "controllers", res_result_controllers);
 
        // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
-       blade_session_send(bs, res, NULL, NULL);
+       blade_session_send(bs, res, 0, NULL, NULL);
 
 done:
 
@@ -1292,7 +1301,7 @@ done:
 
 
 // blade.execute request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, cJSON *params, ks_time_t ttl, blade_rpc_response_callback_t callback, void *data)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_session_t *bs = NULL;
@@ -1324,7 +1333,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *
 
        ks_log(KS_LOG_DEBUG, "Session (%s) execute request started\n", blade_session_id_get(bs));
 
-       ret = blade_session_send(bs, req, callback, data);
+       ret = blade_session_send(bs, req, 0, callback, data);
 
 done:
        if (req) cJSON_Delete(req);
@@ -1360,11 +1369,14 @@ ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *d
        req = blade_rpc_request_message_get(brpcreq);
        ks_assert(req);
 
+       // @todo error messages in here need to include requester-nodeid and responder-nodeid in the error responses to ensure
+       // proper return routing of the error message
+
        req_params = cJSON_GetObjectItem(req, "params");
        if (!req_params) {
                ks_log(KS_LOG_DEBUG, "Session (%s) execute request missing 'params' object\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1372,7 +1384,7 @@ ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *d
        if (!req_params_method) {
                ks_log(KS_LOG_DEBUG, "Session (%s) execute request missing 'method'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params method");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1380,7 +1392,7 @@ ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *d
        if (!req_params_protocol) {
                ks_log(KS_LOG_DEBUG, "Session (%s) execute request missing 'protocol'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1388,7 +1400,7 @@ ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *d
        if (!req_params_requester_nodeid) {
                ks_log(KS_LOG_DEBUG, "Session (%s) execute request missing 'requester-nodeid'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params requester-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1396,7 +1408,7 @@ ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *d
        if (!req_params_responder_nodeid) {
                ks_log(KS_LOG_DEBUG, "Session (%s) execute request missing 'responder-nodeid'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params responder-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1408,7 +1420,7 @@ ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *d
        if (!brpc) {
                ks_log(KS_LOG_DEBUG, "Session (%s) execute request unknown method\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Unknown params method");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1539,7 +1551,7 @@ KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJ
        if (result) cJSON_AddItemToObject(res_result, "result", cJSON_Duplicate(result, 1));
 
        // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
-       blade_session_send(bs, res, NULL, NULL);
+       blade_session_send(bs, res, 0, NULL, NULL);
 
        cJSON_Delete(res);
 
@@ -1667,7 +1679,7 @@ ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh,
 
        ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request started\n", blade_session_id_get(bs));
 
-       ret = blade_session_send(bs, req, blade_rpcsubscribe_response_handler, data);
+       ret = blade_session_send(bs, req, 0, blade_rpcsubscribe_response_handler, data);
 
 done:
        if (req) cJSON_Delete(req);
@@ -1715,7 +1727,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
        if (!req_params) {
                ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'params' object\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1723,7 +1735,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
        if (!req_params_command) {
                ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'command'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
        command = (blade_rpcsubscribe_command_t)req_params_command->valueint;
@@ -1737,7 +1749,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
        if (!req_params_protocol) {
                ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'protocol'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1745,7 +1757,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
        if (!req_params_subscriber_nodeid) {
                ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'subscriber-nodeid'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params subscriber-nodeid");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1759,7 +1771,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
        if (!req_params_channels) {
                ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'channels'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channels");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1805,7 +1817,7 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
                // could be pulled from the original request associated to the response
 
                // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
        } else {
                blade_rpcsubscribe_data_t *temp_data = (blade_rpcsubscribe_data_t *)ks_pool_alloc(pool, sizeof(blade_rpcsubscribe_data_t));
                temp_data->original_requestid = ks_pstrdup(pool, blade_rpc_request_messageid_get(brpcreq));
@@ -1914,7 +1926,7 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, voi
                if (res_result_subscribe_channels) cJSON_AddItemToObject(res_result, "subscribe-channels", cJSON_Duplicate(res_result_subscribe_channels, 1));
                if (res_result_failed_channels) cJSON_AddItemToObject(res_result, "failed-channels", cJSON_Duplicate(res_result_failed_channels, 1));
 
-               blade_session_send(relay, res, NULL, NULL);
+               blade_session_send(relay, res, 0, NULL, NULL);
 
                cJSON_Delete(res);
 
@@ -1973,7 +1985,7 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
        if (!req_params) {
                ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'params' object\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1981,7 +1993,7 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
        if (!req_params_protocol) {
                ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'protocol'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
 
@@ -1989,7 +2001,7 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
        if (!req_params_command) {
                ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'command'\n", blade_session_id_get(bs));
                blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command");
-               blade_session_send(bs, res, NULL, NULL);
+               blade_session_send(bs, res, 0, NULL, NULL);
                goto done;
        }
        command = (blade_rpcbroadcast_command_t)req_params_command->valueint;
@@ -1999,7 +2011,7 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
                if (!req_params_event) {
                        ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
                        blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
-                       blade_session_send(bs, res, NULL, NULL);
+                       blade_session_send(bs, res, 0, NULL, NULL);
                        goto done;
                }
        case BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE:
@@ -2007,7 +2019,7 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
                if (!req_params_channel) {
                        ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'channel'\n", blade_session_id_get(bs));
                        blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channel");
-                       blade_session_send(bs, res, NULL, NULL);
+                       blade_session_send(bs, res, 0, NULL, NULL);
                        goto done;
                }
                break;
@@ -2035,7 +2047,7 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
        if (req_params_event) cJSON_AddStringToObject(res_result, "event", req_params_event);
 
        // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
-       blade_session_send(bs, res, NULL, NULL);
+       blade_session_send(bs, res, 0, NULL, NULL);
 
 done:
 
index 59cabdbf279ca3017c1e7729c061876b827c8652..be25dae5b495514f75308fc890ba2e8659290e8e 100644 (file)
@@ -483,7 +483,7 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
 
                        ks_log(KS_LOG_DEBUG, "Broadcasting: protocol %s, channel %s through %s\n", protocol, channel, blade_session_id_get(bs));
 
-                       blade_session_send(bs, req, callback, data);
+                       blade_session_send(bs, req, 0, callback, data);
 
                        cJSON_Delete(req);
 
index 69e83344b2333f28affe14fe8b6f2da6ebbf99e8..a381794368d6a3a7857c2d44f8678c75fbb8efec 100644 (file)
@@ -57,6 +57,8 @@ KS_DECLARE(blade_handle_t *) blade_rpc_request_handle_get(blade_rpc_request_t *b
 KS_DECLARE(const char *) blade_rpc_request_sessionid_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(cJSON *) blade_rpc_request_message_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *brpcreq);
+KS_DECLARE(ks_status_t) blade_rpc_request_ttl_set(blade_rpc_request_t *brpcreq, ks_time_t ttl);
+KS_DECLARE(ks_bool_t) blade_rpc_request_expired(blade_rpc_request_t *brpcreq);
 KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq);
 
index 2d931898129609c0afe5243d1b1d1f9c01aa0ad8..646815d389371f908e70600ca554386dad1ce954 100644 (file)
@@ -48,6 +48,7 @@ KS_DECLARE(ks_status_t) blade_rpcmgr_protocolrpc_remove(blade_rpcmgr_t *brpcmgr,
 KS_DECLARE(blade_rpc_request_t *) blade_rpcmgr_request_lookup(blade_rpcmgr_t *brpcmgr, const char *id);
 KS_DECLARE(ks_status_t) blade_rpcmgr_request_add(blade_rpcmgr_t *brpcmgr, blade_rpc_request_t *brpcreq);
 KS_DECLARE(ks_status_t) blade_rpcmgr_request_remove(blade_rpcmgr_t *brpcmgr, blade_rpc_request_t *brpcreq);
+KS_DECLARE(ks_status_t) blade_rpcmgr_request_timeouts(blade_rpcmgr_t *brpcmgr);
 KS_END_EXTERN_C
 
 #endif
index fb7a2c64ce3ac85c3be2bf3b70ed546e01d503ff..0a292fc08b550b4bbd6afbec08f71e60fb17d478 100644 (file)
@@ -61,7 +61,7 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs);
 KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs);
 KS_DECLARE(const char *) blade_session_connection_get(blade_session_t *bs);
 KS_DECLARE(ks_status_t) blade_session_connection_set(blade_session_t *bs, const char *id);
-KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, ks_time_t ttl, blade_rpc_response_callback_t callback, void *data);
 KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json);
 KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json);
 KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json);
index 01d6e04eebb084ef93e9cf96d5a76331ccfe35a3..3a5e4edbf67500f0b41378f0bbcb00642ba71057 100644 (file)
@@ -68,7 +68,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char
 
 KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, blade_rpc_response_callback_t callback, void *data);
 
-KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, cJSON *params, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, cJSON *params, ks_time_t ttl, blade_rpc_response_callback_t callback, void *data);
 KS_DECLARE(const char *) blade_rpcexecute_request_requester_nodeid_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(const char *) blade_rpcexecute_request_responder_nodeid_get(blade_rpc_request_t *brpcreq);
 KS_DECLARE(cJSON *) blade_rpcexecute_request_params_get(blade_rpc_request_t *brpcreq);
index d904c448af359d639291f44d3d949455e6fdc91e..ad06ee19637d0f0712a4ad8804074f33e7782b7e 100644 (file)
@@ -368,7 +368,7 @@ void command_join(blade_handle_t *bh, char *args)
        }
 
        params = cJSON_CreateObject();
-       blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.join", "test", params, test_join_response_handler, NULL);
+       blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.join", "test", params, 0, test_join_response_handler, NULL);
        cJSON_Delete(params);
 }
 
@@ -417,7 +417,7 @@ void command_leave(blade_handle_t *bh, char *args)
        }
 
        params = cJSON_CreateObject();
-       blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.leave", "test", params, test_leave_response_handler, NULL);
+       blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.leave", "test", params, 0, test_leave_response_handler, NULL);
        cJSON_Delete(params);
 }
 
@@ -440,7 +440,7 @@ void command_talk(blade_handle_t *bh, char *args)
        params = cJSON_CreateObject();
        cJSON_AddStringToObject(params, "text", args);
        //blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.talk", "test", params, test_talk_response_handler, NULL);
-       blade_handle_rpcexecute(bh, "blade:testcon@freeswitch.com", "test.talk", "test", params, test_talk_response_handler, NULL);
+       blade_handle_rpcexecute(bh, "blade:testcon@freeswitch.com", "test.talk", "test", params, 0, test_talk_response_handler, NULL);
        cJSON_Delete(params);
 }
 
index 4dffa70330bf20787a7054b46722cd036d378a80..a12355fa49e658239d594f8ad725d3d2069452ba 100644 (file)
@@ -180,7 +180,7 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data)
        // send rpcexecute response to the requester
        result = cJSON_CreateObject();
 
-       blade_rpcexecute_response_send(brpcreq, result);
+       //blade_rpcexecute_response_send(brpcreq, result);
 
        cJSON_Delete(result);