]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9952: Added the first half of the session negotations for the server side, unteste...
authorShane Bryldt <astaelan@gmail.com>
Tue, 21 Feb 2017 21:20:44 +0000 (21:20 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 22 Mar 2017 21:42:50 +0000 (17:42 -0400)
libs/libblade/src/blade_connection.c
libs/libblade/src/blade_module_wss.c
libs/libblade/src/blade_session.c
libs/libblade/src/blade_stack.c
libs/libblade/src/include/blade_connection.h
libs/libblade/src/include/blade_session.h
libs/libblade/src/include/blade_stack.h
libs/libblade/src/include/blade_types.h

index d548c477dedae22f0e182096e1759da5f2614793..eadbea4adb1762f983152bbf82874c2820163367 100644 (file)
@@ -50,6 +50,8 @@ struct blade_connection_s {
        ks_rwl_t *lock;
        
        ks_q_t *sending;
+
+       const char *session;
 };
 
 void *blade_connection_state_thread(ks_thread_t *thread, void *data);
@@ -147,6 +149,8 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc)
                bc->shutdown = KS_FALSE;
        }
 
+       if (bc->session) ks_pool_free(bc->pool, &bc->session);
+
        while (ks_q_trypop(bc->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
 
        return KS_STATUS_SUCCESS;
@@ -304,6 +308,20 @@ KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, cJS
        return ks_q_trypop(bc->sending, (void **)json);
 }
 
+KS_DECLARE(const char *) blade_connection_session_get(blade_connection_t *bc)
+{
+       ks_assert(bc);
+
+       return bc->session;
+}
+
+KS_DECLARE(void) blade_connection_session_set(blade_connection_t *bc, const char *id)
+{
+       ks_assert(bc);
+
+       if (bc->session) ks_pool_free(bc->pool, &bc->session);
+       bc->session = ks_pstrdup(bc->pool, id);
+}
 
 void *blade_connection_state_thread(ks_thread_t *thread, void *data)
 {
@@ -369,11 +387,18 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
                                blade_connection_state_set(bc, BLADE_CONNECTION_STATE_ATTACH);
                                break;
                        case BLADE_CONNECTION_STATE_ATTACH:
-                               // @todo receive message with nullable session id for reconnect and some sort of secure token for a reconnect challenge?
-                               // determine how much of session management is handled here... do we process these session negotiation messages without
-                               // passing it up to the application layer? or does the application layer give back a session and build the response?
-                               blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY);
-                               break;
+                               {
+                                       blade_session_t *bs = blade_handle_sessions_get(bc->handle, bc->session);
+                                       ks_assert(bs); // should not happen because bs should still be locked
+                                       
+                                       blade_session_connections_add(bs, bc->id);
+                                       
+                                       blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY);
+                                       blade_session_state_set(bs, BLADE_SESSION_STATE_READY);
+                                       
+                                       blade_session_read_unlock(bs); // unlock the session we expect to be locked during the callback to ensure we can finish attaching
+                                       break;
+                               }
                        case BLADE_CONNECTION_STATE_DETACH:
                                // @todo detach from session if this connection is attached
                                blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT);
index 8ca224cf9fb3578176d570c533d2b4ac207d0521..429fdba344eef2cc335bb9d901d2e8a6f47a8253 100644 (file)
@@ -895,35 +895,104 @@ blade_connection_state_hook_t blade_transport_wss_on_state_connect_outbound(blad
 
 blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
 {
+       blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+       blade_transport_wss_t *bt_wss = NULL;
+       cJSON *json = NULL;
+       cJSON *params = NULL;
+       blade_session_t *bs = NULL;
+       blade_handle_t *bh = NULL;
+       const char *jsonrpc = NULL;
+       const char *method = NULL;
+       const char *id = NULL;
+       const char *sid = NULL;
+       ks_time_t timeout;
+
        ks_assert(bc);
 
+       bh = blade_connection_handle_get(bc);
+       ks_assert(bh);
+
        ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
 
-       // @todo block while reading expected message with blade_transport_wss_read(bt_wss, json)
+       bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
 
-       // @todo check if expected message is a request by confirming it has a method field (along with json field validation, stay compliant with jsonrpc)
+       // @todo very temporary, really need monotonic clock and get timeout delay and sleep delay from config
+       timeout = ks_time_now() + (5 * KS_USEC_PER_SEC);
+       while (blade_transport_wss_read(bt_wss, &json) == KS_STATUS_SUCCESS) {
+               if (json) break;
+               ks_sleep(250);
+               if (ks_time_now() >= timeout) break;
+       }
 
-       // @todo validate method is "blade.session.attach"
+       if (!json) {
+               // @todo error logging
+               ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+               goto done;
+       }
 
-       // @todo validate parameters "session-id" and "session-token" must both be present or omitted, validate both are strings and valid uuid format
-       // if both are omitted, params may be omitted entirely by jsonrpc spec
+       // @todo validation wrapper for request and response/error to confirm jsonrpc and provide enum for output as to which it is
+       jsonrpc = cJSON_GetObjectCstr(json, "jsonrpc"); // @todo check for definitions of these keys and fixed values
+       if (!jsonrpc || strcmp(jsonrpc, "2.0")) {
+               // @todo error logging
+               ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+               goto done;
+       }
 
-       // @todo if session-id is provided, lookup existing session within the blade_handle_t
+       id = cJSON_GetObjectCstr(json, "id"); // @todo switch to number if we are not using a uuid for message id
+       if (!id) {
+               // @todo error logging
+               ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+               goto done;
+       }
 
-       // @todo if the session exists, verify the session-token, if it matches then use this session
+       method = cJSON_GetObjectCstr(json, "method");
+       if (!method || strcasecmp(method, "blade.session.attach")) {
+               // @todo error logging
+               ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+               goto done;
+       }
+
+       params = cJSON_GetObjectItem(json, "params");
+       if (params) {
+               sid = cJSON_GetObjectCstr(params, "session-id");
+               if (sid) {
+                       // @todo validate uuid format by parsing, not currently available in uuid functions
+                       ks_log(KS_LOG_DEBUG, "Session Requested: %s\n", sid);
+               }
+       }
+
+       if (sid) {
+               bs = blade_handle_sessions_get(bh, sid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
+               if (bs) {
+                       ks_log(KS_LOG_DEBUG, "Session Located: %s\n", blade_session_id_get(bs));
+               }
+       }
+       
+       if (!bs) {
+               blade_session_create(&bs, bh);
+               ks_assert(bs);
 
-       // @todo if the session-token does not match, or the session does not exist, or the session-id and session-token are not provided then create a new session
+               ks_log(KS_LOG_DEBUG, "Session Created: %s\n", blade_session_id_get(bs));
 
-       // @todo once session is established, associate it to the connection
+               blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise
 
-       // @todo if anything fails, return HOOK_DISCONNECT, otherwise return HOOK_SUCCESS which will continue the rest of the session attaching process
-       // which is to grab the expected session off the connection and attach the connection to the connection list on the session, start the session thread if
-       // it hasn't already been started, and set the session state to CONNECT or ATTACH... discuss with tony, finalize session state machine regarding multiple
-       // connections attempting to attach at the same time to the session and changing the session state, may need to queue pending connections to the session
-       // and process them from within the session state machine thread
+               if (blade_session_startup(bs) != KS_STATUS_SUCCESS) {
+                       blade_session_read_unlock(bs);
+                       blade_session_destroy(&bs);
+                       ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+                       goto done;
+               }
+               blade_handle_sessions_add(bs);
+       }
+
+       blade_connection_session_set(bc, blade_session_id_get(bs));
        
-       ks_sleep_ms(1000); // @todo temporary testing, remove this and return success once negotiations are done
-       return BLADE_CONNECTION_STATE_HOOK_BYPASS;
+ done:
+       // @note the state machine expects if we return SUCCESS, that the session assigned to the connection will be read locked to ensure that the state
+       // machine can finish attaching the session, if you BYPASS then you can handle everything here in the callback, but this should be fairly standard
+       // behaviour to simply go as far as assigning a session to the connection and let the system handle the rest
+       if (json) cJSON_Delete(json);
+       return ret;
 }
 
 blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
@@ -961,6 +1030,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connecti
 
        ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
 
+       ks_sleep(1000);
        return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 }
 
@@ -970,6 +1040,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connectio
 
        ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
 
+       ks_sleep(1000);
        return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
 }
 
index 950a5108389a9e513700b4137da01b46fe982d6c..fbfd227254abd49befa83d9f93195eaa9ff72a0d 100644 (file)
@@ -42,6 +42,7 @@ struct blade_session_s {
        blade_session_state_t state;
 
        const char *id;
+       ks_rwl_t *lock;
        list_t connections;
        
        ks_q_t *sending;
@@ -69,6 +70,9 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle
     ks_uuid(&id);
        bs->id = ks_uuid_str(pool, &id);
 
+    ks_rwl_create(&bs->lock, pool);
+       ks_assert(bs->lock);
+       
        list_init(&bs->connections);
        ks_q_create(&bs->sending, pool, 0);
        ks_assert(bs->sending);
@@ -95,6 +99,8 @@ KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP)
        ks_q_destroy(&bs->receiving);
        ks_q_destroy(&bs->sending);
 
+       ks_rwl_destroy(&bs->lock);
+
     ks_pool_free(bs->pool, &bs->id);
 
        ks_pool_free(bs->pool, bsP);
@@ -138,9 +144,24 @@ KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs)
        while (ks_q_trypop(bs->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
        while (ks_q_trypop(bs->receiving, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
 
+       list_iterator_start(&bs->connections);
+       while (list_iterator_hasnext(&bs->connections)) {
+               const char *id = (const char *)list_iterator_next(&bs->connections);
+               ks_pool_free(bs->pool, &id);
+       }
+       list_iterator_stop(&bs->connections);
+       list_clear(&bs->connections);
+       
        return KS_STATUS_SUCCESS;
 }
 
+KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs)
+{
+       ks_assert(bs);
+
+       return bs->handle;
+}
+
 KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs)
 {
        ks_assert(bs);
@@ -157,6 +178,43 @@ KS_DECLARE(void) blade_session_id_set(blade_session_t *bs, const char *id)
        bs->id = ks_pstrdup(bs->pool, id);
 }
 
+KS_DECLARE(ks_status_t) blade_session_read_lock(blade_session_t *bs, ks_bool_t block)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(bs);
+
+       if (block) ret = ks_rwl_read_lock(bs->lock);
+       else ret = ks_rwl_try_read_lock(bs->lock);
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_session_read_unlock(blade_session_t *bs)
+{
+       ks_assert(bs);
+
+       return ks_rwl_read_unlock(bs->lock);
+}
+
+KS_DECLARE(ks_status_t) blade_session_write_lock(blade_session_t *bs, ks_bool_t block)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(bs);
+
+       if (block) ret = ks_rwl_write_lock(bs->lock);
+       else ret = ks_rwl_try_write_lock(bs->lock);
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_session_write_unlock(blade_session_t *bs)
+{
+       ks_assert(bs);
+
+       return ks_rwl_write_unlock(bs->lock);
+}
+
+
 KS_DECLARE(void) blade_session_state_set(blade_session_t *bs, blade_session_state_t state)
 {
        ks_assert(bs);
@@ -172,6 +230,41 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs)
                blade_session_state_set(bs, BLADE_SESSION_STATE_HANGUP);
 }
 
+KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       const char *cid = NULL;
+
+       ks_assert(bs);
+
+       cid = ks_pstrdup(bs->pool, id);
+       ks_assert(cid);
+       
+       list_append(&bs->connections, cid);
+
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, const char *id)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       uint32_t size = 0;
+
+       ks_assert(bs);
+
+       size = list_size(&bs->connections);
+       for (uint32_t i = 0; i < size; ++i) {
+               const char *cid = (const char *)list_get_at(&bs->connections, i);
+               if (!strcasecmp(cid, id)) {
+                       list_delete_at(&bs->connections, i);
+                       ks_pool_free(bs->pool, &cid);
+                       break;
+               }
+       }
+
+       return ret;
+}
+
 ks_status_t blade_session_connections_choose(blade_session_t *bs, cJSON *json, blade_connection_t **bcP)
 {
        blade_connection_t *bc = NULL;
@@ -194,6 +287,7 @@ ks_status_t blade_session_connections_choose(blade_session_t *bs, cJSON *json, b
                // @todo error logging... this shouldn't happen
                return KS_STATUS_FAIL;
        }
+       // @todo make sure the connection is in the READY state before allowing it to be choosen, just in case it is detaching or not quite fully attached
 
        *bcP = bc;
 
@@ -272,6 +366,12 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
                        // @todo detach from session if this connection is attached
                        blade_session_state_set(bs, BLADE_SESSION_STATE_DESTROY);
                        break;
+               case BLADE_SESSION_STATE_CONNECT:
+                       break;
+               case BLADE_SESSION_STATE_ATTACH:
+                       break;
+               case BLADE_SESSION_STATE_DETACH:
+                       break;
                case BLADE_SESSION_STATE_READY:
                        // @todo pop from session receiving queue and pass to blade_protocol_process()
                        break;
index 6ec4af3a292fb80cec785eff1ad5cdd88c284d24..5977c091d7c32558802bae982b70b2ac1eccbf3f 100644 (file)
@@ -419,6 +419,60 @@ KS_DECLARE(ks_status_t) blade_handle_connections_remove(blade_connection_t *bc)
 }
 
 
+KS_DECLARE(blade_session_t *) blade_handle_sessions_get(blade_handle_t *bh, const char *sid)
+{
+       blade_session_t *bs = NULL;
+
+       ks_assert(bs);
+       ks_assert(sid);
+
+       ks_hash_read_lock(bh->sessions);
+       bs = ks_hash_search(bh->sessions, (void *)sid, KS_UNLOCKED);
+       if (bs && blade_session_read_lock(bs, KS_FALSE) != KS_STATUS_SUCCESS) bs = NULL;
+       ks_hash_read_unlock(bh->sessions);
+
+       return bs;
+}
+
+KS_DECLARE(ks_status_t) blade_handle_sessions_add(blade_session_t *bs)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       blade_handle_t *bh = NULL;
+
+       ks_assert(bs);
+
+       bh = blade_session_handle_get(bs);
+       ks_assert(bh);
+
+       ks_hash_write_lock(bh->sessions);
+       ret = ks_hash_insert(bh->sessions, (void *)blade_session_id_get(bs), bs);
+       ks_hash_write_unlock(bh->sessions);
+
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       blade_handle_t *bh = NULL;
+
+       ks_assert(bs);
+
+       bh = blade_session_handle_get(bs);
+       ks_assert(bh);
+
+       blade_session_write_lock(bs, KS_TRUE);
+
+       ks_hash_write_lock(bh->sessions);
+       if (ks_hash_remove(bh->sessions, (void *)blade_session_id_get(bs)) == NULL) ret = KS_STATUS_FAIL;
+       ks_hash_write_unlock(bh->sessions);
+
+       blade_session_write_unlock(bs);
+
+       return ret;
+}
+
+
 
 KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh)
 {
index 5d149bc445490d02796827aeed4df83bcd79fc6e..2242197861e4feb5c9fe8577c65409b6f92962db 100644 (file)
@@ -57,6 +57,8 @@ KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc);
 KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target);
 KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json);
 KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, cJSON **json);
+KS_DECLARE(const char *) blade_connection_session_get(blade_connection_t *bc);
+KS_DECLARE(void) blade_connection_session_set(blade_connection_t *bc, const char *id);
 KS_END_EXTERN_C
 
 #endif
index 975d10bbfb2e3d84e337de99467ed6fbf88daba7..c6d376acd96c9ee9384331fcb05a7b304600faf7 100644 (file)
 KS_BEGIN_EXTERN_C
 KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh);
 KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP);
-KS_DECLARE(ks_status_t) blade_sesssion_startup(blade_session_t *bs);
+KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs);
 KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs);
+KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs);
 KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs);
 KS_DECLARE(void) blade_session_id_set(blade_session_t *bs, const char *id);
+KS_DECLARE(ks_status_t) blade_session_read_lock(blade_session_t *bs, ks_bool_t block);
+KS_DECLARE(ks_status_t) blade_session_read_unlock(blade_session_t *bs);
+KS_DECLARE(ks_status_t) blade_session_write_lock(blade_session_t *bs, ks_bool_t block);
+KS_DECLARE(ks_status_t) blade_session_write_unlock(blade_session_t *bs);
 KS_DECLARE(void) blade_session_state_set(blade_session_t *bs, blade_session_state_t state);
 KS_DECLARE(void) blade_session_hangup(blade_session_t *bs);
+KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id);
+KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, const char *id);
 KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json);
 KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json);
 KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json);
index f6de9665bacb733760ff439228c23b309363f95f..7b87fa84f3b8b964ad8389c18ba670a402f31c3c 100644 (file)
@@ -51,10 +51,15 @@ KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh);
 KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_module_t *bm, const char *name, blade_transport_callbacks_t *callbacks);
 KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, const char *name);
 KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target);
+
 KS_DECLARE(blade_connection_t *) blade_handle_connections_get(blade_handle_t *bh, const char *cid);
 KS_DECLARE(ks_status_t) blade_handle_connections_add(blade_connection_t *bc);
 KS_DECLARE(ks_status_t) blade_handle_connections_remove(blade_connection_t *bc);
                                                
+KS_DECLARE(blade_session_t *) blade_handle_sessions_get(blade_handle_t *bh, const char *sid);
+KS_DECLARE(ks_status_t) blade_handle_sessions_add(blade_session_t *bs);
+KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs);
+                                               
 KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh);
 KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length);
 KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
index 5531557dd676f9daba78967c9b4eb8fcf3ed8dc8..2442d88a062e1a6f9396a27048b54e9a1e10b54a 100644 (file)
@@ -92,6 +92,7 @@ typedef enum {
        BLADE_SESSION_STATE_NONE,
        BLADE_SESSION_STATE_DESTROY,
        BLADE_SESSION_STATE_HANGUP,
+       BLADE_SESSION_STATE_CONNECT,
        BLADE_SESSION_STATE_ATTACH,
        BLADE_SESSION_STATE_DETACH,
        BLADE_SESSION_STATE_READY,