]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-10739: [libblade] Loopback session support and reworking sessions to use independe...
authorShane Bryldt <astaelan@gmail.com>
Thu, 19 Oct 2017 08:32:55 +0000 (02:32 -0600)
committerShane Bryldt <astaelan@gmail.com>
Thu, 19 Oct 2017 08:32:55 +0000 (02:32 -0600)
libs/libblade/src/blade_routemgr.c
libs/libblade/src/blade_session.c
libs/libblade/src/blade_sessionmgr.c
libs/libblade/src/blade_stack.c
libs/libblade/src/blade_subscriptionmgr.c
libs/libblade/src/blade_transport_wss.c
libs/libblade/src/include/blade_routemgr.h
libs/libblade/src/include/blade_session.h
libs/libblade/src/include/blade_sessionmgr.h
libs/libblade/src/include/blade_subscriptionmgr.h
libs/libblade/src/include/blade_types.h

index 91bd99a209b9fa967dc911b4a01d5f9f632f2829..af3149d92112afff121a204a1e8aef53e3ff571f 100644 (file)
@@ -129,15 +129,11 @@ KS_DECLARE(ks_status_t) blade_routemgr_local_set(blade_routemgr_t *brmgr, const
 
        ks_rwl_write_lock(brmgr->local_lock);
 
-       if (brmgr->local_nodeid) {
-               ret = KS_STATUS_DUPLICATE_OPERATION;
-               goto done;
-       }
+       if (brmgr->local_nodeid) ks_pool_free(&brmgr->local_nodeid);
        if (nodeid) brmgr->local_nodeid = ks_pstrdup(ks_pool_get(brmgr), nodeid);
 
        ks_log(KS_LOG_DEBUG, "Local NodeID: %s\n", nodeid);
 
-done:
        ks_rwl_write_unlock(brmgr->local_lock);
 
        return ret;
@@ -220,21 +216,6 @@ KS_DECLARE(ks_bool_t) blade_routemgr_local_pack(blade_routemgr_t *brmgr, cJSON *
        return ret;
 }
 
-KS_DECLARE(blade_session_t *) blade_routemgr_upstream_lookup(blade_routemgr_t *brmgr)
-{
-       blade_session_t *bs = NULL;
-
-       ks_assert(brmgr);
-
-       ks_rwl_read_lock(brmgr->local_lock);
-
-       if (brmgr->local_nodeid) bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(brmgr->handle), brmgr->local_nodeid);
-
-       ks_rwl_read_unlock(brmgr->local_lock);
-
-       return bs;
-}
-
 KS_DECLARE(ks_status_t) blade_routemgr_master_set(blade_routemgr_t *brmgr, const char *nodeid)
 {
        ks_assert(brmgr);
@@ -329,26 +310,35 @@ KS_DECLARE(blade_session_t *) blade_routemgr_route_lookup(blade_routemgr_t *brmg
        ks_assert(brmgr);
        ks_assert(target);
 
-       router = (const char *)ks_hash_search(brmgr->routes, (void *)target, KS_READLOCKED);
-       if (!router) {
-               // @todo this is all really inefficient, but we need the string to be parsed and recombined to ensure correctness for key matching
-               blade_identity_t *identity = NULL;
-               ks_pool_t *pool = ks_pool_get(brmgr);
+       // Short circuit any nodeid or identity that matches the local node by returning the loopback session explicitly
+       // @todo this could be potentially be avoided if the local nodeid and all local identities are added as routes to the loopback sessionid
+       if (blade_routemgr_local_check(brmgr, target)) bs = blade_sessionmgr_loopback_lookup(blade_handle_sessionmgr_get(brmgr->handle));
+       else {
+               // If the target is a downstream nodeid then it will be found in the route table immediately and return the sessionid of the downstream session to route through
+               router = (const char *)ks_hash_search(brmgr->routes, (void *)target, KS_READLOCKED);
+               if (!router) {
+                       // If the target is a downstream node identity then it will be found in the identity table and return the nodeid which can then be used to lookup the route
+                       blade_identity_t *identity = NULL;
+                       ks_pool_t *pool = ks_pool_get(brmgr);
 
-               blade_identity_create(&identity, pool);
-               if (blade_identity_parse(identity, target) == KS_STATUS_SUCCESS) {
-                       char *key = ks_psprintf(pool, "%s@%s/%s", blade_identity_user_get(identity), blade_identity_host_get(identity), blade_identity_path_get(identity));
+                       blade_identity_create(&identity, pool);
+                       if (blade_identity_parse(identity, target) == KS_STATUS_SUCCESS) {
+                               char *key = ks_psprintf(pool, "%s@%s/%s", blade_identity_user_get(identity), blade_identity_host_get(identity), blade_identity_path_get(identity));
 
-                       router = (const char *)ks_hash_search(brmgr->identities, (void *)key, KS_READLOCKED);
-                       ks_hash_read_unlock(brmgr->identities);
+                               router = (const char *)ks_hash_search(brmgr->identities, (void *)key, KS_READLOCKED);
+                               ks_hash_read_unlock(brmgr->identities);
 
-                       ks_pool_free(&key);
-               }
+                               ks_pool_free(&key);
 
-               blade_identity_destroy(&identity);
+                               if (router) router = (const char *)ks_hash_search(brmgr->routes, (void *)router, KS_UNLOCKED);
+                       }
+
+                       blade_identity_destroy(&identity);
+               }
+               // When a router is found it is the sessionid of the downstream session, lookup the session to route through
+               if (router) bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(brmgr->handle), router);
+               ks_hash_read_unlock(brmgr->routes);
        }
-       if (router) bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(brmgr->handle), router);
-       ks_hash_read_unlock(brmgr->routes);
 
        return bs;
 }
index 99626209e24557f8cc58d3bb2fff220c819e5795..d449ff725b81069e154fa90b2388a6d0bf9d610c 100644 (file)
 struct blade_session_s {
        blade_handle_t *handle;
 
-       volatile blade_session_state_t state;
+       blade_session_flags_t flags;
 
        const char *id;
        ks_rwl_t *lock;
 
+       volatile blade_session_state_t state;
+
        ks_cond_t *cond;
 
        const char *connection;
@@ -82,7 +84,7 @@ static void blade_session_cleanup(void *ptr, void *arg, ks_pool_cleanup_action_t
 }
 
 
-KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh, const char *id)
+KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh, blade_session_flags_t flags, const char *sessionid)
 {
        blade_session_t *bs = NULL;
        ks_pool_t *pool = NULL;
@@ -95,8 +97,9 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle
 
        bs = ks_pool_alloc(pool, sizeof(blade_session_t));
        bs->handle = bh;
+       bs->flags = flags;
 
-       if (id) bs->id = ks_pstrdup(pool, id);
+       if (sessionid) bs->id = ks_pstrdup(pool, sessionid);
        else {
                uuid_t id;
                ks_uuid(&id);
@@ -106,6 +109,8 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle
     ks_rwl_create(&bs->lock, pool);
        ks_assert(bs->lock);
 
+       bs->state = BLADE_SESSION_STATE_NONE;
+
        ks_cond_create(&bs->cond, pool);
        ks_assert(bs->cond);
 
@@ -158,8 +163,6 @@ KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs)
        tpool = blade_handle_tpool_get(bs->handle);
        ks_assert(tpool);
 
-       blade_session_state_set(bs, BLADE_SESSION_STATE_NONE);
-
        if (ks_thread_pool_add_job(tpool, blade_session_state_thread, bs) != KS_STATUS_SUCCESS) {
                // @todo error logging
                return KS_STATUS_FAIL;
@@ -207,6 +210,18 @@ KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs)
        return bs->handle;
 }
 
+KS_DECLARE(ks_bool_t) blade_session_loopback(blade_session_t *bs)
+{
+       ks_assert(bs);
+       return (bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) == BLADE_SESSION_FLAGS_LOOPBACK;
+}
+
+KS_DECLARE(ks_bool_t) blade_session_upstream(blade_session_t *bs)
+{
+       ks_assert(bs);
+       return (bs->flags & BLADE_SESSION_FLAGS_UPSTREAM) == BLADE_SESSION_FLAGS_UPSTREAM;
+}
+
 KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs)
 {
        ks_assert(bs);
@@ -397,8 +412,11 @@ KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *j
     ks_assert(bs);
     ks_assert(json);
 
-    json_copy = cJSON_Duplicate(json, 1);
-       if ((ret = ks_q_push(bs->sending, json_copy)) == KS_STATUS_SUCCESS) ks_cond_try_signal(bs->cond);
+       if ((bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) == BLADE_SESSION_FLAGS_LOOPBACK) ret = blade_session_receiving_push(bs, json);
+       else {
+               json_copy = cJSON_Duplicate(json, 1);
+               if ((ret = ks_q_push(bs->sending, json_copy)) == KS_STATUS_SUCCESS) ks_cond_try_signal(bs->cond);
+       }
        return ret;
 }
 
@@ -485,7 +503,8 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
                default: break;
                }
 
-               if (!bs->connection &&
+               if ((bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) != BLADE_SESSION_FLAGS_LOOPBACK &&
+                       !bs->connection &&
                        bs->ttl > 0 &&
                        !blade_session_terminating(bs) &&
                        ks_time_now() >= bs->ttl) {
@@ -537,7 +556,7 @@ ks_status_t blade_session_onstate_run(blade_session_t *bs)
 
        ks_assert(bs);
 
-       while (bs->connection && blade_session_receiving_pop(bs, &json) == KS_STATUS_SUCCESS && json) {
+       while (blade_session_receiving_pop(bs, &json) == KS_STATUS_SUCCESS && json) {
                blade_session_process(bs, json);
                cJSON_Delete(json);
        }
@@ -645,7 +664,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
                                // not meant for local processing, continue with standard unicast routing for requests
                                blade_session_t *bs_router = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), params_responder_nodeid);
                                if (!bs_router) {
-                                       bs_router = blade_routemgr_upstream_lookup(blade_handle_routemgr_get(bh));
+                                       bs_router = blade_sessionmgr_upstream_lookup(blade_handle_sessionmgr_get(bh));
                                        if (!bs_router) {
                                                cJSON *res = NULL;
                                                cJSON *res_error = NULL;
@@ -719,7 +738,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
                                // not meant for local processing, continue with standard unicast routing for responses
                                blade_session_t *bs_router = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), object_requester_nodeid);
                                if (!bs_router) {
-                                       bs_router = blade_routemgr_upstream_lookup(blade_handle_routemgr_get(bh));
+                                       bs_router = blade_sessionmgr_upstream_lookup(blade_handle_sessionmgr_get(bh));
                                        if (!bs_router) {
                                                ks_log(KS_LOG_DEBUG, "Session (%s) response (%s <= %s) but upstream session unavailable\n", blade_session_id_get(bs), object_requester_nodeid, object_responder_nodeid);
                                                return KS_STATUS_DISCONNECTED;
index 593317c37de355946a629591846b7abbce6ffbc3..c4b2eefadbb5518ea6d49abce95cdd936ba18e04 100644 (file)
@@ -36,6 +36,8 @@
 struct blade_sessionmgr_s {
        blade_handle_t *handle;
 
+       blade_session_t *loopback;
+       blade_session_t *upstream;
        ks_hash_t *sessions; // id, blade_session_t*
        ks_hash_t *callbacks; // id, blade_session_callback_data_t*
 };
@@ -130,12 +132,37 @@ KS_DECLARE(blade_handle_t *) blade_sessionmgr_handle_get(blade_sessionmgr_t *bsm
        return bsmgr->handle;
 }
 
+KS_DECLARE(ks_status_t) blade_sessionmgr_startup(blade_sessionmgr_t *bsmgr, config_setting_t *config)
+{
+       ks_assert(bsmgr);
+
+       blade_session_create(&bsmgr->loopback, bsmgr->handle, BLADE_SESSION_FLAGS_LOOPBACK, NULL);
+       ks_assert(bsmgr->loopback);
+
+       ks_log(KS_LOG_DEBUG, "Session (%s) created\n", blade_session_id_get(bsmgr->loopback));
+
+       if (blade_session_startup(bsmgr->loopback) != KS_STATUS_SUCCESS) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", blade_session_id_get(bsmgr->loopback));
+               blade_session_destroy(&bsmgr->loopback);
+               return KS_STATUS_FAIL;
+       }
+
+       ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bsmgr->loopback));
+
+       return KS_STATUS_SUCCESS;
+}
+
 KS_DECLARE(ks_status_t) blade_sessionmgr_shutdown(blade_sessionmgr_t *bsmgr)
 {
        ks_hash_iterator_t *it = NULL;
 
        ks_assert(bsmgr);
 
+       if (bsmgr->loopback) {
+               blade_session_hangup(bsmgr->loopback);
+               ks_sleep_ms(100);
+       }
+
        ks_hash_read_lock(bsmgr->sessions);
        for (it = ks_hash_first(bsmgr->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
                void *key = NULL;
@@ -151,6 +178,22 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_shutdown(blade_sessionmgr_t *bsmgr)
        return KS_STATUS_SUCCESS;
 }
 
+KS_DECLARE(blade_session_t *) blade_sessionmgr_loopback_lookup(blade_sessionmgr_t *bsmgr)
+{
+       ks_assert(bsmgr);
+
+       blade_session_read_lock(bsmgr->loopback, KS_TRUE);
+       return bsmgr->loopback;
+}
+
+KS_DECLARE(blade_session_t *) blade_sessionmgr_upstream_lookup(blade_sessionmgr_t *bsmgr)
+{
+       ks_assert(bsmgr);
+
+       if (bsmgr->upstream) blade_session_read_lock(bsmgr->upstream, KS_TRUE);
+       return bsmgr->upstream;
+}
+
 KS_DECLARE(blade_session_t *) blade_sessionmgr_session_lookup(blade_sessionmgr_t *bsmgr, const char *id)
 {
        blade_session_t *bs = NULL;
@@ -177,6 +220,8 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_session_add(blade_sessionmgr_t *bsmgr,
 
        ks_log(KS_LOG_DEBUG, "Session Added: %s\n", key);
 
+       if (blade_session_upstream(bs)) bsmgr->upstream = bs;
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -196,7 +241,7 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_session_remove(blade_sessionmgr_t *bsmg
        ks_log(KS_LOG_DEBUG, "Session Removed: %s\n", id);
 
        routemgr = blade_handle_routemgr_get(bsmgr->handle);
-       if (blade_routemgr_local_check(routemgr, id)) {
+       if (blade_session_upstream(bs)) {
                blade_routemgr_local_set(routemgr, NULL);
                blade_routemgr_master_set(routemgr, NULL);
 
index bc7d96a32e054be2b9a1dd63f4115574153c58c5..70e1943deb7d74803254f26991c807677ce197d3 100644 (file)
@@ -216,6 +216,8 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
 
        blade_transportmgr_startup(bh->transportmgr, config);
 
+       blade_sessionmgr_startup(bh->sessionmgr, config);
+
        blade_mastermgr_startup(bh->mastermgr, config);
 
        blade_restmgr_startup(bh->restmgr, config);
@@ -306,7 +308,7 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio
        ks_assert(target);
 
        // @todo mini state machine to deal with upstream establishment to avoid attempting multiple upstream connects at the same time?
-       if ((bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
+       if ((bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
                blade_session_read_unlock(bs);
                return KS_STATUS_DUPLICATE_OPERATION;
        }
@@ -362,7 +364,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcroute(blade_handle_t *bh, const char *no
        ks_assert(nodeid);
 
 
-       if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
+       if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
                ret = KS_STATUS_DISCONNECTED;
                goto done;
        }
@@ -479,7 +481,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char
        ks_assert(bh);
        ks_assert(identity);
 
-       if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
+       if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
                ret = KS_STATUS_DISCONNECTED;
                goto done;
        }
@@ -514,7 +516,7 @@ ks_status_t blade_handle_rpcregister_raw(blade_handle_t *bh, const char *identit
        ks_assert(bh);
        ks_assert(identity);
 
-       if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
+       if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
                ret = KS_STATUS_DISCONNECTED;
                goto done;
        }
@@ -720,7 +722,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, blade_rpcpub
        ks_assert(protocol);
 
        // @todo consideration for the Master trying to publish a protocol, with no upstream
-       if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
+       if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
                ret = KS_STATUS_DISCONNECTED;
                goto done;
        }
@@ -969,7 +971,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char
        ks_assert(channels);
 
        // @todo consideration for the Master trying to publish a protocol, with no upstream
-       if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
+       if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
                ret = KS_STATUS_DISCONNECTED;
                goto done;
        }
@@ -1167,7 +1169,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *p
        ks_assert(bh);
        ks_assert(protocol);
 
-       if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
+       if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
                ret = KS_STATUS_DISCONNECTED;
                goto done;
        }
@@ -1303,7 +1305,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *
        ks_assert(protocol);
 
        if (!(bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), nodeid))) {
-               if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
+               if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
                        ret = KS_STATUS_DISCONNECTED;
                        goto done;
                }
@@ -1582,7 +1584,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh,
        ks_assert(channels);
 
        // @note this is always produced by a subscriber, and sent upstream, master will only use the internal raw call
-       if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
+       if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
                ret = KS_STATUS_DISCONNECTED;
                goto done;
        }
@@ -1640,7 +1642,7 @@ ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh,
                        goto done;
                }
        }
-       else if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
+       else if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
                ret = KS_STATUS_DISCONNECTED;
                goto done;
        }
@@ -1895,7 +1897,7 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, voi
        if (temp_data && temp_data->original_requestid) {
                blade_session_t *relay = NULL;
                if (downstream) {
-                       if (!(relay = blade_routemgr_upstream_lookup(bh->routemgr))) {
+                       if (!(relay = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
                                goto done;
                        }
                } else {
@@ -1929,18 +1931,10 @@ done:
 // blade.broadcast request generator
 KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
 {
-       ks_status_t ret = KS_STATUS_SUCCESS;
-
        ks_assert(bh);
        ks_assert(protocol);
 
-       ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, BLADE_RPCBROADCAST_COMMAND_EVENT, NULL,  protocol, channel, event, params, callback, data);
-
-       // @todo must check if the local node is also subscribed to receive the event, this is a special edge case which has some extra considerations
-       // if the local node is subscribed to receive the event, it should be received here as a special case, otherwise the broadcast request handler
-       // is where this normally occurs, however this is not a simple case as the callback expects a blade_rpc_request_t parameter containing context
-
-       return ret;
+       return blade_subscriptionmgr_broadcast(bh->subscriptionmgr, BLADE_RPCBROADCAST_COMMAND_EVENT, NULL,  protocol, channel, event, params, callback, data);
 }
 
 // @todo blade_handle_rpcbroadcast_raw() to encapsulate adding subcommands to broadcast to support protocol removal, protocol channel removal, and normal event broadcast
@@ -2023,21 +2017,12 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
 
        req_params_params = cJSON_GetObjectItem(req_params, "params");
 
-       blade_subscriptionmgr_broadcast(bh->subscriptionmgr, command, blade_session_id_get(bs), req_params_protocol, req_params_channel, req_params_event, req_params_params, NULL, NULL);
-
-       if (command == BLADE_RPCBROADCAST_COMMAND_EVENT) {
+       if (!blade_session_loopback(bs)) blade_subscriptionmgr_broadcast(bh->subscriptionmgr, command, blade_session_id_get(bs), req_params_protocol, req_params_channel, req_params_event, req_params_params, NULL, NULL);
+       else if (command == BLADE_RPCBROADCAST_COMMAND_EVENT) {
                bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_protocol, req_params_channel);
                if (bsub) {
-                       const char *localid = NULL;
-
-                       blade_routemgr_local_copy(bh->routemgr, &localid);
-                       ks_assert(localid);
-
-                       if (ks_hash_search(blade_subscription_subscribers_get(bsub), (void *)localid, KS_UNLOCKED)) {
-                               callback = blade_subscription_callback_get(bsub);
-                               if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub));
-                       }
-                       ks_pool_free(&localid);
+                       callback = blade_subscription_callback_get(bsub);
+                       if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub));
                }
        }
 
index 64105239cd182a345d97c681ec843655075728a7..59cabdbf279ca3017c1e7729c061876b827c8652 100644 (file)
@@ -338,7 +338,7 @@ KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, con
        }
 }
 
-KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_nodeid, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_sessionid, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
 {
        ks_pool_t *pool = NULL;
        const char *bsub_key = NULL;
@@ -376,13 +376,13 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
 
                                ks_hash_this(it, (const void **)&key, NULL, &value);
 
-                               if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
+                               //if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
 
-                               // @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
-                               if (blade_routemgr_local_check(blade_handle_routemgr_get(bsmgr->handle), (const char *)key)) continue;
+                               //if (blade_routemgr_local_check(blade_handle_routemgr_get(bsmgr->handle), (const char *)key)) continue;
 
                                bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key);
                                if (bs) {
+                                       if (excluded_sessionid && !ks_safe_strcasecmp(excluded_sessionid, blade_session_id_get(bs))) continue;
                                        if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
                                        if (!ks_hash_search(routers, (void *)blade_session_id_get(bs), KS_UNLOCKED)) ks_hash_insert(routers, (void *)blade_session_id_get(bs), (void *)bs);
                                        else blade_session_read_unlock(bs);
@@ -420,13 +420,13 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
 
                                        ks_hash_this(it2, (const void **)&key2, NULL, &value2);
 
-                                       if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key2)) continue;
+                                       //if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key2)) continue;
 
-                                       // @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
-                                       if (blade_routemgr_local_check(blade_handle_routemgr_get(bsmgr->handle), (const char *)key2)) continue;
+                                       //if (blade_routemgr_local_check(blade_handle_routemgr_get(bsmgr->handle), (const char *)key2)) continue;
 
                                        bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key2);
                                        if (bs) {
+                                               if (excluded_sessionid && !ks_safe_strcasecmp(excluded_sessionid, blade_session_id_get(bs))) continue;
                                                if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
                                                if (!ks_hash_search(routers, (void *)blade_session_id_get(bs), KS_UNLOCKED)) ks_hash_insert(routers, (void *)blade_session_id_get(bs), (void *)bs);
                                                else blade_session_read_unlock(bs);
@@ -444,9 +444,9 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
        }
 
 
-       bs = blade_routemgr_upstream_lookup(blade_handle_routemgr_get(bsmgr->handle));
+       bs = blade_sessionmgr_upstream_lookup(blade_handle_sessionmgr_get(bsmgr->handle));
        if (bs) {
-               if (!excluded_nodeid || ks_safe_strcasecmp(blade_session_id_get(bs), excluded_nodeid)) {
+               if (!excluded_sessionid || ks_safe_strcasecmp(excluded_sessionid, blade_session_id_get(bs))) {
                        if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
                        ks_hash_insert(routers, (void *)blade_session_id_get(bs), (void *)bs);
                }
index ea123f4a0e3738de865f69c1d18ff948ca9883af..57aa2c9e021ae4b67e5ac6591416cd5ea1de32fa 100644 (file)
@@ -832,6 +832,8 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
        const char *jsonrpc = NULL;
        const char *id = NULL;
        const char *method = NULL;
+       const char *sessionid = NULL;
+       uuid_t uuid;
        const char *nodeid = NULL;
        ks_time_t timeout;
 
@@ -901,15 +903,17 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
 
        json_params = cJSON_GetObjectItem(json_req, "params");
        if (json_params) {
-               nodeid = cJSON_GetObjectCstr(json_params, "session-id");
-               if (nodeid) {
-                       // @todo validate uuid format by parsing, not currently available in uuid functions, send -32602 (invalid params) if invalid
-                       ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", nodeid);
+               sessionid = cJSON_GetObjectCstr(json_params, "sessionid");
+               if (sessionid) {
+                       ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", sessionid);
                }
        }
 
-       if (nodeid) {
-               bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), nodeid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
+       ks_uuid(&uuid);
+       nodeid = ks_uuid_str(ks_pool_get(bc), &uuid);
+
+       if (sessionid) {
+               bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), sessionid);
                if (bs) {
                        if (blade_session_terminating(bs)) {
                                blade_session_read_unlock(bs);
@@ -917,21 +921,22 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
                                bs = NULL;
                        } else {
                                ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs));
+                               // @todo validate against IP address or something to ensure reconnects are acceptable
                        }
                }
        }
 
        if (!bs) {
-               blade_session_create(&bs, bh, NULL);
+               blade_session_create(&bs, bh, BLADE_SESSION_FLAGS_NONE, NULL);
                ks_assert(bs);
 
-               nodeid = blade_session_id_get(bs);
-               ks_log(KS_LOG_DEBUG, "Session (%s) created\n", nodeid);
+               sessionid = blade_session_id_get(bs);
+               ks_log(KS_LOG_DEBUG, "Session (%s) created\n", sessionid);
 
                blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise
 
                if (blade_session_startup(bs) != KS_STATUS_SUCCESS) {
-                       ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", nodeid);
+                       ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", sessionid);
                        blade_transport_wss_rpc_error_send(bc, id, -32603, "Internal error, session could not be started");
                        blade_session_read_unlock(bs);
                        blade_session_destroy(&bs);
@@ -941,7 +946,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
 
                // This is an inbound connection, thus it is always creating a downstream session
 
-               ks_log(KS_LOG_DEBUG, "Session (%s) started\n", nodeid);
+               ks_log(KS_LOG_DEBUG, "Session (%s) started\n", sessionid);
                blade_sessionmgr_session_add(blade_handle_sessionmgr_get(bh), bs);
 
                // This is primarily to cleanup the routes added to the blade_handle for main routing when a session terminates, these don't have a lot of use otherwise but it will keep the main route table
@@ -951,16 +956,17 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
                // a message should pass through when it does not match the local node id from blade_routemgr_t, and must be matched with a call to blade_session_route_add() for cleanup, additionally when
                // a "blade.route" is received the identity it carries affects these routes along with the sessionid of the downstream session it came through, and "blade.route" would also
                // result in the new identities being added as routes however federation registration would require a special process to maintain proper routing
-               blade_routemgr_route_add(blade_handle_routemgr_get(bh), nodeid, nodeid);
+               blade_routemgr_route_add(blade_handle_routemgr_get(bh), nodeid, sessionid);
        }
 
        blade_rpc_response_raw_create(&json_res, &json_result, id);
        ks_assert(json_res);
 
+       cJSON_AddStringToObject(json_result, "sessionid", sessionid);
        cJSON_AddStringToObject(json_result, "nodeid", nodeid);
 
        if (!blade_routemgr_master_pack(blade_handle_routemgr_get(bh), json_result, "master-nodeid")) {
-               ks_log(KS_LOG_DEBUG, "Master nodeid unavailable\n");
+               ks_log(KS_LOG_DEBUG, "Master nodeid unavailable, upstream is not established\n");
                blade_transport_wss_rpc_error_send(bc, id, -32602, "Master nodeid unavailable");
                ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
                goto done;
@@ -969,7 +975,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
        // This starts the final process for associating the connection to the session, including for reconnecting to an existing session, this simply
        // associates the session to this connection, upon return the remainder of the association for the session to the connection is handled along
        // with making sure both this connection and the session state machines are in running states
-       blade_connection_session_set(bc, nodeid);
+       blade_connection_session_set(bc, sessionid);
 
        // @todo end of reusable handler for "blade.connect" request
 
@@ -1004,6 +1010,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
        const char *id = NULL;
        cJSON *json_error = NULL;
        cJSON *json_result = NULL;
+       const char *sessionid = NULL;
        const char *nodeid = NULL;
        const char *master_nodeid = NULL;
        blade_session_t *bs = NULL;
@@ -1032,7 +1039,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
        blade_rpc_request_raw_create(pool, &json_req, &json_params, &mid, "blade.connect");
        ks_assert(json_req);
 
-       if (btwssl->session_id) cJSON_AddStringToObject(json_params, "session-id", btwssl->session_id);
+       if (btwssl->session_id) cJSON_AddStringToObject(json_params, "sessionid", btwssl->session_id);
 
        ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", (btwssl->session_id ? btwssl->session_id : "none"));
 
@@ -1086,6 +1093,13 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
                goto done;
        }
 
+       sessionid = cJSON_GetObjectCstr(json_result, "sessionid");
+       if (!sessionid) {
+               ks_log(KS_LOG_DEBUG, "Received message 'result' is missing 'sessionid'\n");
+               ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+               goto done;
+       }
+
        nodeid = cJSON_GetObjectCstr(json_result, "nodeid");
        if (!nodeid) {
                ks_log(KS_LOG_DEBUG, "Received message 'result' is missing 'nodeid'\n");
@@ -1100,15 +1114,19 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
                goto done;
        }
 
-
-       // @todo validate uuid format by parsing, not currently available in uuid functions
-       bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), nodeid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
+       bs = blade_sessionmgr_upstream_lookup(blade_handle_sessionmgr_get(bh));
        if (bs) {
-               ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs));
+               if (ks_safe_strcasecmp(blade_session_id_get(bs), sessionid)) {
+                       ks_log(KS_LOG_DEBUG, "Already have upstream session with different sessionid, could not establish session\n");
+                       blade_session_read_unlock(bs);
+                       ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+                       goto done;
+               }
+               ks_log(KS_LOG_DEBUG, "Session (%s) reestablishing\n", blade_session_id_get(bs));
        }
 
        if (!bs) {
-               blade_session_create(&bs, bh, nodeid);
+               blade_session_create(&bs, bh, BLADE_SESSION_FLAGS_UPSTREAM, sessionid);
                ks_assert(bs);
 
                ks_log(KS_LOG_DEBUG, "Session (%s) created\n", blade_session_id_get(bs));
@@ -1123,20 +1141,11 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
                        goto done;
                }
 
-               // This is an outbound connection, thus it is always creating an upstream session, defined by the sessionid matching the local_nodeid in the handle
-
-               if (blade_routemgr_local_set(blade_handle_routemgr_get(bh), nodeid) != KS_STATUS_SUCCESS) {
-                       ks_log(KS_LOG_DEBUG, "Session (%s) abandoned, upstream already available\n", blade_session_id_get(bs));
-                       blade_session_read_unlock(bs);
-                       blade_session_hangup(bs);
-                       ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
-                       goto done;
-               }
-
+               // This is an outbound connection, thus it is always creating an upstream session
                ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bs));
-
                blade_sessionmgr_session_add(blade_handle_sessionmgr_get(bh), bs);
 
+               blade_routemgr_local_set(blade_handle_routemgr_get(bh), nodeid);
                blade_routemgr_master_set(blade_handle_routemgr_get(bh), master_nodeid);
        }
 
index c75426c18307829bc12e9deb20a07cc63b52490e..d1e930b65725a5bb9436682f80750dfc0330a68c 100644 (file)
@@ -43,7 +43,6 @@ KS_DECLARE(ks_status_t) blade_routemgr_local_set(blade_routemgr_t *brmgr, const
 KS_DECLARE(ks_bool_t) blade_routemgr_local_check(blade_routemgr_t *brmgr, const char *target);
 KS_DECLARE(ks_bool_t) blade_routemgr_local_copy(blade_routemgr_t *brmgr, const char **nodeid);
 KS_DECLARE(ks_bool_t) blade_routemgr_local_pack(blade_routemgr_t *brmgr, cJSON *json, const char *key);
-KS_DECLARE(blade_session_t *) blade_routemgr_upstream_lookup(blade_routemgr_t *brmgr);
 KS_DECLARE(ks_status_t) blade_routemgr_master_set(blade_routemgr_t *brmgr, const char *nodeid);
 KS_DECLARE(ks_bool_t) blade_routemgr_master_check(blade_routemgr_t *brmgr, const char *target);
 KS_DECLARE(ks_bool_t) blade_routemgr_master_pack(blade_routemgr_t *brmgr, cJSON *json, const char *key);
index 4bdbe8a85ccdf7dec61a45d88523303269fa30a7..fb7a2c64ce3ac85c3be2bf3b70ed546e01d503ff 100644 (file)
 #include <blade.h>
 
 KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh, const char *id);
+KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh, blade_session_flags_t flags, const char *sessionid);
 KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP);
 KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs);
 KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs);
 KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs);
+KS_DECLARE(ks_bool_t) blade_session_loopback(blade_session_t *bs);
+KS_DECLARE(ks_bool_t) blade_session_upstream(blade_session_t *bs);
 KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs);
 KS_DECLARE(blade_session_state_t) blade_session_state_get(blade_session_t *bs);
 KS_DECLARE(ks_status_t) blade_session_route_add(blade_session_t *bs, const char *nodeid);
index 62f2783d8df59d42bad4d4b52531b4a68ccbcb0c..9666142a14e411ef3dfe1723ca217c29e2b52adb 100644 (file)
@@ -39,7 +39,10 @@ KS_BEGIN_EXTERN_C
 KS_DECLARE(ks_status_t) blade_sessionmgr_create(blade_sessionmgr_t **bsmgrP, blade_handle_t *bh);
 KS_DECLARE(ks_status_t) blade_sessionmgr_destroy(blade_sessionmgr_t **bsmgrP);
 KS_DECLARE(blade_handle_t *) blade_sessionmgr_handle_get(blade_sessionmgr_t *bsmgr);
+KS_DECLARE(ks_status_t) blade_sessionmgr_startup(blade_sessionmgr_t *bsmgr, config_setting_t *config);
 KS_DECLARE(ks_status_t) blade_sessionmgr_shutdown(blade_sessionmgr_t *bsmgr);
+KS_DECLARE(blade_session_t *) blade_sessionmgr_loopback_lookup(blade_sessionmgr_t *bsmgr);
+KS_DECLARE(blade_session_t *) blade_sessionmgr_upstream_lookup(blade_sessionmgr_t *bsmgr);
 KS_DECLARE(blade_session_t *) blade_sessionmgr_session_lookup(blade_sessionmgr_t *bsmgr, const char *id);
 KS_DECLARE(ks_status_t) blade_sessionmgr_session_add(blade_sessionmgr_t *bsmgr, blade_session_t *bs);
 KS_DECLARE(ks_status_t) blade_sessionmgr_session_remove(blade_sessionmgr_t *bsmgr, blade_session_t *bs);
index 0cc811985e142f3848f9a859a3a54e574b67db18..ace7db8887a7fbb3feed122f12d2bf5fa75f133e 100644 (file)
@@ -44,7 +44,7 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_subscription_remove(blade_subscrip
 KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *channel, const char *subscriber);
 KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *channel, const char *subscriber);
 KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target);
-KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_nodeid, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_sessionid, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
 KS_END_EXTERN_C
 
 #endif
index 93d61b6f950104d53eb095fbf67e07179cfcdb38..2003e3dfbb9b7afc17e5c773ca5dbc3bc15cb466 100644 (file)
@@ -95,6 +95,11 @@ typedef enum {
        BLADE_CONNECTION_STATE_HOOK_BYPASS,
 } blade_connection_state_hook_t;
 
+typedef enum {
+       BLADE_SESSION_FLAGS_NONE = 0 << 0,
+       BLADE_SESSION_FLAGS_LOOPBACK = 1 << 0,
+       BLADE_SESSION_FLAGS_UPSTREAM = 1 << 1,
+} blade_session_flags_t;
 
 typedef enum {
        BLADE_SESSION_STATE_CONDITION_PRE,