ks_bool_t blade_protocol_publish_request_handler(blade_jsonrpc_request_t *breq, void *data);
ks_bool_t blade_protocol_publish_response_handler(blade_jsonrpc_response_t *bres);
+ks_bool_t blade_protocol_locate_request_handler(blade_jsonrpc_request_t *breq, void *data);
+ks_bool_t blade_protocol_locate_response_handler(blade_jsonrpc_response_t *bres);
+
typedef struct blade_handle_session_state_callback_registration_s blade_handle_session_state_callback_registration_t;
struct blade_handle_session_state_callback_registration_s {
blade_jsonrpc_create(&bjsonrpc, bh, "blade.publish", blade_protocol_publish_request_handler, NULL);
blade_handle_jsonrpc_register(bjsonrpc);
+ blade_jsonrpc_create(&bjsonrpc, bh, "blade.locate", blade_protocol_locate_request_handler, NULL);
+ blade_handle_jsonrpc_register(bjsonrpc);
+
blade_transport_wss_create(&bt, bh);
ks_assert(bt);
bh->default_transport = bt;
// blade.publish request generator
-// @todo add additional async callback to be called upon a publish response to inform caller of the result?
+// @todo add additional async callback to be called upon a publish response to inform caller of the result from the publish response handler
KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm)
{
ks_status_t ret = KS_STATUS_SUCCESS;
cJSON_AddStringToObject(req_params, "responder-nodeid", bh->master_nodeid);
ks_rwl_read_unlock(bh->master_nodeid_rwl);
+ // @todo add a parameter containing a block of json for schema definitions for each of the methods being published
+
ks_log(KS_LOG_DEBUG, "Session (%s) publish request started\n", blade_session_id_get(bs));
- ret = blade_session_send(bs, req, blade_protocol_publish_response_handler);
+ ret = blade_session_send(bs, req, blade_protocol_publish_response_handler); // @todo add another callback to parameters so caller can be informed of blade.publish response
+
+ // @todo upon return, a provider should register the methods for this protocol to be locally available
+ // prior to receiving a response, if the response is an error then unregister, but if it is successful
+ // then the node is already primed to receive any immediate requests
done:
if (req) cJSON_Delete(req);
blade_jsonrpc_response_raw_create(&res, &res_result, blade_jsonrpc_request_messageid_get(breq));
cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
+ cJSON_AddStringToObject(res_result, "realm", req_params_realm);
cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
return KS_FALSE;
}
+
+// blade.locate request generator
+// @todo add additional async callback to be called upon a locate response to inform caller of the result from the locate response handler
+KS_DECLARE(ks_status_t) blade_protocol_locate(blade_handle_t *bh, const char *name, const char *realm)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ cJSON *req = NULL;
+ cJSON *req_params = NULL;
+ blade_session_t *bs = NULL;
+
+ ks_assert(bh);
+ ks_assert(name);
+ ks_assert(realm);
+
+ if (!(bs = blade_handle_sessions_upstream(bh))) {
+ ret = KS_STATUS_DISCONNECTED;
+ goto done;
+ }
+
+ blade_jsonrpc_request_raw_create(blade_handle_pool_get(bh), &req, &req_params, NULL, "blade.locate");
+
+ // fill in the req_params
+ cJSON_AddStringToObject(req_params, "protocol", name);
+ cJSON_AddStringToObject(req_params, "realm", realm);
+
+ ks_rwl_read_lock(bh->local_nodeid_rwl);
+ cJSON_AddStringToObject(req_params, "requester-nodeid", bh->local_nodeid);
+ ks_rwl_read_unlock(bh->local_nodeid_rwl);
+
+ ks_rwl_read_lock(bh->master_nodeid_rwl);
+ cJSON_AddStringToObject(req_params, "responder-nodeid", bh->master_nodeid);
+ ks_rwl_read_unlock(bh->master_nodeid_rwl);
+
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate request started\n", blade_session_id_get(bs));
+ ret = blade_session_send(bs, req, blade_protocol_locate_response_handler); // @todo add another callback to parameters so caller can be informed of blade.publish response
+
+ // @todo upon return, a provider should register the methods for this protocol to be locally available
+ // prior to receiving a response, if the response is an error then unregister, but if it is successful
+ // then the node is already primed to receive any immediate requests
+
+done:
+ if (req) cJSON_Delete(req);
+ if (bs) blade_session_read_unlock(bs);
+
+ return ret;
+}
+
+// blade.locate request handler
+ks_bool_t blade_protocol_locate_request_handler(blade_jsonrpc_request_t *breq, void *data)
+{
+ blade_handle_t *bh = NULL;
+ blade_session_t *bs = NULL;
+ cJSON *req = NULL;
+ cJSON *req_params = NULL;
+ const char *req_params_protocol = NULL;
+ const char *req_params_realm = NULL;
+ const char *req_params_requester_nodeid = NULL;
+ const char *req_params_responder_nodeid = NULL;
+ cJSON *res = NULL;
+ cJSON *res_result = NULL;
+ cJSON *res_result_providers;
+ blade_protocol_t *bp = NULL;
+ const char *bp_key = NULL;
+
+ ks_assert(breq);
+
+ bh = blade_jsonrpc_request_handle_get(breq);
+ ks_assert(bh);
+
+ bs = blade_handle_sessions_lookup(bh, blade_jsonrpc_request_sessionid_get(breq));
+ ks_assert(bs);
+
+ req = blade_jsonrpc_request_message_get(breq);
+ ks_assert(req);
+
+ req_params = cJSON_GetObjectItem(req, "params");
+ if (!req_params) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'params' object\n", blade_session_id_get(bs));
+ blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params object");
+ blade_session_send(bs, res, NULL);
+ goto done;
+ }
+
+ req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
+ if (!req_params_protocol) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'protocol'\n", blade_session_id_get(bs));
+ blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params protocol");
+ blade_session_send(bs, res, NULL);
+ goto done;
+ }
+
+ req_params_realm = cJSON_GetObjectCstr(req_params, "realm");
+ if (!req_params_realm) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'realm'\n", blade_session_id_get(bs));
+ blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params realm");
+ blade_session_send(bs, res, NULL);
+ goto done;
+ }
+
+ // @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now
+
+ req_params_requester_nodeid = cJSON_GetObjectCstr(req_params, "requester-nodeid");
+ if (!req_params_requester_nodeid) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'requester-nodeid'\n", blade_session_id_get(bs));
+ blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params requester-nodeid");
+ blade_session_send(bs, res, NULL);
+ goto done;
+ }
+
+ req_params_responder_nodeid = cJSON_GetObjectCstr(req_params, "responder-nodeid");
+ if (!req_params_responder_nodeid) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'responder-nodeid'\n", blade_session_id_get(bs));
+ blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params responder-nodeid");
+ blade_session_send(bs, res, NULL);
+ goto done;
+ }
+
+ if (!blade_handle_master_nodeid_compare(bh, req_params_responder_nodeid)) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid);
+ blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Invalid params responder-nodeid");
+ blade_session_send(bs, res, NULL);
+ goto done;
+ }
+
+ // errors sent above this point are meant to be handled by the first node which receives the request, should not occur after the first node validates
+ // errors (and the response) sent after this point must include the requester-nodeid and responder-nodeid for proper routing
+
+ if (!blade_handle_local_nodeid_compare(bh, req_params_responder_nodeid)) {
+ // not meant for local processing, continue with routing which on a locate request, it always goes upstream to the master node
+ blade_session_t *bsu = blade_handle_sessions_upstream(bh);
+ if (!bsu) {
+ cJSON *res_error = NULL;
+
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) but upstream session unavailable\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
+ blade_jsonrpc_error_raw_create(&res, &res_error, blade_jsonrpc_request_messageid_get(breq), -32603, "Upstream session unavailable");
+
+ // needed in case this error must propagate further than the session which sent it
+ cJSON_AddStringToObject(res_error, "requester-nodeid", req_params_requester_nodeid);
+ cJSON_AddStringToObject(res_error, "responder-nodeid", req_params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded
+
+ blade_session_send(bs, res, NULL);
+ goto done;
+ }
+
+ // @todo this creates a new request that is tracked locally, in order to receive the response in a callback to route it correctly, this could be simplified
+ // by using a couple special fields to indicate common routing approaches based on a routing block in common for every message, thus being able to bypass this
+ // and still be able to properly route responses without a specific response handler on every intermediate router, in which case messages that are only being
+ // routed would not enter into these handlers and would not leave a footprint passing through routers
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) routing upstream (%s)\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid, blade_session_id_get(bsu));
+ blade_session_send(bsu, req, blade_protocol_locate_response_handler);
+ blade_session_read_unlock(bsu);
+
+ goto done;
+ }
+
+ // this local node must be responder-nodeid for the request, so process the request
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
+
+ res_result_providers = cJSON_CreateObject();
+
+ bp_key = ks_psprintf(bh->pool, "%s@%s", req_params_protocol, req_params_realm);
+
+ ks_hash_read_lock(bh->protocols);
+
+ bp = (blade_protocol_t *)ks_hash_search(bh->protocols, (void *)bp_key, KS_UNLOCKED);
+ if (bp) {
+ ks_hash_t *providers = blade_protocol_providers_get(bp);
+ for (ks_hash_iterator_t *it = ks_hash_first(providers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ const char *key = NULL;
+ void *value = NULL;
+
+ ks_hash_this(it, (const void **)&key, NULL, &value);
+
+ cJSON_AddItemToArray(res_result_providers, cJSON_CreateString(key));
+ }
+ }
+
+ ks_hash_read_unlock(bh->protocols);
+
+
+ // build the actual response finally
+ blade_jsonrpc_response_raw_create(&res, &res_result, blade_jsonrpc_request_messageid_get(breq));
+
+ cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
+ cJSON_AddStringToObject(res_result, "realm", req_params_realm);
+ cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
+ cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
+ cJSON_AddItemToObject(res_result, "providers", res_result_providers);
+
+ // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
+ blade_session_send(bs, res, NULL);
+
+done:
+
+ if (res) cJSON_Delete(res);
+ if (bs) blade_session_read_unlock(bs);
+
+ return KS_FALSE;
+}
+
+// blade.locate response handler
+ks_bool_t blade_protocol_locate_response_handler(blade_jsonrpc_response_t *bres)
+{
+ blade_handle_t *bh = NULL;
+ blade_session_t *bs = NULL;
+ cJSON *res = NULL;
+ cJSON *res_error = NULL;
+ cJSON *res_result = NULL;
+ cJSON *res_object = NULL;
+ cJSON *res_result_providers = NULL;
+ const char *requester_nodeid = NULL;
+ const char *responder_nodeid = NULL;
+ const char *res_result_protocol = NULL;
+ const char *res_result_realm = NULL;
+
+ ks_assert(bres);
+
+ bh = blade_jsonrpc_response_handle_get(bres);
+ ks_assert(bh);
+
+ bs = blade_handle_sessions_lookup(bh, blade_jsonrpc_response_sessionid_get(bres));
+ ks_assert(bs);
+
+ res = blade_jsonrpc_response_message_get(bres);
+ ks_assert(res);
+
+ res_error = cJSON_GetObjectItem(res, "error");
+ res_result = cJSON_GetObjectItem(res, "result");
+
+ if (!res_error && !res_result) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'error' or 'result' object\n", blade_session_id_get(bs));
+ goto done;
+ }
+ res_object = res_error ? res_error : res_result;
+
+ requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid");
+ responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid");
+
+ if (requester_nodeid && responder_nodeid && !blade_handle_local_nodeid_compare(bh, requester_nodeid)) {
+ blade_session_t *bsd = blade_handle_sessions_lookup(bh, requester_nodeid);
+ if (!bsd) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) but downstream session unavailable\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid);
+ goto done;
+ }
+
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) routing downstream (%s)\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid, blade_session_id_get(bsd));
+ blade_session_send(bsd, res, NULL);
+ blade_session_read_unlock(bsd);
+
+ goto done;
+ }
+
+ // this local node must be requester-nodeid for the response, or the response lacks routing nodeids, so process the response
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate response processing\n", blade_session_id_get(bs));
+
+ if (res_error) {
+ // @todo process error response
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate response error... add details\n", blade_session_id_get(bs));
+ goto done;
+ }
+
+ // process result response
+
+ res_result_protocol = cJSON_GetObjectCstr(res_result, "protocol");
+ if (!res_result_protocol) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'protocol'\n", blade_session_id_get(bs));
+ goto done;
+ }
+
+ res_result_realm = cJSON_GetObjectCstr(res_result, "realm");
+ if (!res_result_realm) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'realm'\n", blade_session_id_get(bs));
+ goto done;
+ }
+
+ res_result_providers = cJSON_GetObjectItem(res_result, "providers");
+ if (!res_result_providers) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'providers'\n", blade_session_id_get(bs));
+ goto done;
+ }
+
+ for (int index = 0; index < cJSON_GetArraySize(res_result_providers); ++index) {
+ cJSON *elem = cJSON_GetArrayItem(res_result_providers, index);
+ if (elem->type == cJSON_String) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) locate (%s@%s) provider (%s)\n", blade_session_id_get(bs), res_result_protocol, res_result_realm, elem->valuestring);
+ }
+ }
+
+done:
+ if (bs) blade_session_read_unlock(bs);
+
+ return KS_FALSE;
+}
+
/* For Emacs:
* Local Variables:
* mode:c