};
+ks_bool_t blade_protocol_register_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_publish_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_locate_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_execute_request_handler(blade_rpc_request_t *brpcreq, void *data);
}
// register internal core rpcs for blade.xxx
+ blade_rpc_create(&brpc, bh, "blade.register", NULL, NULL, blade_protocol_register_request_handler, NULL);
+ blade_handle_corerpc_register(brpc);
+
blade_rpc_create(&brpc, bh, "blade.publish", NULL, NULL, blade_protocol_publish_request_handler, NULL);
blade_handle_corerpc_register(brpc);
goto done;
}
- if (bh->master_nodeid) ks_pool_free(bh->pool, &bh->local_nodeid);
+ if (bh->local_nodeid) ks_pool_free(bh->pool, &bh->local_nodeid);
if (nodeid) bh->local_nodeid = ks_pstrdup(bh->pool, nodeid);
ks_log(KS_LOG_DEBUG, "Local NodeID: %s\n", nodeid);
ks_log(KS_LOG_DEBUG, "Route Added: %s through %s\n", key, value);
- // @todo when a route is added, upstream needs to be notified that the identity can be found through the session to the
- // upstream router, and likewise up the chain to the Master Router Node, to create a complete route from anywhere else
+ blade_protocol_register(bh, nodeid, KS_FALSE, NULL, NULL);
return KS_STATUS_SUCCESS;
}
ks_hash_remove(bh->routes, (void *)nodeid);
- // @todo when a route is removed, upstream needs to be notified, for whatever reason the node is no longer available through
- // this node so the routes leading here need to be cleared by passing a "blade.register" upstream to remove the routes, this
- // should actually happen only for local sessions, and blade.register should be always passed upstream AND processed locally, so
- // we don't want to duplicate blade.register calls already being passed up if this route is not a local session
+ blade_protocol_register(bh, nodeid, KS_TRUE, NULL, NULL);
// @note everything below here is for master-only cleanup when a node is no longer routable
ks_assert(bh);
ks_rwl_read_lock(bh->local_nodeid_rwl);
- bs = blade_handle_sessions_lookup(bh, bh->local_nodeid);
+ if (bh->local_nodeid) bs = blade_handle_sessions_lookup(bh, bh->local_nodeid);
ks_rwl_read_unlock(bh->local_nodeid_rwl);
return bs;
// @todo all higher level errors should be handled by each of the calls internally so that a normal result response can be sent with an error block inside the result
// which is important for implementation of blade.execute where errors can be relayed back to the requester properly
+// blade.register request generator
+KS_DECLARE(ks_status_t) blade_protocol_register(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ blade_session_t *bs = NULL;
+ ks_pool_t *pool = NULL;
+ cJSON *req = NULL;
+ cJSON *req_params = NULL;
+
+ ks_assert(bh);
+ ks_assert(nodeid);
+
+ if (!(bs = blade_handle_sessions_upstream(bh))) {
+ ret = KS_STATUS_DISCONNECTED;
+ goto done;
+ }
+
+ pool = blade_handle_pool_get(bh);
+ ks_assert(pool);
+
+ blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.register");
+
+ // fill in the req_params
+ cJSON_AddStringToObject(req_params, "nodeid", nodeid);
+ if (remove) cJSON_AddTrueToObject(req_params, "remove");
+
+ ks_log(KS_LOG_DEBUG, "Session (%s) register request (%s %s) started\n", blade_session_id_get(bs), remove ? "removing" : "adding", nodeid);
+
+ ret = blade_session_send(bs, req, callback, data);
+
+done:
+ if (req) cJSON_Delete(req);
+ if (bs) blade_session_read_unlock(bs);
+
+ return ret;
+}
+
+// blade.register request handler
+ks_bool_t blade_protocol_register_request_handler(blade_rpc_request_t *brpcreq, void *data)
+{
+ blade_handle_t *bh = NULL;
+ blade_session_t *bs = NULL;
+ cJSON *req = NULL;
+ cJSON *req_params = NULL;
+ const char *req_params_nodeid = NULL;
+ cJSON *req_params_remove = NULL;
+ ks_bool_t remove = KS_FALSE;
+ cJSON *res = NULL;
+ cJSON *res_result = NULL;
+
+ ks_assert(brpcreq);
+
+ bh = blade_rpc_request_handle_get(brpcreq);
+ ks_assert(bh);
+
+ bs = blade_handle_sessions_lookup(bh, blade_rpc_request_sessionid_get(brpcreq));
+ ks_assert(bs);
+
+ req = blade_rpc_request_message_get(brpcreq);
+ ks_assert(req);
+
+ req_params = cJSON_GetObjectItem(req, "params");
+ if (!req_params) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) register request missing 'params' object\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ req_params_nodeid = cJSON_GetObjectCstr(req_params, "nodeid");
+ if (!req_params_nodeid) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) register request missing 'nodeid'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params nodeid");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+ req_params_remove = cJSON_GetObjectItem(req_params, "remove");
+ remove = req_params_remove && req_params_remove->type == cJSON_True;
+
+ ks_log(KS_LOG_DEBUG, "Session (%s) register request (%s %s) processing\n", blade_session_id_get(bs), remove ? "removing" : "adding", req_params_nodeid);
+
+ if (remove) {
+ blade_session_route_remove(bs, req_params_nodeid);
+ blade_handle_route_remove(bh, req_params_nodeid);
+ } else {
+ blade_session_route_add(bs, req_params_nodeid);
+ blade_handle_route_add(bh, req_params_nodeid, blade_session_id_get(bs));
+ }
+
+ blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
+ blade_session_send(bs, res, NULL, NULL);
+
+done:
+
+ if (res) cJSON_Delete(res);
+ if (bs) blade_session_read_unlock(bs);
+
+ return KS_FALSE;
+}
+
+
// blade.publish request generator
KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data)
{
ret = blade_session_send(bs, req, callback, data);
- // @todo upon return, a provider should register the methods for this protocol to be locally available
- // prior to receiving a response, if the response is an error then unregister, but if it is successful
- // then the node is already primed to receive any immediate requests
-
done:
if (req) cJSON_Delete(req);
if (bs) blade_session_read_unlock(bs);
blade_session_read_unlock(bs);
}
-// blade.locate response handler
-//ks_bool_t blade_protocol_locate_response_handler(blade_rpc_response_t *brpcres, void *data)
-//{
-// ks_bool_t ret = KS_FALSE;
-// blade_handle_t *bh = NULL;
-// blade_session_t *bs = NULL;
-// blade_rpc_response_callback_wrapper_t *wrapper = NULL;
-// blade_rpc_response_callback_t callback = NULL;
-// cJSON *res = NULL;
-// cJSON *res_error = NULL;
-// cJSON *res_result = NULL;
-// cJSON *res_object = NULL;
-// //cJSON *res_result_providers = NULL;
-// const char *requester_nodeid = NULL;
-// const char *responder_nodeid = NULL;
-// //const char *res_result_protocol = NULL;
-// //const char *res_result_realm = NULL;
-//
-// ks_assert(brpcres);
-// ks_assert(data);
-//
-// wrapper = (blade_rpc_response_callback_wrapper_t *)data;
-// callback = wrapper->callback;
-// data = wrapper->data;
-// ks_pool_free(wrapper->pool, &wrapper);
-//
-// bh = blade_rpc_response_handle_get(brpcres);
-// ks_assert(bh);
-//
-// bs = blade_handle_sessions_lookup(bh, blade_rpc_response_sessionid_get(brpcres));
-// ks_assert(bs);
-//
-// res = blade_rpc_response_message_get(brpcres);
-// ks_assert(res);
-//
-// res_error = cJSON_GetObjectItem(res, "error");
-// res_result = cJSON_GetObjectItem(res, "result");
-//
-// if (!res_error && !res_result) {
-// ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'error' or 'result' object\n", blade_session_id_get(bs));
-// goto done;
-// }
-// res_object = res_error ? res_error : res_result;
-//
-// requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid");
-// responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid");
-//
-// ks_log(KS_LOG_DEBUG, "Session (%s) locate response processing\n", blade_session_id_get(bs));
-//
-// if (callback) ret = callback(brpcres, data);
-//
-// //if (res_error) {
-// // // @todo process error response
-// // ks_log(KS_LOG_DEBUG, "Session (%s) locate response error... add details\n", blade_session_id_get(bs));
-// // goto done;
-// //}
-//
-// // process result response
-//
-// //res_result_protocol = cJSON_GetObjectCstr(res_result, "protocol");
-// //if (!res_result_protocol) {
-// // ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'protocol'\n", blade_session_id_get(bs));
-// // goto done;
-// //}
-//
-// //res_result_realm = cJSON_GetObjectCstr(res_result, "realm");
-// //if (!res_result_realm) {
-// // ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'realm'\n", blade_session_id_get(bs));
-// // goto done;
-// //}
-//
-// //res_result_providers = cJSON_GetObjectItem(res_result, "providers");
-// //if (!res_result_providers) {
-// // ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'providers'\n", blade_session_id_get(bs));
-// // goto done;
-// //}
-//
-// //for (int index = 0; index < cJSON_GetArraySize(res_result_providers); ++index) {
-// // cJSON *elem = cJSON_GetArrayItem(res_result_providers, index);
-// // if (elem->type == cJSON_String) {
-// // ks_log(KS_LOG_DEBUG, "Session (%s) locate (%s@%s) provider (%s)\n", blade_session_id_get(bs), res_result_protocol, res_result_realm, elem->valuestring);
-// // }
-// //}
-//
-//done:
-// if (bs) blade_session_read_unlock(bs);
-//
-// return ret;
-//}
-
/* For Emacs:
* Local Variables:
* mode:c