ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
{
+ blade_handle_t *bh = NULL;
blade_jsonrpc_request_t *bjsonrpcreq = NULL;
blade_jsonrpc_response_t *bjsonrpcres = NULL;
const char *jsonrpc = NULL;
ks_log(KS_LOG_DEBUG, "Session (%s) processing\n", bs->id);
+ bh = blade_session_handle_get(bs);
+ ks_assert(bh);
jsonrpc = cJSON_GetObjectCstr(json, "jsonrpc");
if (!jsonrpc || strcmp(jsonrpc, "2.0")) {
// 2) Receiving a request (server: method callee or provider)
blade_jsonrpc_t *bjsonrpc = NULL;
blade_jsonrpc_request_callback_t callback = NULL;
+ cJSON *params = NULL;
ks_log(KS_LOG_DEBUG, "Session (%s) receiving request (%s) for %s\n", bs->id, id, method);
+ params = cJSON_GetObjectItem(json, "params");
+ if (params) {
+ const char *params_requester_nodeid = cJSON_GetObjectCstr(params, "requester-nodeid");
+ const char *params_responder_nodeid = cJSON_GetObjectCstr(params, "responder-nodeid");
+ if (params_requester_nodeid && params_responder_nodeid && !blade_handle_local_nodeid_compare(bh, params_responder_nodeid)) {
+ // not meant for local processing, continue with standard unicast routing for requests
+ blade_session_t *bs_router = blade_handle_route_lookup(bh, params_responder_nodeid);
+ if (!bs_router) {
+ bs_router = blade_handle_sessions_upstream(bh);
+ if (!bs_router) {
+ cJSON *res = NULL;
+ cJSON *res_error = NULL;
+
+ ks_log(KS_LOG_DEBUG, "Session (%s) request (%s => %s) but upstream session unavailable\n", blade_session_id_get(bs), params_requester_nodeid, params_responder_nodeid);
+ blade_jsonrpc_error_raw_create(&res, &res_error, id, -32603, "Upstream session unavailable");
+
+ // needed in case this error must propagate further than the session which sent it
+ cJSON_AddStringToObject(res_error, "requester-nodeid", params_requester_nodeid);
+ cJSON_AddStringToObject(res_error, "responder-nodeid", params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded
+
+ blade_session_send(bs, res, NULL);
+ return KS_STATUS_DISCONNECTED;
+ }
+ }
+
+ if (bs_router == bs) {
+ // @todo avoid circular by sending back an error instead, really should not happen but check for posterity in case a node is misbehaving for some reason
+ }
+
+ ks_log(KS_LOG_DEBUG, "Session (%s) request (%s => %s) routing (%s)\n", blade_session_id_get(bs), params_requester_nodeid, params_responder_nodeid, blade_session_id_get(bs_router));
+ blade_session_send(bs_router, json, NULL);
+ blade_session_read_unlock(bs_router);
+
+ return KS_STATUS_SUCCESS;
+ }
+ }
+
+ // reach here if the request was not captured for routing, this SHOULD always mean the message is to be processed by local handlers
bjsonrpc = blade_handle_jsonrpc_lookup(bs->handle, method);
if (!bjsonrpc) {
// @note This is scenario 4
// 4) Receiving a response or error (client: method caller or consumer)
blade_jsonrpc_response_callback_t callback = NULL;
+ cJSON *error = NULL;
+ cJSON *result = NULL;
+ cJSON *object = NULL;
ks_log(KS_LOG_DEBUG, "Session (%s) receiving response (%s)\n", bs->id, id);
+ error = cJSON_GetObjectItem(json, "error");
+ result = cJSON_GetObjectItem(json, "result");
+ object = error ? error : result;
+
+ if (object) {
+ const char *object_requester_nodeid = cJSON_GetObjectCstr(object, "requester-nodeid");
+ const char *object_responder_nodeid = cJSON_GetObjectCstr(object, "responder-nodeid");
+ if (object_requester_nodeid && object_responder_nodeid && !blade_handle_local_nodeid_compare(bh, object_requester_nodeid)) {
+ // not meant for local processing, continue with standard unicast routing for responses
+ blade_session_t *bs_router = blade_handle_route_lookup(bh, object_requester_nodeid);
+ if (!bs_router) {
+ bs_router = blade_handle_sessions_upstream(bh);
+ if (!bs_router) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) response (%s <= %s) but upstream session unavailable\n", blade_session_id_get(bs), object_requester_nodeid, object_responder_nodeid);
+ return KS_STATUS_DISCONNECTED;
+ }
+ }
+
+ if (bs_router == bs) {
+ // @todo avoid circular, really should not happen but check for posterity in case a node is misbehaving for some reason
+ }
+
+ ks_log(KS_LOG_DEBUG, "Session (%s) response (%s <= %s) routing (%s)\n", blade_session_id_get(bs), object_requester_nodeid, object_responder_nodeid, blade_session_id_get(bs_router));
+ blade_session_send(bs_router, json, NULL);
+ blade_session_read_unlock(bs_router);
+
+ return KS_STATUS_SUCCESS;
+ }
+ }
+
bjsonrpcreq = blade_handle_requests_lookup(bs->handle, id);
if (!bjsonrpcreq) {
// @todo hangup session entirely?
ks_hash_remove(bh->routes, (void *)nodeid);
// @todo when a route is removed, upstream needs to be notified, for whatever reason the node is no longer available through
- // this node so the routes leading here need to be cleared by passing a "blade.route" upstream to remove the routes, this
- // should actually happen only for local sessions, and blade.route should be always passed upstream AND processed locally, so
- // we don't want to duplicate blade.route calls already being passed up if this route is not a local session
+ // this node so the routes leading here need to be cleared by passing a "blade.register" upstream to remove the routes, this
+ // should actually happen only for local sessions, and blade.register should be always passed upstream AND processed locally, so
+ // we don't want to duplicate blade.register calls already being passed up if this route is not a local session
// @note everything below here is for master-only cleanup when a node is no longer routable
goto done;
}
- // errors sent above this point are meant to be handled by the first node which receives the request, should not occur after the first node validates
- // errors (and the response) sent after this point must include the requester-nodeid and responder-nodeid for proper routing
-
- if (!blade_handle_local_nodeid_compare(bh, req_params_responder_nodeid)) {
- // not meant for local processing, continue with routing which on a publish request, it always goes upstream to the master node
- blade_session_t *bsu = blade_handle_sessions_upstream(bh);
- if (!bsu) {
- cJSON *res_error = NULL;
-
- ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) but upstream session unavailable\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
- blade_jsonrpc_error_raw_create(&res, &res_error, blade_jsonrpc_request_messageid_get(breq), -32603, "Upstream session unavailable");
-
- // needed in case this error must propagate further than the session which sent it
- cJSON_AddStringToObject(res_error, "requester-nodeid", req_params_requester_nodeid);
- cJSON_AddStringToObject(res_error, "responder-nodeid", req_params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded
-
- blade_session_send(bs, res, NULL);
- goto done;
- }
-
- // @todo this creates a new request that is tracked locally, in order to receive the response in a callback to route it correctly, this could be simplified
- // by using a couple special fields to indicate common routing approaches based on a routing block in common for every message, thus being able to bypass this
- // and still be able to properly route responses without a specific response handler on every intermediate router, in which case messages that are only being
- // routed would not enter into these handlers and would not leave a footprint passing through routers
- ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) routing upstream (%s)\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid, blade_session_id_get(bsu));
- blade_session_send(bsu, req, blade_protocol_publish_response_handler);
- blade_session_read_unlock(bsu);
-
- goto done;
- }
-
- // this local node must be responder-nodeid for the request, so process the request
ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
bp_key = ks_psprintf(bh->pool, "%s@%s", req_params_protocol, req_params_realm);
cJSON *res_error = NULL;
cJSON *res_result = NULL;
cJSON *res_object = NULL;
- const char *requester_nodeid = NULL;
- const char *responder_nodeid = NULL;
+ //const char *requester_nodeid = NULL;
+ //const char *responder_nodeid = NULL;
ks_assert(bres);
}
res_object = res_error ? res_error : res_result;
- requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid");
- responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid");
-
- if (requester_nodeid && responder_nodeid && !blade_handle_local_nodeid_compare(bh, requester_nodeid)) {
- blade_session_t *bsd = blade_handle_sessions_lookup(bh, requester_nodeid);
- if (!bsd) {
- ks_log(KS_LOG_DEBUG, "Session (%s) publish response (%s to %s) but downstream session unavailable\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid);
- goto done;
- }
-
- ks_log(KS_LOG_DEBUG, "Session (%s) publish response (%s to %s) routing downstream (%s)\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid, blade_session_id_get(bsd));
- blade_session_send(bsd, res, NULL);
- blade_session_read_unlock(bsd);
-
- goto done;
- }
+ //requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid");
+ //responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid");
- // this local node must be requester-nodeid for the response, or the response lacks routing nodeids, so process the response
ks_log(KS_LOG_DEBUG, "Session (%s) publish response processing\n", blade_session_id_get(bs));
if (res_error) {
goto done;
}
- // errors sent above this point are meant to be handled by the first node which receives the request, should not occur after the first node validates
- // errors (and the response) sent after this point must include the requester-nodeid and responder-nodeid for proper routing
-
- if (!blade_handle_local_nodeid_compare(bh, req_params_responder_nodeid)) {
- // not meant for local processing, continue with routing which on a locate request, it always goes upstream to the master node
- blade_session_t *bsu = blade_handle_sessions_upstream(bh);
- if (!bsu) {
- cJSON *res_error = NULL;
-
- ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) but upstream session unavailable\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
- blade_jsonrpc_error_raw_create(&res, &res_error, blade_jsonrpc_request_messageid_get(breq), -32603, "Upstream session unavailable");
-
- // needed in case this error must propagate further than the session which sent it
- cJSON_AddStringToObject(res_error, "requester-nodeid", req_params_requester_nodeid);
- cJSON_AddStringToObject(res_error, "responder-nodeid", req_params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded
-
- blade_session_send(bs, res, NULL);
- goto done;
- }
-
- // @todo this creates a new request that is tracked locally, in order to receive the response in a callback to route it correctly, this could be simplified
- // by using a couple special fields to indicate common routing approaches based on a routing block in common for every message, thus being able to bypass this
- // and still be able to properly route responses without a specific response handler on every intermediate router, in which case messages that are only being
- // routed would not enter into these handlers and would not leave a footprint passing through routers
- ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) routing upstream (%s)\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid, blade_session_id_get(bsu));
- blade_session_send(bsu, req, blade_protocol_locate_response_handler);
- blade_session_read_unlock(bsu);
-
- goto done;
- }
-
- // this local node must be responder-nodeid for the request, so process the request
ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
res_result_providers = cJSON_CreateObject();
requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid");
responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid");
- if (requester_nodeid && responder_nodeid && !blade_handle_local_nodeid_compare(bh, requester_nodeid)) {
- blade_session_t *bsd = blade_handle_sessions_lookup(bh, requester_nodeid);
- if (!bsd) {
- ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) but downstream session unavailable\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid);
- goto done;
- }
-
- ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) routing downstream (%s)\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid, blade_session_id_get(bsd));
- blade_session_send(bsd, res, NULL);
- blade_session_read_unlock(bsd);
-
- goto done;
- }
-
- // this local node must be requester-nodeid for the response, or the response lacks routing nodeids, so process the response
ks_log(KS_LOG_DEBUG, "Session (%s) locate response processing\n", blade_session_id_get(bs));
if (res_error) {