ks_rwl_t *lock;
ks_q_t *sending;
+
+ const char *session;
};
void *blade_connection_state_thread(ks_thread_t *thread, void *data);
bc->shutdown = KS_FALSE;
}
+ if (bc->session) ks_pool_free(bc->pool, &bc->session);
+
while (ks_q_trypop(bc->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
return KS_STATUS_SUCCESS;
return ks_q_trypop(bc->sending, (void **)json);
}
+KS_DECLARE(const char *) blade_connection_session_get(blade_connection_t *bc)
+{
+ ks_assert(bc);
+
+ return bc->session;
+}
+
+KS_DECLARE(void) blade_connection_session_set(blade_connection_t *bc, const char *id)
+{
+ ks_assert(bc);
+
+ if (bc->session) ks_pool_free(bc->pool, &bc->session);
+ bc->session = ks_pstrdup(bc->pool, id);
+}
void *blade_connection_state_thread(ks_thread_t *thread, void *data)
{
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_ATTACH);
break;
case BLADE_CONNECTION_STATE_ATTACH:
- // @todo receive message with nullable session id for reconnect and some sort of secure token for a reconnect challenge?
- // determine how much of session management is handled here... do we process these session negotiation messages without
- // passing it up to the application layer? or does the application layer give back a session and build the response?
- blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY);
- break;
+ {
+ blade_session_t *bs = blade_handle_sessions_get(bc->handle, bc->session);
+ ks_assert(bs); // should not happen because bs should still be locked
+
+ blade_session_connections_add(bs, bc->id);
+
+ blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY);
+ blade_session_state_set(bs, BLADE_SESSION_STATE_READY);
+
+ blade_session_read_unlock(bs); // unlock the session we expect to be locked during the callback to ensure we can finish attaching
+ break;
+ }
case BLADE_CONNECTION_STATE_DETACH:
// @todo detach from session if this connection is attached
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT);
blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
+ blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+ blade_transport_wss_t *bt_wss = NULL;
+ cJSON *json = NULL;
+ cJSON *params = NULL;
+ blade_session_t *bs = NULL;
+ blade_handle_t *bh = NULL;
+ const char *jsonrpc = NULL;
+ const char *method = NULL;
+ const char *id = NULL;
+ const char *sid = NULL;
+ ks_time_t timeout;
+
ks_assert(bc);
+ bh = blade_connection_handle_get(bc);
+ ks_assert(bh);
+
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
- // @todo block while reading expected message with blade_transport_wss_read(bt_wss, json)
+ bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
- // @todo check if expected message is a request by confirming it has a method field (along with json field validation, stay compliant with jsonrpc)
+ // @todo very temporary, really need monotonic clock and get timeout delay and sleep delay from config
+ timeout = ks_time_now() + (5 * KS_USEC_PER_SEC);
+ while (blade_transport_wss_read(bt_wss, &json) == KS_STATUS_SUCCESS) {
+ if (json) break;
+ ks_sleep(250);
+ if (ks_time_now() >= timeout) break;
+ }
- // @todo validate method is "blade.session.attach"
+ if (!json) {
+ // @todo error logging
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
- // @todo validate parameters "session-id" and "session-token" must both be present or omitted, validate both are strings and valid uuid format
- // if both are omitted, params may be omitted entirely by jsonrpc spec
+ // @todo validation wrapper for request and response/error to confirm jsonrpc and provide enum for output as to which it is
+ jsonrpc = cJSON_GetObjectCstr(json, "jsonrpc"); // @todo check for definitions of these keys and fixed values
+ if (!jsonrpc || strcmp(jsonrpc, "2.0")) {
+ // @todo error logging
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
- // @todo if session-id is provided, lookup existing session within the blade_handle_t
+ id = cJSON_GetObjectCstr(json, "id"); // @todo switch to number if we are not using a uuid for message id
+ if (!id) {
+ // @todo error logging
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
- // @todo if the session exists, verify the session-token, if it matches then use this session
+ method = cJSON_GetObjectCstr(json, "method");
+ if (!method || strcasecmp(method, "blade.session.attach")) {
+ // @todo error logging
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
+
+ params = cJSON_GetObjectItem(json, "params");
+ if (params) {
+ sid = cJSON_GetObjectCstr(params, "session-id");
+ if (sid) {
+ // @todo validate uuid format by parsing, not currently available in uuid functions
+ ks_log(KS_LOG_DEBUG, "Session Requested: %s\n", sid);
+ }
+ }
+
+ if (sid) {
+ bs = blade_handle_sessions_get(bh, sid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
+ if (bs) {
+ ks_log(KS_LOG_DEBUG, "Session Located: %s\n", blade_session_id_get(bs));
+ }
+ }
+
+ if (!bs) {
+ blade_session_create(&bs, bh);
+ ks_assert(bs);
- // @todo if the session-token does not match, or the session does not exist, or the session-id and session-token are not provided then create a new session
+ ks_log(KS_LOG_DEBUG, "Session Created: %s\n", blade_session_id_get(bs));
- // @todo once session is established, associate it to the connection
+ blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise
- // @todo if anything fails, return HOOK_DISCONNECT, otherwise return HOOK_SUCCESS which will continue the rest of the session attaching process
- // which is to grab the expected session off the connection and attach the connection to the connection list on the session, start the session thread if
- // it hasn't already been started, and set the session state to CONNECT or ATTACH... discuss with tony, finalize session state machine regarding multiple
- // connections attempting to attach at the same time to the session and changing the session state, may need to queue pending connections to the session
- // and process them from within the session state machine thread
+ if (blade_session_startup(bs) != KS_STATUS_SUCCESS) {
+ blade_session_read_unlock(bs);
+ blade_session_destroy(&bs);
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
+ blade_handle_sessions_add(bs);
+ }
+
+ blade_connection_session_set(bc, blade_session_id_get(bs));
- ks_sleep_ms(1000); // @todo temporary testing, remove this and return success once negotiations are done
- return BLADE_CONNECTION_STATE_HOOK_BYPASS;
+ done:
+ // @note the state machine expects if we return SUCCESS, that the session assigned to the connection will be read locked to ensure that the state
+ // machine can finish attaching the session, if you BYPASS then you can handle everything here in the callback, but this should be fairly standard
+ // behaviour to simply go as far as assigning a session to the connection and let the system handle the rest
+ if (json) cJSON_Delete(json);
+ return ret;
}
blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+ ks_sleep(1000);
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+ ks_sleep(1000);
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
blade_session_state_t state;
const char *id;
+ ks_rwl_t *lock;
list_t connections;
ks_q_t *sending;
ks_uuid(&id);
bs->id = ks_uuid_str(pool, &id);
+ ks_rwl_create(&bs->lock, pool);
+ ks_assert(bs->lock);
+
list_init(&bs->connections);
ks_q_create(&bs->sending, pool, 0);
ks_assert(bs->sending);
ks_q_destroy(&bs->receiving);
ks_q_destroy(&bs->sending);
+ ks_rwl_destroy(&bs->lock);
+
ks_pool_free(bs->pool, &bs->id);
ks_pool_free(bs->pool, bsP);
while (ks_q_trypop(bs->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
while (ks_q_trypop(bs->receiving, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
+ list_iterator_start(&bs->connections);
+ while (list_iterator_hasnext(&bs->connections)) {
+ const char *id = (const char *)list_iterator_next(&bs->connections);
+ ks_pool_free(bs->pool, &id);
+ }
+ list_iterator_stop(&bs->connections);
+ list_clear(&bs->connections);
+
return KS_STATUS_SUCCESS;
}
+KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs)
+{
+ ks_assert(bs);
+
+ return bs->handle;
+}
+
KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs)
{
ks_assert(bs);
bs->id = ks_pstrdup(bs->pool, id);
}
+KS_DECLARE(ks_status_t) blade_session_read_lock(blade_session_t *bs, ks_bool_t block)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(bs);
+
+ if (block) ret = ks_rwl_read_lock(bs->lock);
+ else ret = ks_rwl_try_read_lock(bs->lock);
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_session_read_unlock(blade_session_t *bs)
+{
+ ks_assert(bs);
+
+ return ks_rwl_read_unlock(bs->lock);
+}
+
+KS_DECLARE(ks_status_t) blade_session_write_lock(blade_session_t *bs, ks_bool_t block)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(bs);
+
+ if (block) ret = ks_rwl_write_lock(bs->lock);
+ else ret = ks_rwl_try_write_lock(bs->lock);
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_session_write_unlock(blade_session_t *bs)
+{
+ ks_assert(bs);
+
+ return ks_rwl_write_unlock(bs->lock);
+}
+
+
KS_DECLARE(void) blade_session_state_set(blade_session_t *bs, blade_session_state_t state)
{
ks_assert(bs);
blade_session_state_set(bs, BLADE_SESSION_STATE_HANGUP);
}
+KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ const char *cid = NULL;
+
+ ks_assert(bs);
+
+ cid = ks_pstrdup(bs->pool, id);
+ ks_assert(cid);
+
+ list_append(&bs->connections, cid);
+
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, const char *id)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ uint32_t size = 0;
+
+ ks_assert(bs);
+
+ size = list_size(&bs->connections);
+ for (uint32_t i = 0; i < size; ++i) {
+ const char *cid = (const char *)list_get_at(&bs->connections, i);
+ if (!strcasecmp(cid, id)) {
+ list_delete_at(&bs->connections, i);
+ ks_pool_free(bs->pool, &cid);
+ break;
+ }
+ }
+
+ return ret;
+}
+
ks_status_t blade_session_connections_choose(blade_session_t *bs, cJSON *json, blade_connection_t **bcP)
{
blade_connection_t *bc = NULL;
// @todo error logging... this shouldn't happen
return KS_STATUS_FAIL;
}
+ // @todo make sure the connection is in the READY state before allowing it to be choosen, just in case it is detaching or not quite fully attached
*bcP = bc;
// @todo detach from session if this connection is attached
blade_session_state_set(bs, BLADE_SESSION_STATE_DESTROY);
break;
+ case BLADE_SESSION_STATE_CONNECT:
+ break;
+ case BLADE_SESSION_STATE_ATTACH:
+ break;
+ case BLADE_SESSION_STATE_DETACH:
+ break;
case BLADE_SESSION_STATE_READY:
// @todo pop from session receiving queue and pass to blade_protocol_process()
break;
}
+KS_DECLARE(blade_session_t *) blade_handle_sessions_get(blade_handle_t *bh, const char *sid)
+{
+ blade_session_t *bs = NULL;
+
+ ks_assert(bs);
+ ks_assert(sid);
+
+ ks_hash_read_lock(bh->sessions);
+ bs = ks_hash_search(bh->sessions, (void *)sid, KS_UNLOCKED);
+ if (bs && blade_session_read_lock(bs, KS_FALSE) != KS_STATUS_SUCCESS) bs = NULL;
+ ks_hash_read_unlock(bh->sessions);
+
+ return bs;
+}
+
+KS_DECLARE(ks_status_t) blade_handle_sessions_add(blade_session_t *bs)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ blade_handle_t *bh = NULL;
+
+ ks_assert(bs);
+
+ bh = blade_session_handle_get(bs);
+ ks_assert(bh);
+
+ ks_hash_write_lock(bh->sessions);
+ ret = ks_hash_insert(bh->sessions, (void *)blade_session_id_get(bs), bs);
+ ks_hash_write_unlock(bh->sessions);
+
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs)
+{
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ blade_handle_t *bh = NULL;
+
+ ks_assert(bs);
+
+ bh = blade_session_handle_get(bs);
+ ks_assert(bh);
+
+ blade_session_write_lock(bs, KS_TRUE);
+
+ ks_hash_write_lock(bh->sessions);
+ if (ks_hash_remove(bh->sessions, (void *)blade_session_id_get(bs)) == NULL) ret = KS_STATUS_FAIL;
+ ks_hash_write_unlock(bh->sessions);
+
+ blade_session_write_unlock(bs);
+
+ return ret;
+}
+
+
KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh)
{
KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target);
KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json);
KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, cJSON **json);
+KS_DECLARE(const char *) blade_connection_session_get(blade_connection_t *bc);
+KS_DECLARE(void) blade_connection_session_set(blade_connection_t *bc, const char *id);
KS_END_EXTERN_C
#endif
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP);
-KS_DECLARE(ks_status_t) blade_sesssion_startup(blade_session_t *bs);
+KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs);
+KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs);
KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs);
KS_DECLARE(void) blade_session_id_set(blade_session_t *bs, const char *id);
+KS_DECLARE(ks_status_t) blade_session_read_lock(blade_session_t *bs, ks_bool_t block);
+KS_DECLARE(ks_status_t) blade_session_read_unlock(blade_session_t *bs);
+KS_DECLARE(ks_status_t) blade_session_write_lock(blade_session_t *bs, ks_bool_t block);
+KS_DECLARE(ks_status_t) blade_session_write_unlock(blade_session_t *bs);
KS_DECLARE(void) blade_session_state_set(blade_session_t *bs, blade_session_state_t state);
KS_DECLARE(void) blade_session_hangup(blade_session_t *bs);
+KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id);
+KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, const char *id);
KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json);
KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json);
KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json);
KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_module_t *bm, const char *name, blade_transport_callbacks_t *callbacks);
KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, const char *name);
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target);
+
KS_DECLARE(blade_connection_t *) blade_handle_connections_get(blade_handle_t *bh, const char *cid);
KS_DECLARE(ks_status_t) blade_handle_connections_add(blade_connection_t *bc);
KS_DECLARE(ks_status_t) blade_handle_connections_remove(blade_connection_t *bc);
+KS_DECLARE(blade_session_t *) blade_handle_sessions_get(blade_handle_t *bh, const char *sid);
+KS_DECLARE(ks_status_t) blade_handle_sessions_add(blade_session_t *bs);
+KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs);
+
KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length);
KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
BLADE_SESSION_STATE_NONE,
BLADE_SESSION_STATE_DESTROY,
BLADE_SESSION_STATE_HANGUP,
+ BLADE_SESSION_STATE_CONNECT,
BLADE_SESSION_STATE_ATTACH,
BLADE_SESSION_STATE_DETACH,
BLADE_SESSION_STATE_READY,