ks_hash_this(it, (const void **)&key, NULL, (void **)&bp);
if (blade_protocol_purge(bp, nodeid)) {
- if (!cleanup) ks_hash_create(&cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
- ks_hash_insert(cleanup, (void *)key, bp);
+ if (!blade_protocol_controller_available(bp)) {
+ if (!cleanup) ks_hash_create(&cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
+ ks_hash_insert(cleanup, (void *)key, bp);
+ } else {
+ // @todo not the last controller, may need to propagate that the controller is no longer available?
+ }
}
}
if (cleanup) {
ks_hash_this(it, (const void **)&key, NULL, (void **)&bp);
+ blade_subscriptionmgr_broadcast(blade_handle_subscriptionmgr_get(bmmgr->handle), BLADE_RPCBROADCAST_COMMAND_PROTOCOL_REMOVE, NULL, blade_protocol_name_get(bp), blade_protocol_realm_get(bp), NULL, NULL, NULL, NULL, NULL);
+
ks_log(KS_LOG_DEBUG, "Protocol Removed: %s\n", key);
ks_hash_remove(bmmgr->protocols, (void *)key);
}
ks_hash_insert(bmmgr->protocols, (void *)ks_pstrdup(pool, key), bp);
}
- blade_protocol_controllers_add(bp, controller);
+ blade_protocol_controller_add(bp, controller);
+
+ ks_hash_write_unlock(bmmgr->protocols);
ks_pool_free(&key);
+ return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_mastermgr_controller_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *controller)
+{
+ ks_pool_t *pool = NULL;
+ blade_protocol_t *bp = NULL;
+ char *key = NULL;
+
+ ks_assert(bmmgr);
+ ks_assert(protocol);
+ ks_assert(realm);
+ ks_assert(controller);
+
+ pool = ks_pool_get(bmmgr);
+
+ key = ks_psprintf(pool, "%s@%s", protocol, realm);
+
+ ks_hash_write_lock(bmmgr->protocols);
+
+ bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_UNLOCKED);
+ if (bp) {
+ if (blade_protocol_controller_remove(bp, controller)) {
+ if (!blade_protocol_controller_available(bp)) {
+ // @todo broadcast protocol removal to remove all channel subscriptions
+ ks_log(KS_LOG_DEBUG, "Protocol Removed: %s\n", key);
+ ks_hash_remove(bmmgr->protocols, (void *)key);
+ } else {
+ // @todo not the last controller, may need to propagate when a specific controller becomes unavailable though
+ }
+ }
+ }
+
ks_hash_write_unlock(bmmgr->protocols);
+ ks_pool_free(&key);
+
return KS_STATUS_SUCCESS;
}
goto done;
}
- blade_protocol_channel_remove(bp, channel);
+ if (blade_protocol_channel_remove(bp, channel)) {
+ blade_subscriptionmgr_broadcast(blade_handle_subscriptionmgr_get(bmmgr->handle), BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE, NULL, blade_protocol_name_get(bp), blade_protocol_realm_get(bp), channel, NULL, NULL, NULL, NULL);
+ }
done:
ks_pool_free(&key);
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid)
+KS_DECLARE(const char *) blade_protocol_name_get(blade_protocol_t *bp)
{
- ks_bool_t ret = KS_FALSE;
+ ks_assert(bp);
+ return bp->name;
+}
+
+KS_DECLARE(const char *) blade_protocol_realm_get(blade_protocol_t *bp)
+{
+ ks_assert(bp);
+ return bp->realm;
+}
+KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid)
+{
ks_assert(bp);
ks_assert(nodeid);
}
ks_hash_write_unlock(bp->channels);
- ks_hash_write_lock(bp->controllers);
- if (ks_hash_remove(bp->controllers, (void *)nodeid)) {
- ks_log(KS_LOG_DEBUG, "Protocol Controller Removed: %s from %s@%s\n", nodeid, bp->name, bp->realm);
- }
- ret = ks_hash_count(bp->controllers) == 0;
- ks_hash_write_unlock(bp->controllers);
-
- return ret;
+ return blade_protocol_controller_remove(bp, nodeid);
}
-KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp)
+KS_DECLARE(cJSON *) blade_protocol_controller_pack(blade_protocol_t *bp)
{
cJSON *controllers = cJSON_CreateObject();
ks_assert(bp);
+ ks_hash_read_lock(bp->controllers);
for (ks_hash_iterator_t *it = ks_hash_first(bp->controllers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const char *key = NULL;
void *value = NULL;
cJSON_AddItemToArray(controllers, cJSON_CreateString(key));
}
+ ks_hash_read_unlock(bp->controllers);
return controllers;
}
-KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, const char *nodeid)
+KS_DECLARE(ks_status_t) blade_protocol_controller_add(blade_protocol_t *bp, const char *nodeid)
{
char *key = NULL;
return KS_STATUS_SUCCESS;
}
+KS_DECLARE(ks_bool_t) blade_protocol_controller_remove(blade_protocol_t *bp, const char *nodeid)
+{
+ ks_bool_t ret = KS_FALSE;
+
+ ks_assert(bp);
+ ks_assert(nodeid);
+
+ ks_hash_write_lock(bp->controllers);
+ if (ks_hash_remove(bp->controllers, (void *)nodeid)) {
+ ret = KS_TRUE;
+ ks_log(KS_LOG_DEBUG, "Protocol Controller Removed: %s from %s@%s\n", nodeid, bp->name, bp->realm);
+ }
+ ks_hash_write_unlock(bp->controllers);
+
+ return ret;
+}
+
+KS_DECLARE(ks_bool_t) blade_protocol_controller_available(blade_protocol_t *bp)
+{
+ ks_assert(bp);
+ return ks_hash_count(bp->controllers) > 0;
+}
+
KS_DECLARE(ks_status_t) blade_protocol_channel_add(blade_protocol_t *bp, const char *name)
{
ks_status_t ret = KS_STATUS_SUCCESS;
return ret;
}
-KS_DECLARE(ks_status_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name)
+KS_DECLARE(ks_bool_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name)
{
+ ks_bool_t ret = KS_FALSE;
+ ks_hash_t *authorized = NULL;
+
ks_assert(bp);
ks_assert(name);
- ks_hash_remove(bp->channels, (void *)name);
-
- ks_log(KS_LOG_DEBUG, "Protocol Channel Removed: %s from %s@%s\n", name, bp->name, bp->realm);
+ ks_hash_write_lock(bp->channels);
+ if ((authorized = ks_hash_remove(bp->channels, (void *)name))) {
+ ret = KS_TRUE;
+ ks_log(KS_LOG_DEBUG, "Protocol Channel Removed: %s from %s@%s\n", name, bp->name, bp->realm);
+ ks_hash_destroy(&authorized);
+ }
+ ks_hash_write_unlock(bp->channels);
- return KS_STATUS_SUCCESS;
+ return ret;
}
KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, ks_bool_t remove, const char *channel, const char *controller, const char *target)
if (ks_hash_remove(authorizations, (void *)target)) {
ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Removed: %s from %s@%s/%s\n", target, bp->name, bp->realm, channel);
} else ret = KS_STATUS_NOT_FOUND;
- }
- else {
+ } else {
ks_hash_insert(authorizations, (void *)ks_pstrdup(ks_pool_get(bp), target), (void *)KS_TRUE);
ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Added: %s to %s@%s/%s\n", target, bp->name, bp->realm, channel);
}
const char *relayed_messageid;
};
-ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data);
+ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, blade_rpcsubscribe_command_t command, const char *protocol, const char *realm, cJSON *channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data);
// blade.register request generator
KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
// blade.publish request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, blade_rpcpublish_command_t command, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
pool = ks_pool_get(bh);
ks_assert(pool);
+ // @todo validate command and parameters
+ switch (command) {
+ case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD:
+ break;
+ case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_REMOVE:
+ break;
+ case BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD:
+ if (!channels || cJSON_GetArraySize(channels) <= 0) {
+ ret = KS_STATUS_ARG_NULL;
+ goto done;
+ }
+ break;
+ case BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE:
+ break;
+ default:
+ ret = KS_STATUS_ARG_INVALID;
+ goto done;
+ }
+
+ // create the response
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.publish");
- // fill in the req_params
+ cJSON_AddNumberToObject(req_params, "command", command);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
cJSON_AddStringToObject(req_params, "responder-nodeid", id);
ks_pool_free(&id);
- // @todo may want to switch this system to use a blade_rpcpublish_args_t with validation on the contents on this list internally
- // and to produce the entire json block internally in case the channel args change to include additional information like an encryption key,
- // however if passing encryption keys then they should be asymetrically encrypted using a public key provided by the master so that
- // the channel keys can be transmitted without intermediate nodes being able to snoop them
if (channels) cJSON_AddItemToObject(req_params, "channels", cJSON_Duplicate(channels, 1));
// @todo add a parameter containing a block of json for schema definitions for each of the methods being published
cJSON *req = NULL;
cJSON *req_params = NULL;
cJSON *req_params_channels = NULL;
+ cJSON *req_params_command = NULL;
+ blade_rpcpublish_command_t command = BLADE_RPCPUBLISH_COMMAND_NONE;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
const char *req_params_requester_nodeid = NULL;
goto done;
}
+ req_params_command = cJSON_GetObjectItem(req_params, "command");
+ if (!req_params_command) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'command'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+ command = (blade_rpcpublish_command_t)req_params_command->valueint;
+ switch (command) {
+ case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD:
+ case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_REMOVE:
+ case BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD:
+ case BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE: break;
+ default: goto done;
+ }
+
req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
if (!req_params_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'protocol'\n", blade_session_id_get(bs));
goto done;
}
+ // @todo get enumeration parameter to represent a publish command, including add_protocol, remove_protocol, and update_channels
+ // @todo switch channels to separate add_channels and remove_channels
+
req_params_channels = cJSON_GetObjectItem(req_params, "channels");
if (req_params_channels) {
- int size = 0;
+ cJSON *element = NULL;
if (req_params_channels->type != cJSON_Array) {
ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' type, expected array\n", blade_session_id_get(bs));
goto done;
}
- size = cJSON_GetArraySize(req_params_channels);
- for (int index = 0; index < size; ++index) {
- cJSON *element = cJSON_GetArrayItem(req_params_channels, index);
+ cJSON_ArrayForEach(element, req_params_channels) {
if (element->type != cJSON_String) {
ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' element type, expected string\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
- blade_mastermgr_controller_add(bh->mastermgr, req_params_protocol, req_params_realm, req_params_requester_nodeid);
-
- if (req_params_channels) {
- int size = cJSON_GetArraySize(req_params_channels);
- for (int index = 0; index < size; ++index) {
- cJSON *element = cJSON_GetArrayItem(req_params_channels, index);
- blade_mastermgr_channel_add(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
+ // @todo switch on publish command, make the following code for add_protocol
+ switch (command) {
+ case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD:
+ blade_mastermgr_controller_add(bh->mastermgr, req_params_protocol, req_params_realm, req_params_requester_nodeid);
+ if (req_params_channels) {
+ cJSON *element = NULL;
+ cJSON_ArrayForEach(element, req_params_channels) {
+ blade_mastermgr_channel_add(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
+ }
}
+ break;
+ case BLADE_RPCPUBLISH_COMMAND_CONTROLLER_REMOVE:
+ blade_mastermgr_controller_remove(bh->mastermgr, req_params_protocol, req_params_realm, req_params_requester_nodeid);
+ break;
+ case BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD:
+ if (req_params_channels) {
+ cJSON *element = NULL;
+ cJSON_ArrayForEach(element, req_params_channels) {
+ blade_mastermgr_channel_add(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
+ }
+ }
+ break;
+ case BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE:
+ if (req_params_channels) {
+ cJSON *element = NULL;
+ cJSON_ArrayForEach(element, req_params_channels) {
+ blade_mastermgr_channel_remove(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
+ }
+ }
+ break;
+ default:
+ goto done;
}
+
// build the actual response finally
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
+ // @todo include a list of channels that failed to be added or removed if applicable?
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);
blade_session_send(bs, res, NULL, NULL);
if (res_result_unauthorized_channels) {
- blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, NULL, res_result_unauthorized_channels, req_params_authorized_nodeid, KS_TRUE, NULL, NULL);
+ blade_handle_rpcsubscribe_raw(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE, req_params_protocol, req_params_realm, res_result_unauthorized_channels, req_params_authorized_nodeid, KS_TRUE, NULL, NULL);
}
done:
ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
bp = blade_mastermgr_protocol_lookup(bh->mastermgr, req_params_protocol, req_params_realm);
- if (bp) res_result_controllers = blade_protocol_controllers_pack(bp);
+ if (bp) res_result_controllers = blade_protocol_controller_pack(bp);
// build the actual response finally
}
// blade.subscribe request generator
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data)
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh,
+ blade_rpcsubscribe_command_t command,
+ const char *protocol,
+ const char *realm,
+ cJSON *channels,
+ blade_rpc_response_callback_t callback,
+ void *data,
+ blade_rpc_request_callback_t channel_callback,
+ void *channel_data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_pool_t *pool = NULL;
ks_assert(bh);
ks_assert(protocol);
ks_assert(realm);
- ks_assert(subscribe_channels || unsubscribe_channels);
+ ks_assert(channels);
pool = ks_pool_get(bh);
ks_assert(pool);
temp_data->channel_data = channel_data;
ks_pool_set_cleanup(temp_data, NULL, blade_rpcsubscribe_data_cleanup);
- ret = blade_handle_rpcsubscribe_raw(bh, protocol, realm, subscribe_channels, unsubscribe_channels, localid, KS_FALSE, blade_rpcsubscribe_response_handler, temp_data);
+ ret = blade_handle_rpcsubscribe_raw(bh, command, protocol, realm, channels, localid, KS_FALSE, blade_rpcsubscribe_response_handler, temp_data);
ks_pool_free(&localid);
return ret;
}
-ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data)
+ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh,
+ blade_rpcsubscribe_command_t command,
+ const char *protocol,
+ const char *realm,
+ cJSON *channels,
+ const char *subscriber,
+ ks_bool_t downstream,
+ blade_rpc_response_callback_t callback,
+ blade_rpcsubscribe_data_t *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
ks_assert(bh);
ks_assert(protocol);
ks_assert(realm);
- ks_assert(subscribe_channels || unsubscribe_channels);
+ ks_assert(channels);
ks_assert(subscriber);
if (downstream) {
- // @note if a master is sending a downstream update, it may only use unsubscribe_channels, cannot force a subscription without a subscriber callback
- if (subscribe_channels) {
+ // @note if a master is sending a downstream update, it may only remove subscriptions, it cannot force a subscription without the subscriber providing the callback
+ if (command != BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE) {
ret = KS_STATUS_NOT_ALLOWED;
goto done;
}
pool = ks_pool_get(bh);
ks_assert(pool);
- if (unsubscribe_channels) {
+ if (command == BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE) {
cJSON *channel = NULL;
- cJSON_ArrayForEach(channel, unsubscribe_channels) {
+ cJSON_ArrayForEach(channel, channels) {
blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, protocol, realm, channel->valuestring, subscriber);
}
}
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.subscribe");
+ cJSON_AddNumberToObject(req_params, "command", command);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
cJSON_AddStringToObject(req_params, "subscriber-nodeid", subscriber);
if (downstream) cJSON_AddTrueToObject(req_params, "downstream");
-
- if (subscribe_channels) cJSON_AddItemToObject(req_params, "subscribe-channels", cJSON_Duplicate(subscribe_channels, 1));
- if (unsubscribe_channels) cJSON_AddItemToObject(req_params, "unsubscribe-channels", cJSON_Duplicate(unsubscribe_channels, 1));
+ cJSON_AddItemToObject(req_params, "channels", cJSON_Duplicate(channels, 1));
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request started\n", blade_session_id_get(bs));
ks_pool_t *pool = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
+ cJSON *req_params_command = NULL;
+ blade_rpcsubscribe_command_t command = BLADE_RPCSUBSCRIBE_COMMAND_NONE;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
const char *req_params_subscriber_nodeid = NULL;
cJSON *req_params_downstream = NULL;
ks_bool_t downstream = KS_FALSE;
- cJSON *req_params_subscribe_channels = NULL;
- cJSON *req_params_unsubscribe_channels = NULL;
+ cJSON *req_params_channels = NULL;
ks_bool_t masterlocal = KS_FALSE;
cJSON *res = NULL;
cJSON *res_result = NULL;
goto done;
}
+ req_params_command = cJSON_GetObjectItem(req_params, "command");
+ if (!req_params_command) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'command'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+ command = (blade_rpcsubscribe_command_t)req_params_command->valueint;
+ switch (command) {
+ case BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD:
+ case BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE: break;
+ default: goto done;
+ }
+
req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
if (!req_params_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'protocol'\n", blade_session_id_get(bs));
req_params_downstream = cJSON_GetObjectItem(req_params, "downstream");
downstream = req_params_downstream && req_params_downstream->type == cJSON_True;
- req_params_subscribe_channels = cJSON_GetObjectItem(req_params, "subscribe-channels");
- req_params_unsubscribe_channels = cJSON_GetObjectItem(req_params, "unsubscribe-channels");
+ req_params_channels = cJSON_GetObjectItem(req_params, "channels");
- if (!req_params_subscribe_channels && !req_params_unsubscribe_channels) {
- ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'subscribe-channels' or 'unsubscribe-channels'\n", blade_session_id_get(bs));
- blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params subscribe-channels or unsubscribe-channels");
+ if (!req_params_channels) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'channels'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channels");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
if (masterlocal || blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bh), req_params_subscriber_nodeid)) {
// @note This is normally handled by blade_handle_rpcsubscribe_raw() to ensure authorization removals are processed during the request path
// including on the node they start on, whether that is the master or the subscriber
- if (req_params_unsubscribe_channels) {
+ if (command == BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE) {
cJSON *channel = NULL;
- cJSON_ArrayForEach(channel, req_params_unsubscribe_channels) {
+ cJSON_ArrayForEach(channel, req_params_channels) {
blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
}
}
cJSON_AddStringToObject(res_result, "subscriber-nodeid", req_params_subscriber_nodeid);
if (downstream) cJSON_AddTrueToObject(res_result, "downstream");
- if (req_params_subscribe_channels) {
- // @note this can only be received by the master due to other validation logic in requests which prevents the master from sending a request containing subscribe-channels
+ if (command == BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD) {
+ // @note this can only be received by the master due to other validation logic in requests which prevents the master from sending a request to add a subscriber
cJSON *channel = NULL;
- cJSON_ArrayForEach(channel, req_params_subscribe_channels) {
+ cJSON_ArrayForEach(channel, req_params_channels) {
if (blade_mastermgr_channel_verify(bh->mastermgr, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid)) {
blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
if (!res_result_subscribe_channels) res_result_subscribe_channels = cJSON_CreateArray();
temp_data->relayed_messageid = ks_pstrdup(pool, blade_rpc_request_messageid_get(brpcreq));
ks_pool_set_cleanup(temp_data, NULL, blade_rpcsubscribe_data_cleanup);
- blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, req_params_subscribe_channels, req_params_unsubscribe_channels, req_params_subscriber_nodeid, downstream, blade_rpcsubscribe_response_handler, temp_data);
+ blade_handle_rpcsubscribe_raw(bh, command, req_params_protocol, req_params_realm, req_params_channels, req_params_subscriber_nodeid, downstream, blade_rpcsubscribe_response_handler, temp_data);
}
done:
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(bh);
- ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
- ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, NULL, protocol, realm, channel, event, params, callback, data);
+ ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, BLADE_RPCBROADCAST_COMMAND_EVENT, NULL, protocol, realm, channel, event, params, callback, data);
// @todo must check if the local node is also subscribed to receive the event, this is a special edge case which has some extra considerations
// if the local node is subscribed to receive the event, it should be received here as a special case, otherwise the broadcast request handler
return ret;
}
+// @todo blade_handle_rpcbroadcast_raw() to encapsulate adding subcommands to broadcast to support protocol removal, protocol channel removal, and normal event broadcast
+
// blade.broadcast request handler
ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
blade_session_t *bs = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
+ cJSON *req_params_command = NULL;
+ blade_rpcbroadcast_command_t command = BLADE_RPCBROADCAST_COMMAND_NONE;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
const char *req_params_channel = NULL;
goto done;
}
- req_params_channel = cJSON_GetObjectCstr(req_params, "channel");
- if (!req_params_channel) {
- ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'channel'\n", blade_session_id_get(bs));
- blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channel");
+ req_params_command = cJSON_GetObjectItem(req_params, "command");
+ if (!req_params_command) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'command'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params command");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
-
- req_params_event = cJSON_GetObjectCstr(req_params, "event");
- if (!req_params_event) {
- ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
- blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
- blade_session_send(bs, res, NULL, NULL);
- goto done;
+ command = (blade_rpcbroadcast_command_t)req_params_command->valueint;
+ switch (command) {
+ case BLADE_RPCBROADCAST_COMMAND_EVENT:
+ req_params_event = cJSON_GetObjectCstr(req_params, "event");
+ if (!req_params_event) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+ case BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE:
+ req_params_channel = cJSON_GetObjectCstr(req_params, "channel");
+ if (!req_params_channel) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'channel'\n", blade_session_id_get(bs));
+ blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channel");
+ blade_session_send(bs, res, NULL, NULL);
+ goto done;
+ }
+ break;
+ case BLADE_RPCBROADCAST_COMMAND_PROTOCOL_REMOVE: break;
+ default: goto done;
}
req_params_params = cJSON_GetObjectItem(req_params, "params");
- blade_subscriptionmgr_broadcast(bh->subscriptionmgr, blade_session_id_get(bs), req_params_protocol, req_params_realm, req_params_channel, req_params_event, req_params_params, NULL, NULL);
+ blade_subscriptionmgr_broadcast(bh->subscriptionmgr, command, blade_session_id_get(bs), req_params_protocol, req_params_realm, req_params_channel, req_params_event, req_params_params, NULL, NULL);
- bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_protocol, req_params_realm, req_params_channel);
- if (bsub) {
- const char *localid = NULL;
- ks_pool_t *pool = NULL;
+ if (command == BLADE_RPCBROADCAST_COMMAND_EVENT) {
+ bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_protocol, req_params_realm, req_params_channel);
+ if (bsub) {
+ const char *localid = NULL;
+ ks_pool_t *pool = NULL;
- pool = ks_pool_get(bh);
+ pool = ks_pool_get(bh);
- blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &localid);
- ks_assert(localid);
+ blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &localid);
+ ks_assert(localid);
- if (ks_hash_search(blade_subscription_subscribers_get(bsub), (void *)localid, KS_UNLOCKED)) {
- callback = blade_subscription_callback_get(bsub);
- if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub));
+ if (ks_hash_search(blade_subscription_subscribers_get(bsub), (void *)localid, KS_UNLOCKED)) {
+ callback = blade_subscription_callback_get(bsub);
+ if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub));
+ }
+ ks_pool_free(&localid);
}
- ks_pool_free(&localid);
}
// build the actual response finally
// @todo this is not neccessary, can obtain this from the original request
cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
- cJSON_AddStringToObject(res_result, "channel", req_params_channel);
- cJSON_AddStringToObject(res_result, "event", req_params_event);
+ if (req_params_channel) cJSON_AddStringToObject(res_result, "channel", req_params_channel);
+ if (req_params_event) cJSON_AddStringToObject(res_result, "event", req_params_event);
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);
return bsub;
}
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_subscription_remove(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel)
+{
+ ks_pool_t *pool = NULL;
+ char *bsub_key = NULL;
+ blade_subscription_t *bsub = NULL;
+ ks_hash_t *subscribers = NULL;
+ ks_hash_t *subscriptions = NULL;
+
+ ks_assert(bsmgr);
+ ks_assert(protocol);
+ ks_assert(realm);
+ ks_assert(channel);
+
+ pool = ks_pool_get(bsmgr);
+
+ bsub_key = ks_psprintf(pool, "%s@%s/%s", protocol, realm, channel);
+
+ ks_hash_write_lock(bsmgr->subscriptions);
+
+ bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)bsub_key, KS_UNLOCKED);
+
+ subscribers = blade_subscription_subscribers_get(bsub);
+ ks_assert(subscribers);
+
+ for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ void *key = NULL;
+ void *value = NULL;
+
+ ks_hash_this(it, (const void **)&key, NULL, &value);
+
+ subscriptions = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, key, KS_UNLOCKED);
+
+ ks_log(KS_LOG_DEBUG, "Subscriber Removed: %s from %s\n", key, bsub_key);
+ ks_hash_remove(subscriptions, bsub_key);
+
+ if (ks_hash_count(subscriptions) == 0) {
+ ks_hash_remove(bsmgr->subscriptions_cleanup, key);
+ }
+ }
+
+ ks_log(KS_LOG_DEBUG, "Subscription Removed: %s\n", bsub_key);
+ ks_hash_remove(bsmgr->subscriptions, (void *)bsub_key);
+
+ ks_hash_write_unlock(bsmgr->subscriptions);
+
+ ks_pool_free(&bsub_key);
+
+ return KS_STATUS_SUCCESS;
+}
+
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber)
{
ks_pool_t *pool = NULL;
}
}
-KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
{
ks_pool_t *pool = NULL;
const char *bsub_key = NULL;
blade_session_t *bs = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
+ ks_hash_t *routers = NULL;
+ ks_hash_t *channels = NULL;
ks_assert(bsmgr);
ks_assert(protocol);
ks_assert(realm);
- ks_assert(channel);
pool = ks_pool_get(bsmgr);
- bsub_key = ks_psprintf(pool, "%s@%s/%s", protocol, realm, channel);
+ switch (command) {
+ case BLADE_RPCBROADCAST_COMMAND_EVENT:
+ ks_assert(event);
+ case BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE:
+ ks_assert(channel);
+ bsub_key = ks_psprintf(pool, "%s@%s/%s", protocol, realm, channel);
- blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.broadcast");
- cJSON_AddStringToObject(req_params, "protocol", protocol);
- cJSON_AddStringToObject(req_params, "realm", realm);
- cJSON_AddStringToObject(req_params, "channel", channel);
- cJSON_AddStringToObject(req_params, "event", event);
- if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
+ ks_hash_read_lock(bsmgr->subscriptions);
- ks_hash_read_lock(bsmgr->subscriptions);
+ bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)bsub_key, KS_UNLOCKED);
+ if (bsub) {
+ ks_hash_t *subscribers = blade_subscription_subscribers_get(bsub);
- bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)bsub_key, KS_UNLOCKED);
- if (bsub) {
- ks_hash_t *subscribers = blade_subscription_subscribers_get(bsub);
+ ks_assert(subscribers);
+
+ for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ void *key = NULL;
+ void *value = NULL;
+
+ ks_hash_this(it, (const void **)&key, NULL, &value);
- ks_assert(subscribers);
+ if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
- for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ // @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
+ if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), (const char *)key)) continue;
+
+ bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key);
+ if (bs) {
+ if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
+ if (!ks_hash_search(routers, blade_session_id_get(bs), KS_UNLOCKED)) ks_hash_insert(routers, blade_session_id_get(bs), bs);
+ else blade_session_read_unlock(bs);
+ }
+ }
+ if (command == BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE) {
+ if (!channels) ks_hash_create(&channels, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
+ ks_hash_insert(channels, channel, (void *)KS_TRUE);
+ }
+ }
+
+ ks_hash_read_unlock(bsmgr->subscriptions);
+ break;
+ case BLADE_RPCBROADCAST_COMMAND_PROTOCOL_REMOVE:
+ bsub_key = ks_psprintf(pool, "%s@%s", protocol, realm);
+
+ ks_hash_read_lock(bsmgr->subscriptions);
+
+ for (ks_hash_iterator_t *it = ks_hash_first(bsmgr->subscriptions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
- if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
+ bsub = (blade_subscription_t *)value;
- // @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
- if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), (const char *)key)) continue;
+ if (ks_stristr(bsub_key, (const char *)key) == (const char *)key) {
+ ks_hash_t *subscribers = blade_subscription_subscribers_get(bsub);
- bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key);
- if (bs) {
- ks_log(KS_LOG_DEBUG, "Broadcasting: %s through %s\n", bsub_key, blade_session_id_get(bs));
+ ks_assert(subscribers);
- blade_session_send(bs, req, callback, data);
+ for (ks_hash_iterator_t *it2 = ks_hash_first(subscribers, KS_UNLOCKED); it2; it2 = ks_hash_next(&it2)) {
+ void *key2 = NULL;
+ void *value2 = NULL;
- blade_session_read_unlock(bs);
+ ks_hash_this(it2, (const void **)&key2, NULL, &value2);
+
+ if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key2)) continue;
+
+ // @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
+ if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), (const char *)key2)) continue;
+
+ bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key2);
+ if (bs) {
+ if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
+ if (!ks_hash_search(routers, blade_session_id_get(bs), KS_UNLOCKED)) ks_hash_insert(routers, blade_session_id_get(bs), bs);
+ else blade_session_read_unlock(bs);
+ }
+ }
+
+ if (!channels) ks_hash_create(&channels, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
+ ks_hash_insert(channels, blade_subscription_channel_get(bsub), (void *)KS_TRUE);
}
}
+
+ ks_hash_read_unlock(bsmgr->subscriptions);
+ break;
+ default: return KS_STATUS_ARG_INVALID;
}
- ks_hash_read_unlock(bsmgr->subscriptions);
bs = blade_upstreammgr_session_get(blade_handle_upstreammgr_get(bsmgr->handle));
if (bs) {
if (!excluded_nodeid || ks_safe_strcasecmp(blade_session_id_get(bs), excluded_nodeid)) {
+ if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
+ ks_hash_insert(routers, blade_session_id_get(bs), bs);
+ }
+ else blade_session_read_unlock(bs);
+ }
+
+ if (channels) {
+ for (ks_hash_iterator_t *it = ks_hash_first(channels, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ void *key = NULL;
+ void *value = NULL;
+
+ ks_hash_this(it, (const void **)&key, NULL, &value);
+
+ blade_subscriptionmgr_subscription_remove(bsmgr, protocol, realm, (const char *)key);
+ }
+ ks_hash_destroy(&channels);
+ }
+
+ if (routers) {
+ for (ks_hash_iterator_t *it = ks_hash_first(routers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ void *key = NULL;
+ void *value = NULL;
+
+ ks_hash_this(it, (const void **)&key, NULL, &value);
+
+ bs = (blade_session_t *)value;
+
+ blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.broadcast");
+ cJSON_AddNumberToObject(req_params, "command", command);
+ cJSON_AddStringToObject(req_params, "protocol", protocol);
+ cJSON_AddStringToObject(req_params, "realm", realm);
+ if (channel) cJSON_AddStringToObject(req_params, "channel", channel);
+ if (event) cJSON_AddStringToObject(req_params, "event", event);
+ if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
+
ks_log(KS_LOG_DEBUG, "Broadcasting: %s through %s\n", bsub_key, blade_session_id_get(bs));
blade_session_send(bs, req, callback, data);
- }
- blade_session_read_unlock(bs);
- }
+ cJSON_Delete(req);
- cJSON_Delete(req);
+ blade_session_read_unlock(bs);
+ }
+ ks_hash_destroy(&routers);
+ }
ks_pool_free(&bsub_key);
KS_DECLARE(ks_status_t) blade_mastermgr_purge(blade_mastermgr_t *bmmgr, const char *nodeid);
KS_DECLARE(blade_protocol_t *) blade_mastermgr_protocol_lookup(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm);
KS_DECLARE(ks_status_t) blade_mastermgr_controller_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *controller);
+KS_DECLARE(ks_status_t) blade_mastermgr_controller_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *controller);
KS_DECLARE(ks_status_t) blade_mastermgr_channel_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel);
KS_DECLARE(ks_status_t) blade_mastermgr_channel_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel);
KS_DECLARE(ks_status_t) blade_mastermgr_channel_authorize(blade_mastermgr_t *bmmgr, ks_bool_t remove, const char *protocol, const char *realm, const char *channel, const char *controller, const char *target);
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_protocol_create(blade_protocol_t **bpP, ks_pool_t *pool, const char *name, const char *realm);
KS_DECLARE(ks_status_t) blade_protocol_destroy(blade_protocol_t **bpP);
+KS_DECLARE(const char *) blade_protocol_name_get(blade_protocol_t *bp);
+KS_DECLARE(const char *) blade_protocol_realm_get(blade_protocol_t *bp);
KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid);
-KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, const char *nodeid);
-KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp);
+KS_DECLARE(cJSON *) blade_protocol_controller_pack(blade_protocol_t *bp);
+KS_DECLARE(ks_status_t) blade_protocol_controller_add(blade_protocol_t *bp, const char *nodeid);
+KS_DECLARE(ks_bool_t) blade_protocol_controller_remove(blade_protocol_t *bp, const char *nodeid);
+KS_DECLARE(ks_bool_t) blade_protocol_controller_available(blade_protocol_t *bp);
KS_DECLARE(ks_status_t) blade_protocol_channel_add(blade_protocol_t *bp, const char *name);
-KS_DECLARE(ks_status_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name);
+KS_DECLARE(ks_bool_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name);
KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, ks_bool_t remove, const char *channel, const char *controller, const char *target);
KS_DECLARE(ks_bool_t) blade_protocol_channel_verify(blade_protocol_t *bp, const char *channel, const char *target);
KS_END_EXTERN_C
KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
-KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, blade_rpcpublish_command_t command, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(cJSON *) blade_rpcexecute_response_result_get(blade_rpc_response_t *brpcres);
KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJSON *result);
-KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data);
+KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, blade_rpcsubscribe_command_t command, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data);
KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(cJSON *) blade_rpcbroadcast_request_params_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(ks_status_t) blade_subscriptionmgr_destroy(blade_subscriptionmgr_t **bsmgrP);
KS_DECLARE(blade_handle_t *) blade_subscriptionmgr_handle_get(blade_subscriptionmgr_t *bsmgr);
KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel);
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_subscription_remove(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel);
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target);
-KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_END_EXTERN_C
#endif
typedef void (*blade_session_callback_t)(blade_session_t *bs, blade_session_state_condition_t condition, void *data);
+typedef enum {
+ BLADE_RPCPUBLISH_COMMAND_NONE,
+ BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD,
+ BLADE_RPCPUBLISH_COMMAND_CONTROLLER_REMOVE,
+ BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD,
+ BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE,
+} blade_rpcpublish_command_t;
+
+typedef enum {
+ BLADE_RPCSUBSCRIBE_COMMAND_NONE,
+ BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD,
+ BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE,
+} blade_rpcsubscribe_command_t;
+
+typedef enum {
+ BLADE_RPCBROADCAST_COMMAND_NONE,
+ BLADE_RPCBROADCAST_COMMAND_EVENT,
+ BLADE_RPCBROADCAST_COMMAND_PROTOCOL_REMOVE,
+ BLADE_RPCBROADCAST_COMMAND_CHANNEL_REMOVE,
+} blade_rpcbroadcast_command_t;
+
+
KS_END_EXTERN_C
#endif
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
- blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
+ blade_handle_rpcsubscribe(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD, "test", "mydomain.com", channels, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
cJSON_Delete(channels);
}
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
// @todo build up json-based method schema for each protocolrpc registered above, and pass into blade_handle_rpcpublish() to attach to the request, to be stored in the blade_protocol_t tracked by the master node
- blade_handle_rpcpublish(bh, "test", "mydomain.com", NULL, blade_publish_response_handler, NULL);
+ blade_handle_rpcpublish(bh, BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD, "test", "mydomain.com", NULL, blade_publish_response_handler, NULL);
}
void command_broadcast(blade_handle_t *bh, char *args)
ks_assert(res_result_realm);
res_result_controllers = cJSON_GetObjectItem(res_result, "controllers");
- ks_assert(res_result_controllers);
ks_log(KS_LOG_DEBUG, "Session (%s) locate (%s@%s) response processing\n", blade_session_id_get(bs), res_result_protocol, res_result_realm);
- for (int index = 0; index < cJSON_GetArraySize(res_result_controllers); ++index) {
- cJSON *elem = cJSON_GetArrayItem(res_result_controllers, index);
- if (elem->type == cJSON_String) {
- nodeid = elem->valuestring;
+ if (res_result_controllers) {
+ for (int index = 0; index < cJSON_GetArraySize(res_result_controllers); ++index) {
+ cJSON *elem = cJSON_GetArrayItem(res_result_controllers, index);
+ if (elem->type == cJSON_String) {
+ nodeid = elem->valuestring;
+ }
}
}
blade_session_read_unlock(bs);
if (nodeid) {
+ if (g_testcon_nodeid) ks_pool_free(&g_testcon_nodeid);
g_testcon_nodeid = ks_pstrdup(ks_pool_get(bh), nodeid);
}
+
ks_log(KS_LOG_DEBUG, "Session (%s) locate (%s@%s) provider (%s)\n", blade_session_id_get(bs), res_result_protocol, res_result_realm, g_testcon_nodeid);
return KS_FALSE;
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
- blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, NULL, NULL, test_channel_handler, NULL);
+ if (args && args[0]) cJSON_AddItemToArray(channels, cJSON_CreateString(args));
+ blade_handle_rpcsubscribe(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_ADD, "test", "mydomain.com", channels, NULL, NULL, test_channel_handler, NULL);
cJSON_Delete(channels);
}
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
- blade_handle_rpcsubscribe(bh, "test", "mydomain.com", NULL, channels, test_subscribe_response_handler, NULL, test_channel_handler, NULL);
+ if (args && args[0]) cJSON_AddItemToArray(channels, cJSON_CreateString(args));
+ blade_handle_rpcsubscribe(bh, BLADE_RPCSUBSCRIBE_COMMAND_SUBSCRIBER_REMOVE, "test", "mydomain.com", channels, test_subscribe_response_handler, NULL, test_channel_handler, NULL);
cJSON_Delete(channels);
}
};
void command_quit(blade_handle_t *bh, char *args);
+void command_channeladd(blade_handle_t *bh, char *args);
+void command_channelremove(blade_handle_t *bh, char *args);
static const struct command_def_s command_defs[] = {
{ "quit", command_quit },
+ { "channeladd", command_channeladd },
+ { "channelremove", command_channelremove },
{ NULL, NULL }
};
blade_handle_t *handle;
ks_pool_t *pool;
ks_hash_t *participants;
+ ks_hash_t *channels;
};
typedef struct testproto_s testproto_t;
+testproto_t *g_test = NULL;
+
static void testproto_cleanup(void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
{
//testproto_t *test = (testproto_t *)ptr;
test->pool = pool;
ks_hash_create(&test->participants, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, pool);
+ ks_hash_create(&test->channels, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, pool);
ks_pool_set_cleanup(test, NULL, testproto_cleanup);
// authorize channels with the master for the requester
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
+ for (ks_hash_iterator_t *it = ks_hash_first(test->channels, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ void *key = NULL;
+ void *value = NULL;
+
+ ks_hash_this(it, &key, NULL, &value);
+ cJSON_AddItemToArray(channels, cJSON_CreateString((const char *)key));
+ }
blade_handle_rpcauthorize(bh, requester_nodeid, KS_FALSE, "test", "mydomain.com", channels, NULL, NULL);
// deauthorize channels with the master for the requester
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
+ for (ks_hash_iterator_t *it = ks_hash_first(test->channels, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ void *key = NULL;
+ void *value = NULL;
+
+ ks_hash_this(it, &key, NULL, &value);
+ cJSON_AddItemToArray(channels, cJSON_CreateString((const char *)key));
+ }
blade_handle_rpcauthorize(bh, requester_nodeid, KS_TRUE, "test", "mydomain.com", channels, NULL, NULL);
config_setting_t *config_blade = NULL;
const char *cfgpath = "testcon.cfg";
const char *autoconnect = NULL;
- testproto_t *test = NULL;
ks_global_set_default_logger(KS_LOG_LEVEL_DEBUG);
return EXIT_FAILURE;
}
- testproto_create(&test, bh);
+ testproto_create(&g_test, bh);
if (autoconnect) {
blade_connection_t *bc = NULL;
// @todo use session state change callback to know when the session is ready and the realm(s) available from blade.connect, this hack temporarily ensures it's ready before trying to publish upstream
ks_sleep_ms(3000);
- blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, (void *)test);
+ blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, (void *)g_test);
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
- blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, (void *)test);
+ blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, (void *)g_test);
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
- blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, (void *)test);
+ blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, (void *)g_test);
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
- blade_handle_rpcpublish(bh, "test", "mydomain.com", channels, test_publish_response_handler, (void *)test);
+ blade_handle_rpcpublish(bh, BLADE_RPCPUBLISH_COMMAND_CONTROLLER_ADD, "test", "mydomain.com", channels, test_publish_response_handler, (void *)g_test);
cJSON_Delete(channels);
}
blade_handle_destroy(&bh);
- testproto_destroy(&test);
+ testproto_destroy(&g_test);
config_destroy(&config);
g_shutdown = KS_TRUE;
}
+void command_channeladd(blade_handle_t *bh, char *args)
+{
+ cJSON *channels = NULL;
+
+ ks_assert(bh);
+ ks_assert(args);
+
+ if (!args[0]) {
+ ks_log(KS_LOG_INFO, "Requires channel argument");
+ return;
+ }
+
+ ks_hash_insert(g_test->channels, (void *)ks_pstrdup(g_test->pool, args), (void *)KS_TRUE);
+
+ channels = cJSON_CreateArray();
+ cJSON_AddItemToArray(channels, cJSON_CreateString(args));
+
+ blade_handle_rpcpublish(bh, BLADE_RPCPUBLISH_COMMAND_CHANNEL_ADD, "test", "mydomain.com", channels, test_publish_response_handler, (void *)g_test);
+
+ cJSON_Delete(channels);
+}
+
+void command_channelremove(blade_handle_t *bh, char *args)
+{
+ cJSON *channels = NULL;
+
+ ks_assert(bh);
+ ks_assert(args);
+
+ if (!args[0]) {
+ ks_log(KS_LOG_INFO, "Requires channel argument");
+ return;
+ }
+
+
+ if (ks_hash_remove(g_test->channels, (void *)args)) {
+ channels = cJSON_CreateArray();
+ cJSON_AddItemToArray(channels, cJSON_CreateString(args));
+
+ blade_handle_rpcpublish(bh, BLADE_RPCPUBLISH_COMMAND_CHANNEL_REMOVE, "test", "mydomain.com", channels, test_publish_response_handler, (void *)g_test);
+
+ cJSON_Delete(channels);
+ }
+}
+
/* For Emacs:
* Local Variables:
* mode:c
KS_DECLARE(ks_bool_t) ks_pool_verify(void *addr);
// @todo fill in documentation
-KS_DECLARE(ks_pool_t *) ks_pool_get(void *addr);
+inline KS_DECLARE(ks_pool_t *) ks_pool_get(void *addr);
/*
* void *ks_pool_alloc
// @todo fill in documentation
KS_DECLARE(ks_bool_t) ks_pool_verify(void *addr)
{
- ks_pool_prefix_t *prefix = NULL;
if (!addr) return KS_FALSE;
- prefix = (ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE);
- if (check_prefix(prefix) != KS_STATUS_SUCCESS) return KS_FALSE;
+ if (check_prefix((ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE)) != KS_STATUS_SUCCESS) return KS_FALSE;
return KS_TRUE;
}
// @todo fill in documentation
-KS_DECLARE(ks_pool_t *) ks_pool_get(void *addr)
+inline KS_DECLARE(ks_pool_t *) ks_pool_get(void *addr)
{
- ks_pool_prefix_t *prefix = NULL;
+ ks_assert(addr);
+#ifdef DEBUG
+ ks_pool_prefix_t *prefix = (ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE);
ks_status_t ret = KS_STATUS_SUCCESS;
- ks_pool_t *pool = NULL;
-
- if (!addr) goto done;
- prefix = (ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE);
- if (check_prefix(prefix) != KS_STATUS_SUCCESS) goto done;
-
- if ((ret = check_pool(prefix->pool)) == KS_STATUS_SUCCESS) pool = prefix->pool;
-
-done:
+ ret = check_prefix(prefix);
ks_assert(ret == KS_STATUS_SUCCESS);
- return pool;
+ ret = check_pool(prefix->pool);
+ ks_assert(ret == KS_STATUS_SUCCESS);
+#endif
+ return ((ks_pool_prefix_t *)((uintptr_t)addr - KS_POOL_PREFIX_SIZE))->pool;
}
/*