ks_bool_t blade_protocol_locate_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_execute_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq, void *data);
+ks_bool_t blade_protocol_broadcast_request_handler(blade_rpc_request_t *brpcreq, void *data);
typedef struct blade_handle_session_state_callback_registration_s blade_handle_session_state_callback_registration_t;
blade_rpc_create(&brpc, bh, "blade.subscribe", NULL, NULL, blade_protocol_subscribe_request_handler, NULL);
blade_handle_corerpc_register(brpc);
+ blade_rpc_create(&brpc, bh, "blade.broadcast", NULL, NULL, blade_protocol_broadcast_request_handler, NULL);
+ blade_handle_corerpc_register(brpc);
+
// register internal transport for secure websockets
blade_transport_wss_create(&bt, bh);
ks_assert(bt);
return KS_FALSE;
}
+
+// blade.broadcast request generator
+KS_DECLARE(ks_status_t) blade_protocol_broadcast(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(bh);
+ ks_assert(event);
+ ks_assert(protocol);
+ ks_assert(realm);
+
+ // this will ensure any downstream subscriber sessions, and upstream session if available will be broadcasted to
+ ret = blade_protocol_broadcast_raw(bh, NULL, event, protocol, realm, params, callback, data);
+
+ // @todo must check if the local node is also subscribed to receive the event, this is a special edge case which has some extra considerations
+ // if the local node is subscribed to receive the event, it should be received here as a special case, otherwise the broadcast request handler
+ // is where this normally occurs
+
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_protocol_broadcast_raw(blade_handle_t *bh, const char *excluded_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+{
+ const char *bsub_key = NULL;
+ blade_subscription_t *bsub = NULL;
+ blade_session_t *bs = NULL;
+
+ ks_assert(bh);
+ ks_assert(event);
+ ks_assert(protocol);
+ ks_assert(realm);
+
+ bsub_key = ks_psprintf(bh->pool, "%s@%s/%s", protocol, realm, event);
+
+ ks_hash_read_lock(bh->subscriptions);
+
+ bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)bsub_key, KS_UNLOCKED);
+ if (bsub) {
+ ks_hash_t *subscribers = blade_subscription_subscribers_get(bsub);
+
+ ks_assert(subscribers);
+
+ for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ void *key = NULL;
+ void *value = NULL;
+ cJSON *req = NULL;
+ cJSON *req_params = NULL;
+
+ ks_hash_this(it, &key, NULL, &value);
+
+ if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
+
+ if (blade_handle_local_nodeid_compare(bh, (const char *)key)) continue;
+
+ bs = blade_handle_sessions_lookup(bh, (const char *)key);
+ if (bs) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs));
+
+ blade_rpc_request_raw_create(bh->pool, &req, &req_params, NULL, "blade.broadcast");
+
+ cJSON_AddStringToObject(req_params, "event", event);
+ cJSON_AddStringToObject(req_params, "protocol", protocol);
+ cJSON_AddStringToObject(req_params, "realm", realm);
+
+ if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
+
+ blade_session_send(bs, req, callback, data);
+
+ cJSON_Delete(req);
+
+ blade_session_read_unlock(bs);
+ }
+ }
+ }
+
+ ks_hash_read_unlock(bh->subscriptions);
+
+ ks_pool_free(bh->pool, &bsub_key);
+
+ bs = blade_handle_sessions_upstream(bh);
+ if (bs) {
+ if (!excluded_nodeid || ks_safe_strcasecmp(blade_session_id_get(bs), excluded_nodeid)) {
+ cJSON *req = NULL;
+ cJSON *req_params = NULL;
+
+ ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs));
+
+ blade_rpc_request_raw_create(bh->pool, &req, &req_params, NULL, "blade.broadcast");
+
+ cJSON_AddStringToObject(req_params, "event", event);
+ cJSON_AddStringToObject(req_params, "protocol", protocol);
+ cJSON_AddStringToObject(req_params, "realm", realm);
+
+ if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
+
+ blade_session_send(bs, req, callback, data);
+
+ cJSON_Delete(req);
+ }
+
+ blade_session_read_unlock(bs);
+ }
+ return KS_STATUS_SUCCESS;
+}
+
+// blade.broadcast request handler
+ks_bool_t blade_protocol_broadcast_request_handler(blade_rpc_request_t *brpcreq, void *data)
+{
+ ks_bool_t ret = KS_FALSE;
+ blade_handle_t *bh = NULL;
+ blade_session_t *bs = NULL;
+ cJSON *req = NULL;
+ cJSON *req_params = NULL;
+ const char *req_params_event = NULL;
+ const char *req_params_protocol = NULL;
+ const char *req_params_realm = NULL;
+ cJSON *req_params_params = NULL;
+ const char *bsub_key = NULL;
+ blade_subscription_t *bsub = NULL;
+ blade_rpc_request_callback_t callback = NULL;
+ cJSON *res = NULL;
+ cJSON *res_result = NULL;
+
+ ks_assert(brpcreq);
+
+ bh = blade_rpc_request_handle_get(brpcreq);
+ ks_assert(bh);
+
+ bs = blade_handle_sessions_lookup(bh, blade_rpc_request_sessionid_get(brpcreq));
+ ks_assert(bs);
+
+ req = blade_rpc_request_message_get(brpcreq);
+ ks_assert(req);
+
+ req_params = cJSON_GetObjectItem(req, "params");
+ if (!req_params) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'params' object\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ req_params_event = cJSON_GetObjectCstr(req_params, "event");
+ if (!req_params_event) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
+ if (!req_params_protocol) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'protocol'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ req_params_realm = cJSON_GetObjectCstr(req_params, "realm");
+ if (!req_params_realm) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'realm'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params realm");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+
+ req_params_params = cJSON_GetObjectCstr(req_params, "params");
+
+
+ blade_protocol_broadcast_raw(bh, blade_session_id_get(bs), req_params_event, req_params_protocol, req_params_realm, req_params_params, NULL, NULL);
+
+
+ bsub_key = ks_psprintf(bh->pool, "%s@%s/%s", req_params_protocol, req_params_realm, req_params_event);
+
+ ks_hash_read_lock(bh->subscriptions);
+
+ bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)bsub_key, KS_UNLOCKED);
+ if (bsub) {
+ ks_rwl_read_lock(bh->local_nodeid_rwl);
+ if (ks_hash_search(blade_subscription_subscribers_get(bsub), bh->local_nodeid, KS_UNLOCKED)) {
+ callback = blade_subscription_callback_get(bsub);
+ if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub));
+ }
+ ks_rwl_read_unlock(bh->local_nodeid_rwl);
+ }
+
+ ks_hash_read_unlock(bh->subscriptions);
+
+ ks_pool_free(bh->pool, &bsub_key);
+
+ // build the actual response finally
+ blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
+
+ cJSON_AddStringToObject(res_result, "event", req_params_event);
+ cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
+ cJSON_AddStringToObject(res_result, "realm", req_params_realm);
+
+ // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
+ blade_session_send(bs, res, NULL, NULL);
+
+
+done:
+
+ if (res) cJSON_Delete(res);
+ if (bs) blade_session_read_unlock(bs);
+
+ return ret;
+}
+
+KS_DECLARE(cJSON *) blade_protocol_broadcast_request_params_get(blade_rpc_request_t *brpcreq)
+{
+ cJSON *req = NULL;
+ cJSON *req_params = NULL;
+ cJSON *req_params_params = NULL;
+
+ ks_assert(brpcreq);
+
+ req = blade_rpc_request_message_get(brpcreq);
+ ks_assert(req);
+
+ req_params = cJSON_GetObjectItem(req, "params");
+ if (req_params) req_params_params = cJSON_GetObjectItem(req_params, "params");
+
+ return req_params_params;
+}
+
/* For Emacs:
* Local Variables:
* mode:c