]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-10167: First pass on adding support for blade.locate
authorShane Bryldt <astaelan@gmail.com>
Wed, 31 May 2017 04:44:46 +0000 (22:44 -0600)
committerShane Bryldt <astaelan@gmail.com>
Wed, 31 May 2017 04:45:06 +0000 (22:45 -0600)
libs/libblade/src/blade_stack.c
libs/libblade/src/include/blade_stack.h
libs/libblade/test/blades.c

index 888fd1d30fea1803e8a1c53f3f18ec7397118b68..ca3ab130466b10b5136c10ac4c6e17a791fbfd3a 100644 (file)
@@ -88,6 +88,9 @@ struct blade_handle_s {
 ks_bool_t blade_protocol_publish_request_handler(blade_jsonrpc_request_t *breq, void *data);
 ks_bool_t blade_protocol_publish_response_handler(blade_jsonrpc_response_t *bres);
 
+ks_bool_t blade_protocol_locate_request_handler(blade_jsonrpc_request_t *breq, void *data);
+ks_bool_t blade_protocol_locate_response_handler(blade_jsonrpc_response_t *bres);
+
 
 typedef struct blade_handle_session_state_callback_registration_s blade_handle_session_state_callback_registration_t;
 struct blade_handle_session_state_callback_registration_s {
@@ -328,6 +331,9 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
        blade_jsonrpc_create(&bjsonrpc, bh, "blade.publish", blade_protocol_publish_request_handler, NULL);
        blade_handle_jsonrpc_register(bjsonrpc);
 
+       blade_jsonrpc_create(&bjsonrpc, bh, "blade.locate", blade_protocol_locate_request_handler, NULL);
+       blade_handle_jsonrpc_register(bjsonrpc);
+
        blade_transport_wss_create(&bt, bh);
        ks_assert(bt);
        bh->default_transport = bt;
@@ -1042,7 +1048,7 @@ KS_DECLARE(void) blade_handle_session_state_callbacks_execute(blade_session_t *b
 
 
 // blade.publish request generator
-// @todo add additional async callback to be called upon a publish response to inform caller of the result?
+// @todo add additional async callback to be called upon a publish response to inform caller of the result from the publish response handler
 KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
@@ -1073,8 +1079,14 @@ KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *n
        cJSON_AddStringToObject(req_params, "responder-nodeid", bh->master_nodeid);
        ks_rwl_read_unlock(bh->master_nodeid_rwl);
 
+       // @todo add a parameter containing a block of json for schema definitions for each of the methods being published
+
        ks_log(KS_LOG_DEBUG, "Session (%s) publish request started\n", blade_session_id_get(bs));
-       ret = blade_session_send(bs, req, blade_protocol_publish_response_handler);
+       ret = blade_session_send(bs, req, blade_protocol_publish_response_handler); // @todo add another callback to parameters so caller can be informed of blade.publish response
+
+       // @todo upon return, a provider should register the methods for this protocol to be locally available
+       // prior to receiving a response, if the response is an error then unregister, but if it is successful
+       // then the node is already primed to receive any immediate requests
 
 done:
        if (req) cJSON_Delete(req);
@@ -1229,6 +1241,7 @@ ks_bool_t blade_protocol_publish_request_handler(blade_jsonrpc_request_t *breq,
        blade_jsonrpc_response_raw_create(&res, &res_result, blade_jsonrpc_request_messageid_get(breq));
 
        cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
+       cJSON_AddStringToObject(res_result, "realm", req_params_realm);
        cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
        cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
 
@@ -1309,6 +1322,300 @@ done:
        return KS_FALSE;
 }
 
+
+// blade.locate request generator
+// @todo add additional async callback to be called upon a locate response to inform caller of the result from the locate response handler
+KS_DECLARE(ks_status_t) blade_protocol_locate(blade_handle_t *bh, const char *name, const char *realm)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       cJSON *req = NULL;
+       cJSON *req_params = NULL;
+       blade_session_t *bs = NULL;
+
+       ks_assert(bh);
+       ks_assert(name);
+       ks_assert(realm);
+
+       if (!(bs = blade_handle_sessions_upstream(bh))) {
+               ret = KS_STATUS_DISCONNECTED;
+               goto done;
+       }
+
+       blade_jsonrpc_request_raw_create(blade_handle_pool_get(bh), &req, &req_params, NULL, "blade.locate");
+
+       // fill in the req_params
+       cJSON_AddStringToObject(req_params, "protocol", name);
+       cJSON_AddStringToObject(req_params, "realm", realm);
+
+       ks_rwl_read_lock(bh->local_nodeid_rwl);
+       cJSON_AddStringToObject(req_params, "requester-nodeid", bh->local_nodeid);
+       ks_rwl_read_unlock(bh->local_nodeid_rwl);
+
+       ks_rwl_read_lock(bh->master_nodeid_rwl);
+       cJSON_AddStringToObject(req_params, "responder-nodeid", bh->master_nodeid);
+       ks_rwl_read_unlock(bh->master_nodeid_rwl);
+
+       ks_log(KS_LOG_DEBUG, "Session (%s) locate request started\n", blade_session_id_get(bs));
+       ret = blade_session_send(bs, req, blade_protocol_locate_response_handler); // @todo add another callback to parameters so caller can be informed of blade.publish response
+
+       // @todo upon return, a provider should register the methods for this protocol to be locally available
+       // prior to receiving a response, if the response is an error then unregister, but if it is successful
+       // then the node is already primed to receive any immediate requests
+
+done:
+       if (req) cJSON_Delete(req);
+       if (bs) blade_session_read_unlock(bs);
+
+       return ret;
+}
+
+// blade.locate request handler
+ks_bool_t blade_protocol_locate_request_handler(blade_jsonrpc_request_t *breq, void *data)
+{
+       blade_handle_t *bh = NULL;
+       blade_session_t *bs = NULL;
+       cJSON *req = NULL;
+       cJSON *req_params = NULL;
+       const char *req_params_protocol = NULL;
+       const char *req_params_realm = NULL;
+       const char *req_params_requester_nodeid = NULL;
+       const char *req_params_responder_nodeid = NULL;
+       cJSON *res = NULL;
+       cJSON *res_result = NULL;
+       cJSON *res_result_providers;
+       blade_protocol_t *bp = NULL;
+       const char *bp_key = NULL;
+
+       ks_assert(breq);
+
+       bh = blade_jsonrpc_request_handle_get(breq);
+       ks_assert(bh);
+
+       bs = blade_handle_sessions_lookup(bh, blade_jsonrpc_request_sessionid_get(breq));
+       ks_assert(bs);
+
+       req = blade_jsonrpc_request_message_get(breq);
+       ks_assert(req);
+
+       req_params = cJSON_GetObjectItem(req, "params");
+       if (!req_params) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'params' object\n", blade_session_id_get(bs));
+               blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params object");
+               blade_session_send(bs, res, NULL);
+               goto done;
+       }
+
+       req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
+       if (!req_params_protocol) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'protocol'\n", blade_session_id_get(bs));
+               blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params protocol");
+               blade_session_send(bs, res, NULL);
+               goto done;
+       }
+
+       req_params_realm = cJSON_GetObjectCstr(req_params, "realm");
+       if (!req_params_realm) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'realm'\n", blade_session_id_get(bs));
+               blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params realm");
+               blade_session_send(bs, res, NULL);
+               goto done;
+       }
+
+       // @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now
+
+       req_params_requester_nodeid = cJSON_GetObjectCstr(req_params, "requester-nodeid");
+       if (!req_params_requester_nodeid) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'requester-nodeid'\n", blade_session_id_get(bs));
+               blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params requester-nodeid");
+               blade_session_send(bs, res, NULL);
+               goto done;
+       }
+
+       req_params_responder_nodeid = cJSON_GetObjectCstr(req_params, "responder-nodeid");
+       if (!req_params_responder_nodeid) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'responder-nodeid'\n", blade_session_id_get(bs));
+               blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params responder-nodeid");
+               blade_session_send(bs, res, NULL);
+               goto done;
+       }
+
+       if (!blade_handle_master_nodeid_compare(bh, req_params_responder_nodeid)) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) locate request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid);
+               blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Invalid params responder-nodeid");
+               blade_session_send(bs, res, NULL);
+               goto done;
+       }
+
+       // errors sent above this point are meant to be handled by the first node which receives the request, should not occur after the first node validates
+       // errors (and the response) sent after this point must include the requester-nodeid and responder-nodeid for proper routing
+
+       if (!blade_handle_local_nodeid_compare(bh, req_params_responder_nodeid)) {
+               // not meant for local processing, continue with routing which on a locate request, it always goes upstream to the master node
+               blade_session_t *bsu = blade_handle_sessions_upstream(bh);
+               if (!bsu) {
+                       cJSON *res_error = NULL;
+
+                       ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) but upstream session unavailable\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
+                       blade_jsonrpc_error_raw_create(&res, &res_error, blade_jsonrpc_request_messageid_get(breq), -32603, "Upstream session unavailable");
+
+                       // needed in case this error must propagate further than the session which sent it
+                       cJSON_AddStringToObject(res_error, "requester-nodeid", req_params_requester_nodeid);
+                       cJSON_AddStringToObject(res_error, "responder-nodeid", req_params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded
+
+                       blade_session_send(bs, res, NULL);
+                       goto done;
+               }
+
+               // @todo this creates a new request that is tracked locally, in order to receive the response in a callback to route it correctly, this could be simplified
+               // by using a couple special fields to indicate common routing approaches based on a routing block in common for every message, thus being able to bypass this
+               // and still be able to properly route responses without a specific response handler on every intermediate router, in which case messages that are only being
+               // routed would not enter into these handlers and would not leave a footprint passing through routers
+               ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) routing upstream (%s)\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid, blade_session_id_get(bsu));
+               blade_session_send(bsu, req, blade_protocol_locate_response_handler);
+               blade_session_read_unlock(bsu);
+
+               goto done;
+       }
+
+       // this local node must be responder-nodeid for the request, so process the request
+       ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
+
+       res_result_providers = cJSON_CreateObject();
+
+       bp_key = ks_psprintf(bh->pool, "%s@%s", req_params_protocol, req_params_realm);
+
+       ks_hash_read_lock(bh->protocols);
+
+       bp = (blade_protocol_t *)ks_hash_search(bh->protocols, (void *)bp_key, KS_UNLOCKED);
+       if (bp) {
+               ks_hash_t *providers = blade_protocol_providers_get(bp);
+               for (ks_hash_iterator_t *it = ks_hash_first(providers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+                       const char *key = NULL;
+                       void *value = NULL;
+
+                       ks_hash_this(it, (const void **)&key, NULL, &value);
+
+                       cJSON_AddItemToArray(res_result_providers, cJSON_CreateString(key));
+               }
+       }
+
+       ks_hash_read_unlock(bh->protocols);
+
+
+       // build the actual response finally
+       blade_jsonrpc_response_raw_create(&res, &res_result, blade_jsonrpc_request_messageid_get(breq));
+
+       cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
+       cJSON_AddStringToObject(res_result, "realm", req_params_realm);
+       cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
+       cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
+       cJSON_AddItemToObject(res_result, "providers", res_result_providers);
+
+       // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
+       blade_session_send(bs, res, NULL);
+
+done:
+
+       if (res) cJSON_Delete(res);
+       if (bs) blade_session_read_unlock(bs);
+
+       return KS_FALSE;
+}
+
+// blade.locate response handler
+ks_bool_t blade_protocol_locate_response_handler(blade_jsonrpc_response_t *bres)
+{
+       blade_handle_t *bh = NULL;
+       blade_session_t *bs = NULL;
+       cJSON *res = NULL;
+       cJSON *res_error = NULL;
+       cJSON *res_result = NULL;
+       cJSON *res_object = NULL;
+       cJSON *res_result_providers = NULL;
+       const char *requester_nodeid = NULL;
+       const char *responder_nodeid = NULL;
+       const char *res_result_protocol = NULL;
+       const char *res_result_realm = NULL;
+
+       ks_assert(bres);
+
+       bh = blade_jsonrpc_response_handle_get(bres);
+       ks_assert(bh);
+
+       bs = blade_handle_sessions_lookup(bh, blade_jsonrpc_response_sessionid_get(bres));
+       ks_assert(bs);
+
+       res = blade_jsonrpc_response_message_get(bres);
+       ks_assert(res);
+
+       res_error = cJSON_GetObjectItem(res, "error");
+       res_result = cJSON_GetObjectItem(res, "result");
+
+       if (!res_error && !res_result) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'error' or 'result' object\n", blade_session_id_get(bs));
+               goto done;
+       }
+       res_object = res_error ? res_error : res_result;
+
+       requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid");
+       responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid");
+
+       if (requester_nodeid && responder_nodeid && !blade_handle_local_nodeid_compare(bh, requester_nodeid)) {
+               blade_session_t *bsd = blade_handle_sessions_lookup(bh, requester_nodeid);
+               if (!bsd) {
+                       ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) but downstream session unavailable\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid);
+                       goto done;
+               }
+
+               ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) routing downstream (%s)\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid, blade_session_id_get(bsd));
+               blade_session_send(bsd, res, NULL);
+               blade_session_read_unlock(bsd);
+
+               goto done;
+       }
+
+       // this local node must be requester-nodeid for the response, or the response lacks routing nodeids, so process the response
+       ks_log(KS_LOG_DEBUG, "Session (%s) locate response processing\n", blade_session_id_get(bs));
+
+       if (res_error) {
+               // @todo process error response
+               ks_log(KS_LOG_DEBUG, "Session (%s) locate response error... add details\n", blade_session_id_get(bs));
+               goto done;
+       }
+
+       // process result response
+
+       res_result_protocol = cJSON_GetObjectCstr(res_result, "protocol");
+       if (!res_result_protocol) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'protocol'\n", blade_session_id_get(bs));
+               goto done;
+       }
+
+       res_result_realm = cJSON_GetObjectCstr(res_result, "realm");
+       if (!res_result_realm) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'realm'\n", blade_session_id_get(bs));
+               goto done;
+       }
+
+       res_result_providers = cJSON_GetObjectItem(res_result, "providers");
+       if (!res_result_providers) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'providers'\n", blade_session_id_get(bs));
+               goto done;
+       }
+
+       for (int index = 0; index < cJSON_GetArraySize(res_result_providers); ++index) {
+               cJSON *elem = cJSON_GetArrayItem(res_result_providers, index);
+               if (elem->type == cJSON_String) {
+                       ks_log(KS_LOG_DEBUG, "Session (%s) locate (%s@%s) provider (%s)\n", blade_session_id_get(bs), res_result_protocol, res_result_realm, elem->valuestring);
+               }
+       }
+
+done:
+       if (bs) blade_session_read_unlock(bs);
+
+       return KS_FALSE;
+}
+
 /* For Emacs:
  * Local Variables:
  * mode:c
index 7b9f56aed5830907be83491b1d048acc6de774aa..05c47acf0f203f02bfbbd6fefd3f574e0562c165 100644 (file)
@@ -92,6 +92,7 @@ KS_DECLARE(ks_status_t) blade_handle_session_state_callback_unregister(blade_han
 KS_DECLARE(void) blade_handle_session_state_callbacks_execute(blade_session_t *bs, blade_session_state_condition_t condition);
 
 KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm);
+KS_DECLARE(ks_status_t) blade_protocol_locate(blade_handle_t *bh, const char *name, const char *realm);
 KS_END_EXTERN_C
 
 #endif
index 4d7b13b09e1e073e060de52de159f60202ee2fb7..6402ecfe81ab1edc6c75a7625d19b3344fd133bf 100644 (file)
@@ -16,9 +16,11 @@ struct command_def_s {
 };
 
 void command_quit(blade_handle_t *bh, char *args);
+void command_locate(blade_handle_t *bh, char *args);
 
 static const struct command_def_s command_defs[] = {
        { "quit", command_quit },
+       { "locate", command_locate },
 
        { NULL, NULL }
 };
@@ -110,7 +112,7 @@ int main(int argc, char **argv)
 
                blade_identity_destroy(&target);
 
-               ks_sleep_ms(5000);
+               ks_sleep_ms(5000); // @todo use session state change callback to know when the session is ready, this ensures it's ready before trying to publish upstream
 
                blade_protocol_publish(bh, "test", "mydomain.com");
        }
@@ -191,6 +193,13 @@ void command_quit(blade_handle_t *bh, char *args)
        g_shutdown = KS_TRUE;
 }
 
+void command_locate(blade_handle_t *bh, char *args)
+{
+       ks_assert(bh);
+       ks_assert(args);
+
+       blade_protocol_locate(bh, "test", "mydomain.com");
+}