]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-10167: Added preliminary blade.broadcast, tested event being broadcasted with...
authorShane Bryldt <astaelan@gmail.com>
Sun, 11 Jun 2017 05:08:39 +0000 (23:08 -0600)
committerShane Bryldt <astaelan@gmail.com>
Sun, 11 Jun 2017 05:08:39 +0000 (23:08 -0600)
libs/libblade/src/blade_stack.c
libs/libblade/src/include/blade_stack.h
libs/libblade/test/bladec.c
libs/libblade/test/blades.c

index 973f3a416f4fa70258289e543fd5f84db625024d..278a16e5b46aa218c39b85a3ba5f74a5cab37ea3 100644 (file)
@@ -88,6 +88,7 @@ ks_bool_t blade_protocol_publish_request_handler(blade_rpc_request_t *brpcreq, v
 ks_bool_t blade_protocol_locate_request_handler(blade_rpc_request_t *brpcreq, void *data);
 ks_bool_t blade_protocol_execute_request_handler(blade_rpc_request_t *brpcreq, void *data);
 ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq, void *data);
+ks_bool_t blade_protocol_broadcast_request_handler(blade_rpc_request_t *brpcreq, void *data);
 
 
 typedef struct blade_handle_session_state_callback_registration_s blade_handle_session_state_callback_registration_t;
@@ -359,6 +360,9 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
        blade_rpc_create(&brpc, bh, "blade.subscribe", NULL, NULL, blade_protocol_subscribe_request_handler, NULL);
        blade_handle_corerpc_register(brpc);
 
+       blade_rpc_create(&brpc, bh, "blade.broadcast", NULL, NULL, blade_protocol_broadcast_request_handler, NULL);
+       blade_handle_corerpc_register(brpc);
+
        // register internal transport for secure websockets
        blade_transport_wss_create(&bt, bh);
        ks_assert(bt);
@@ -2141,6 +2145,232 @@ done:
        return KS_FALSE;
 }
 
+
+// blade.broadcast request generator
+KS_DECLARE(ks_status_t) blade_protocol_broadcast(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(bh);
+       ks_assert(event);
+       ks_assert(protocol);
+       ks_assert(realm);
+
+       // this will ensure any downstream subscriber sessions, and upstream session if available will be broadcasted to
+       ret = blade_protocol_broadcast_raw(bh, NULL, event, protocol, realm, params, callback, data);
+
+       // @todo must check if the local node is also subscribed to receive the event, this is a special edge case which has some extra considerations
+       // if the local node is subscribed to receive the event, it should be received here as a special case, otherwise the broadcast request handler
+       // is where this normally occurs
+
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_protocol_broadcast_raw(blade_handle_t *bh, const char *excluded_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+{
+       const char *bsub_key = NULL;
+       blade_subscription_t *bsub = NULL;
+       blade_session_t *bs = NULL;
+
+       ks_assert(bh);
+       ks_assert(event);
+       ks_assert(protocol);
+       ks_assert(realm);
+
+       bsub_key = ks_psprintf(bh->pool, "%s@%s/%s", protocol, realm, event);
+
+       ks_hash_read_lock(bh->subscriptions);
+
+       bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)bsub_key, KS_UNLOCKED);
+       if (bsub) {
+               ks_hash_t *subscribers = blade_subscription_subscribers_get(bsub);
+
+               ks_assert(subscribers);
+
+               for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+                       void *key = NULL;
+                       void *value = NULL;
+                       cJSON *req = NULL;
+                       cJSON *req_params = NULL;
+
+                       ks_hash_this(it, &key, NULL, &value);
+
+                       if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
+
+                       if (blade_handle_local_nodeid_compare(bh, (const char *)key)) continue;
+
+                       bs = blade_handle_sessions_lookup(bh, (const char *)key);
+                       if (bs) {
+                               ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs));
+
+                               blade_rpc_request_raw_create(bh->pool, &req, &req_params, NULL, "blade.broadcast");
+
+                               cJSON_AddStringToObject(req_params, "event", event);
+                               cJSON_AddStringToObject(req_params, "protocol", protocol);
+                               cJSON_AddStringToObject(req_params, "realm", realm);
+
+                               if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
+
+                               blade_session_send(bs, req, callback, data);
+
+                               cJSON_Delete(req);
+
+                               blade_session_read_unlock(bs);
+                       }
+               }
+       }
+
+       ks_hash_read_unlock(bh->subscriptions);
+
+       ks_pool_free(bh->pool, &bsub_key);
+
+       bs = blade_handle_sessions_upstream(bh);
+       if (bs) {
+               if (!excluded_nodeid || ks_safe_strcasecmp(blade_session_id_get(bs), excluded_nodeid)) {
+                       cJSON *req = NULL;
+                       cJSON *req_params = NULL;
+
+                       ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs));
+
+                       blade_rpc_request_raw_create(bh->pool, &req, &req_params, NULL, "blade.broadcast");
+
+                       cJSON_AddStringToObject(req_params, "event", event);
+                       cJSON_AddStringToObject(req_params, "protocol", protocol);
+                       cJSON_AddStringToObject(req_params, "realm", realm);
+
+                       if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
+
+                       blade_session_send(bs, req, callback, data);
+
+                       cJSON_Delete(req);
+               }
+
+               blade_session_read_unlock(bs);
+       }
+       return KS_STATUS_SUCCESS;
+}
+
+// blade.broadcast request handler
+ks_bool_t blade_protocol_broadcast_request_handler(blade_rpc_request_t *brpcreq, void *data)
+{
+       ks_bool_t ret = KS_FALSE;
+       blade_handle_t *bh = NULL;
+       blade_session_t *bs = NULL;
+       cJSON *req = NULL;
+       cJSON *req_params = NULL;
+       const char *req_params_event = NULL;
+       const char *req_params_protocol = NULL;
+       const char *req_params_realm = NULL;
+       cJSON *req_params_params = NULL;
+       const char *bsub_key = NULL;
+       blade_subscription_t *bsub = NULL;
+       blade_rpc_request_callback_t callback = NULL;
+       cJSON *res = NULL;
+       cJSON *res_result = NULL;
+
+       ks_assert(brpcreq);
+
+       bh = blade_rpc_request_handle_get(brpcreq);
+       ks_assert(bh);
+
+       bs = blade_handle_sessions_lookup(bh, blade_rpc_request_sessionid_get(brpcreq));
+       ks_assert(bs);
+
+       req = blade_rpc_request_message_get(brpcreq);
+       ks_assert(req);
+
+       req_params = cJSON_GetObjectItem(req, "params");
+       if (!req_params) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'params' object\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_event = cJSON_GetObjectCstr(req_params, "event");
+       if (!req_params_event) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
+       if (!req_params_protocol) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'protocol'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_realm = cJSON_GetObjectCstr(req_params, "realm");
+       if (!req_params_realm) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'realm'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params realm");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_params = cJSON_GetObjectCstr(req_params, "params");
+
+
+       blade_protocol_broadcast_raw(bh, blade_session_id_get(bs), req_params_event, req_params_protocol, req_params_realm, req_params_params, NULL, NULL);
+
+
+       bsub_key = ks_psprintf(bh->pool, "%s@%s/%s", req_params_protocol, req_params_realm, req_params_event);
+
+       ks_hash_read_lock(bh->subscriptions);
+
+       bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)bsub_key, KS_UNLOCKED);
+       if (bsub) {
+               ks_rwl_read_lock(bh->local_nodeid_rwl);
+               if (ks_hash_search(blade_subscription_subscribers_get(bsub), bh->local_nodeid, KS_UNLOCKED)) {
+                       callback = blade_subscription_callback_get(bsub);
+                       if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub));
+               }
+               ks_rwl_read_unlock(bh->local_nodeid_rwl);
+       }
+
+       ks_hash_read_unlock(bh->subscriptions);
+
+       ks_pool_free(bh->pool, &bsub_key);
+
+       // build the actual response finally
+       blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
+
+       cJSON_AddStringToObject(res_result, "event", req_params_event);
+       cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
+       cJSON_AddStringToObject(res_result, "realm", req_params_realm);
+
+       // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
+       blade_session_send(bs, res, NULL, NULL);
+
+
+done:
+
+       if (res) cJSON_Delete(res);
+       if (bs) blade_session_read_unlock(bs);
+
+       return ret;
+}
+
+KS_DECLARE(cJSON *) blade_protocol_broadcast_request_params_get(blade_rpc_request_t *brpcreq)
+{
+       cJSON *req = NULL;
+       cJSON *req_params = NULL;
+       cJSON *req_params_params = NULL;
+
+       ks_assert(brpcreq);
+
+       req = blade_rpc_request_message_get(brpcreq);
+       ks_assert(req);
+
+       req_params = cJSON_GetObjectItem(req, "params");
+       if (req_params) req_params_params = cJSON_GetObjectItem(req_params, "params");
+
+       return req_params_params;
+}
+
 /* For Emacs:
  * Local Variables:
  * mode:c
index 4f9e69467bc865f0317c25cdfcd52d7b27d27120..9ab945612ea586a5919a542222e34d2fc62a1ff7 100644 (file)
@@ -112,6 +112,10 @@ KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcr
 KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data);
 KS_DECLARE(ks_status_t) blade_protocol_subscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
 
+KS_DECLARE(ks_status_t) blade_protocol_broadcast(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_protocol_broadcast_raw(blade_handle_t *bh, const char *excluded_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(cJSON *) blade_protocol_broadcast_request_params_get(blade_rpc_request_t *brpcreq);
+
 KS_END_EXTERN_C
 
 #endif
index 1d38aa3d5336e2bb1f0e7b5abaf55ab9069809e1..2858dc0946d6183c7c52a7cd64db319e45a5e741 100644 (file)
@@ -202,6 +202,8 @@ int main(int argc, char **argv)
                blade_identity_destroy(&target);
 
                ks_sleep_ms(5000);
+
+
        }
        
        loop(bh);
index 4209528640f6660487e475f6b83747470f19db13..15a323f23ca1fac2390f2610b446698cb37a3a33 100644 (file)
@@ -16,9 +16,11 @@ struct command_def_s {
 };
 
 void command_quit(blade_handle_t *bh, char *args);
+void command_broadcast(blade_handle_t *bh, char *args);
 
 static const struct command_def_s command_defs[] = {
        { "quit", command_quit },
+       { "broadcast", command_broadcast },
 
        { NULL, NULL }
 };
@@ -81,6 +83,25 @@ ks_bool_t test_echo_request_handler(blade_rpc_request_t *brpcreq, void *data)
        return KS_FALSE;
 }
 
+ks_bool_t test_event_response_handler(blade_rpc_response_t *brpcres, void *data)
+{
+       blade_handle_t *bh = NULL;
+       blade_session_t *bs = NULL;
+
+       ks_assert(brpcres);
+
+       bh = blade_rpc_response_handle_get(brpcres);
+       ks_assert(bh);
+
+       bs = blade_handle_sessions_lookup(bh, blade_rpc_response_sessionid_get(brpcres));
+       ks_assert(bs);
+
+       ks_log(KS_LOG_DEBUG, "Session (%s) test.event response processing\n", blade_session_id_get(bs));
+
+       blade_session_read_unlock(bs);
+
+       return KS_FALSE;
+}
 
 int main(int argc, char **argv)
 {
@@ -216,6 +237,14 @@ void command_quit(blade_handle_t *bh, char *args)
        g_shutdown = KS_TRUE;
 }
 
+void command_broadcast(blade_handle_t *bh, char *args)
+{
+       ks_assert(bh);
+       ks_assert(args);
+
+       blade_protocol_broadcast(bh, "test.event", "test", "mydomain.com", NULL, test_event_response_handler, NULL);
+}
+
 
 /* For Emacs:
 * Local Variables: