struct pollfd *listeners_poll;
int32_t listeners_count;
- list_t connected;
- ks_q_t *disconnected;
+ list_t connected; // @todo consider keeping this only as the list of connection id's, since the handle retains the pointer lookup
};
struct blade_transport_wss_s {
ks_pool_t *pool;
ks_socket_t sock;
+ const char *session_id;
};
ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_module_wss_t *bm_wss, ks_socket_t sock);
ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP);
-ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target);
+ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id);
blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target);
ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, cJSON *json);
-ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock);
+ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id);
ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP);
bm_wss->transport_callbacks = &g_transport_wss_callbacks;
list_init(&bm_wss->connected);
- ks_q_create(&bm_wss->disconnected, bm_wss->pool, 0);
- ks_assert(bm_wss->disconnected);
*bm_wssP = bm_wss;
+ ks_log(KS_LOG_DEBUG, "Created\n");
+
return KS_STATUS_SUCCESS;
}
blade_module_destroy(&bm_wss->module);
list_destroy(&bm_wss->connected);
- ks_q_destroy(&bm_wss->disconnected);
ks_pool_free(bm_wss->pool, bm_wssP);
+ ks_log(KS_LOG_DEBUG, "Destroyed\n");
+
return KS_STATUS_SUCCESS;
}
*bmP = bm_wss->module;
+ ks_log(KS_LOG_DEBUG, "Loaded\n");
+
return KS_STATUS_SUCCESS;
}
blade_module_wss_destroy(&bm_wss);
+ ks_log(KS_LOG_DEBUG, "Unloaded\n");
+
return KS_STATUS_SUCCESS;
}
-ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock)
+ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id)
{
blade_transport_wss_init_t *bt_wssi = NULL;
bt_wssi->module = bm_wss;
bt_wssi->pool = bm_wss->pool;
bt_wssi->sock = sock;
+ if (session_id) bt_wssi->session_id = ks_pstrdup(bt_wssi->pool, session_id);
*bt_wssiP = bt_wssi;
+ ks_log(KS_LOG_DEBUG, "Created\n");
+
return KS_STATUS_SUCCESS;
}
bt_wssi = *bt_wssiP;
+ if (bt_wssi->session_id) ks_pool_free(bt_wssi->pool, &bt_wssi->session_id);
+
ks_pool_free(bt_wssi->pool, bt_wssiP);
+ ks_log(KS_LOG_DEBUG, "Destroyed\n");
+
return KS_STATUS_SUCCESS;
}
bm_wss->config_wss_endpoints_backlog = config_wss_endpoints_backlog;
//bm_wss->config_wss_ssl = config_wss_ssl;
+ ks_log(KS_LOG_DEBUG, "Configured\n");
+
return KS_STATUS_SUCCESS;
}
blade_handle_transport_register(bm_wss->handle, bm, BLADE_MODULE_WSS_TRANSPORT_NAME, bm_wss->transport_callbacks);
+ ks_log(KS_LOG_DEBUG, "Started\n");
+
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm)
{
blade_module_wss_t *bm_wss = NULL;
- blade_transport_wss_t *bt_wss = NULL;
blade_connection_t *bc = NULL;
ks_assert(bm);
bm_wss->listeners_count = 0;
if (bm_wss->listeners_poll) ks_pool_free(bm_wss->pool, &bm_wss->listeners_poll);
- // @todo connections should be gracefully disconnected so that they detach from sessions properly
- // which means this should occur before the listeners thread is terminated, which requires that
- // the listener sockets be made inactive (or closed) to stop accepting while shutting down
- while (ks_q_trypop(bm_wss->disconnected, (void **)&bc) == KS_STATUS_SUCCESS) ;
- list_iterator_start(&bm_wss->connected);
- while (list_iterator_hasnext(&bm_wss->connected)) {
- bc = (blade_connection_t *)list_iterator_next(&bm_wss->connected);
- bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
-
- blade_connection_destroy(&bc);
- blade_transport_wss_destroy(&bt_wss);
+ if (list_size(&bm_wss->connected) > 0) {
+ // this approach to shutdown is cleaner, ensures connections will detach from sessions and be destroyed all in the same places
+ list_iterator_start(&bm_wss->connected);
+ while (list_iterator_hasnext(&bm_wss->connected)) {
+ bc = (blade_connection_t *)list_iterator_next(&bm_wss->connected);
+ blade_connection_disconnect(bc);
+ }
+ list_iterator_stop(&bm_wss->connected);
+ while (list_size(&bm_wss->connected) > 0) ks_sleep_ms(100);
}
- list_iterator_stop(&bm_wss->connected);
- list_clear(&bm_wss->connected);
+
+ ks_log(KS_LOG_DEBUG, "Stopped\n");
return KS_STATUS_SUCCESS;
}
bm_wss->listeners_poll[listener_index].fd = listener;
bm_wss->listeners_poll[listener_index].events = POLLIN | POLLERR;
+ ks_log(KS_LOG_DEBUG, "Bound %s on port %d at index %d\n", ks_addr_get_host(addr), ks_addr_get_port(addr), listener_index);
+
done:
if (ret != KS_STATUS_SUCCESS) {
if (listener != KS_SOCK_INVALID) {
{
blade_module_wss_t *bm_wss = NULL;
blade_transport_wss_init_t *bt_wss_init = NULL;
- blade_transport_wss_t *bt_wss = NULL;
blade_connection_t *bc = NULL;
ks_assert(thread);
if (bm_wss->listeners_poll[index].revents & POLLERR) {
// @todo: error handling, just skip the listener for now, it might recover, could skip X times before closing?
+ ks_log(KS_LOG_DEBUG, "POLLERR on index %d\n", index);
continue;
}
if (!(bm_wss->listeners_poll[index].revents & POLLIN)) continue;
if ((sock = accept(bm_wss->listeners_poll[index].fd, NULL, NULL)) == KS_SOCK_INVALID) {
// @todo: error handling, just skip the socket for now as most causes are because remote side became unreachable
+ ks_log(KS_LOG_DEBUG, "Accept failed on index %d\n", index);
continue;
}
- ks_log(KS_LOG_DEBUG, "Socket Accepted\n");
+ // @todo getsockname and getpeername (getpeername can be skipped if passing to accept instead)
+
+ ks_log(KS_LOG_DEBUG, "Socket accepted\n", index);
- blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock);
+ blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock, NULL);
ks_assert(bt_wss_init);
blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks);
blade_connection_read_lock(bc, KS_TRUE);
if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_INBOUND) != KS_STATUS_SUCCESS) {
+ ks_log(KS_LOG_DEBUG, "Connection (%s) startup failed\n", blade_connection_id_get(bc));
blade_connection_destroy(&bc);
blade_transport_wss_init_destroy(&bt_wss_init);
ks_socket_close(&sock);
continue;
}
+ ks_log(KS_LOG_DEBUG, "Connection (%s) started\n", blade_connection_id_get(bc));
+
blade_handle_connections_add(bc);
list_append(&bm_wss->connected, bc);
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW);
blade_connection_read_unlock(bc);
}
}
-
- while (ks_q_trypop(bm_wss->disconnected, (void **)&bc) == KS_STATUS_SUCCESS) {
- bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
- bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
-
- blade_handle_connections_remove(bc);
- list_delete(&bm_wss->connected, bc);
-
- if (bt_wss_init) blade_transport_wss_init_destroy(&bt_wss_init);
- blade_connection_destroy(&bc);
- if (bt_wss) blade_transport_wss_destroy(&bt_wss);
- }
}
ks_log(KS_LOG_DEBUG, "Stopped\n");
*bt_wssP = bt_wss;
+ ks_log(KS_LOG_DEBUG, "Created\n");
+
return KS_STATUS_SUCCESS;
}
ks_pool_free(bt_wss->pool, bt_wssP);
+ ks_log(KS_LOG_DEBUG, "Destroyed\n");
+
return KS_STATUS_SUCCESS;
}
-ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target)
+ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_module_wss_t *bm_wss = NULL;
if (!ip) {
// @todo: temporary, this should fall back on DNS SRV or whatever else can turn "a@b.com" into an ip (and port?) to connect to
// also need to deal with hostname lookup, so identities with wss transport need to have a host parameter that is an IP for the moment
+ ks_log(KS_LOG_DEBUG, "No host provided\n");
ret = KS_STATUS_FAIL;
goto done;
}
ks_size_t len = strlen(ip);
if (len <= 3) {
+ ks_log(KS_LOG_DEBUG, "Invalid host provided\n");
ret = KS_STATUS_FAIL;
goto done;
}
int p = atoi(portstr);
if (p > 0 && p <= UINT16_MAX) port = p;
}
-
+
+ ks_log(KS_LOG_DEBUG, "Connecting to %s on port %d\n", ip, port);
+
ks_addr_set(&addr, ip, port, family);
if ((sock = ks_socket_connect(SOCK_STREAM, IPPROTO_TCP, &addr)) == KS_SOCK_INVALID) {
// @todo: error handling, just fail for now as most causes are because remote side became unreachable
+ ks_log(KS_LOG_DEBUG, "Connect failed\n");
ret = KS_STATUS_FAIL;
goto done;
}
- ks_log(KS_LOG_DEBUG, "Socket Connected\n");
+ ks_log(KS_LOG_DEBUG, "Socket connected\n");
- blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock);
+ blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock, session_id);
ks_assert(bt_wss_init);
blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks);
ks_assert(bc);
if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_OUTBOUND) != KS_STATUS_SUCCESS) {
+ ks_log(KS_LOG_DEBUG, "Connection (%s) startup failed\n", blade_connection_id_get(bc));
blade_connection_destroy(&bc);
blade_transport_wss_init_destroy(&bt_wss_init);
ks_socket_close(&sock);
ret = KS_STATUS_FAIL;
goto done;
}
+ ks_log(KS_LOG_DEBUG, "Connection (%s) started\n", blade_connection_id_get(bc));
// @todo make sure it's sensible to be mixing outbound and inbound connections in the same list, but this allows entering the destruction pipeline
// for module shutdown, disconnects and errors without special considerations
+ blade_handle_connections_add(bc);
list_append(&bm_wss->connected, bc);
- *bcP = bc;
-
+
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW);
+ *bcP = bc;
+
done:
return ret;
}
char *json_str = cJSON_PrintUnformatted(json);
ks_size_t json_str_len = 0;
if (!json_str) {
- // @todo error logging
+ ks_log(KS_LOG_DEBUG, "Failed to generate json string\n");
ret = KS_STATUS_FAIL;
goto done;
}
- json_str_len = strlen(json_str) + 1; // @todo determine if WSOC_TEXT null terminates when read_frame is called, or if it's safe to include like this
+ // @todo determine if WSOC_TEXT null terminates when read_frame is called, or if it's safe to include like this
+ json_str_len = strlen(json_str) + 1;
if (kws_write_frame(bt_wss->kws, WSOC_TEXT, json_str, json_str_len) != json_str_len) {
- // @todo error logging
+ ks_log(KS_LOG_DEBUG, "Failed to write frame\n");
ret = KS_STATUS_FAIL;
goto done;
}
+ ks_log(KS_LOG_DEBUG, "Frame written %d bytes\n", json_str_len);
done:
if (json_str) free(json_str);
ks_assert(bc);
ks_assert(json);
- ks_log(KS_LOG_DEBUG, "Send Callback\n");
-
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
ret = blade_transport_wss_write(bt_wss, json);
- // @todo use reference counting on blade_identity_t and cJSON objects
cJSON_Delete(json);
return ret;
*json = NULL;
if (poll_flags & KS_POLL_ERROR) {
- // @todo error logging
+ ks_log(KS_LOG_DEBUG, "POLLERR\n");
return KS_STATUS_FAIL;
}
if (poll_flags & KS_POLL_READ) {
// -2 means nonblocking wait
// other values are based on WS_XXX reasons
// negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and
+ ks_log(KS_LOG_DEBUG, "Failed to read frame\n");
return KS_STATUS_FAIL;
}
+ ks_log(KS_LOG_DEBUG, "Frame read %d bytes\n", frame_data_len);
if (!(*json = cJSON_Parse((char *)frame_data))) {
+ ks_log(KS_LOG_DEBUG, "Failed to parse frame\n");
return KS_STATUS_FAIL;
}
}
ks_assert(bc);
ks_assert(json);
- ks_log(KS_LOG_DEBUG, "Receive Callback\n");
-
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
return blade_transport_wss_read(bt_wss, json);
blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
blade_transport_wss_t *bt_wss = NULL;
+ blade_transport_wss_init_t *bt_wss_init = NULL;
ks_assert(bc);
if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
+ bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
+
+ list_delete(&bt_wss->module->connected, bc);
- ks_q_push(bt_wss->module->disconnected, bc);
+ if (bt_wss_init) blade_transport_wss_init_destroy(&bt_wss_init);
+ if (bt_wss) blade_transport_wss_destroy(&bt_wss);
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
// @todo: SSL init stuffs based on data from config to pass into kws_init
if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, NULL, KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) {
- // @todo error logging
+ ks_log(KS_LOG_DEBUG, "Failed websocket init\n");
return BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
}
// @todo: SSL init stuffs based on data from config to pass into kws_init
if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, "/blade:blade.invalid:blade", KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) {
- // @todo error logging
+ ks_log(KS_LOG_DEBUG, "Failed websocket init\n");
return BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
}
{
blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
blade_transport_wss_t *bt_wss = NULL;
- cJSON *json = NULL;
+ cJSON *json_req = NULL;
+ cJSON *json_res = NULL;
cJSON *params = NULL;
+ cJSON *result = NULL;
+ //cJSON *error = NULL;
blade_session_t *bs = NULL;
blade_handle_t *bh = NULL;
const char *jsonrpc = NULL;
- const char *method = NULL;
const char *id = NULL;
+ const char *method = NULL;
const char *sid = NULL;
ks_time_t timeout;
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+ if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
// @todo very temporary, really need monotonic clock and get timeout delay and sleep delay from config
timeout = ks_time_now() + (5 * KS_USEC_PER_SEC);
- while (blade_transport_wss_read(bt_wss, &json) == KS_STATUS_SUCCESS) {
- if (json) break;
- ks_sleep(250);
+ while (blade_transport_wss_read(bt_wss, &json_req) == KS_STATUS_SUCCESS) {
+ if (json_req) break;
+ ks_sleep_ms(250);
if (ks_time_now() >= timeout) break;
}
- if (!json) {
- // @todo error logging
+ if (!json_req) {
+ ks_log(KS_LOG_DEBUG, "Failed to receive message before timeout\n");
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
// @todo validation wrapper for request and response/error to confirm jsonrpc and provide enum for output as to which it is
- jsonrpc = cJSON_GetObjectCstr(json, "jsonrpc"); // @todo check for definitions of these keys and fixed values
+ jsonrpc = cJSON_GetObjectCstr(json_req, "jsonrpc"); // @todo check for definitions of these keys and fixed values
if (!jsonrpc || strcmp(jsonrpc, "2.0")) {
- // @todo error logging
+ ks_log(KS_LOG_DEBUG, "Received message is not the expected protocol\n");
+ // @todo send error response before disconnecting, code = -32600 (invalid request)
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
- id = cJSON_GetObjectCstr(json, "id"); // @todo switch to number if we are not using a uuid for message id
+ id = cJSON_GetObjectCstr(json_req, "id"); // @todo switch to number if we are not using a uuid for message id
if (!id) {
- // @todo error logging
+ ks_log(KS_LOG_DEBUG, "Received message is missing 'id'\n");
+ // @todo send error response before disconnecting, code = -32600 (invalid request)
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
- method = cJSON_GetObjectCstr(json, "method");
+ method = cJSON_GetObjectCstr(json_req, "method");
if (!method || strcasecmp(method, "blade.session.attach")) {
- // @todo error logging
+ ks_log(KS_LOG_DEBUG, "Received message is missing 'method' or is an unexpected method\n");
+ // @todo send error response before disconnecting, code = -32601 (method not found)
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
- params = cJSON_GetObjectItem(json, "params");
+ params = cJSON_GetObjectItem(json_req, "params");
if (params) {
sid = cJSON_GetObjectCstr(params, "session-id");
if (sid) {
- // @todo validate uuid format by parsing, not currently available in uuid functions
- ks_log(KS_LOG_DEBUG, "Session Requested: %s\n", sid);
+ // @todo validate uuid format by parsing, not currently available in uuid functions, send -32602 (invalid params) if invalid
+ ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", sid);
}
}
if (sid) {
bs = blade_handle_sessions_get(bh, sid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
if (bs) {
- ks_log(KS_LOG_DEBUG, "Session Located: %s\n", blade_session_id_get(bs));
+ if (blade_session_terminating(bs)) {
+ blade_session_read_unlock(bs);
+ ks_log(KS_LOG_DEBUG, "Session (%s) terminating\n", blade_session_id_get(bs));
+ bs = NULL;
+ } else {
+ ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs));
+ }
}
}
-
+
if (!bs) {
blade_session_create(&bs, bh);
ks_assert(bs);
- ks_log(KS_LOG_DEBUG, "Session Created: %s\n", blade_session_id_get(bs));
+ ks_log(KS_LOG_DEBUG, "Session (%s) created\n", blade_session_id_get(bs));
blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise
if (blade_session_startup(bs) != KS_STATUS_SUCCESS) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", blade_session_id_get(bs));
+ // @todo send error response before disconnecting, code = -32603 (internal error)
blade_session_read_unlock(bs);
blade_session_destroy(&bs);
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
+ ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bs));
blade_handle_sessions_add(bs);
}
+ // @todo wrapper to generate request and response
+ json_res = cJSON_CreateObject();
+ cJSON_AddStringToObject(json_res, "jsonrpc", "2.0");
+ cJSON_AddStringToObject(json_res, "id", id);
+
+ result = cJSON_CreateObject();
+ cJSON_AddStringToObject(result, "session-id", blade_session_id_get(bs));
+ cJSON_AddItemToObject(json_res, "result", result);
+
+ // @todo send response
+ if (blade_transport_wss_write(bt_wss, json_res) != KS_STATUS_SUCCESS) {
+ ks_log(KS_LOG_DEBUG, "Failed to write message\n");
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
+
blade_connection_session_set(bc, blade_session_id_get(bs));
done:
// @note the state machine expects if we return SUCCESS, that the session assigned to the connection will be read locked to ensure that the state
// machine can finish attaching the session, if you BYPASS then you can handle everything here in the callback, but this should be fairly standard
// behaviour to simply go as far as assigning a session to the connection and let the system handle the rest
- if (json) cJSON_Delete(json);
+ if (json_req) cJSON_Delete(json_req);
+ if (json_res) cJSON_Delete(json_res);
return ret;
}
blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
+ blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+ blade_handle_t *bh = NULL;
+ blade_transport_wss_t *bt_wss = NULL;
+ blade_transport_wss_init_t *bt_wss_init = NULL;
+ ks_pool_t *pool = NULL;
+ cJSON *json_req = NULL;
+ cJSON *json_res = NULL;
+ uuid_t msgid;
+ const char *mid = NULL;
+ ks_time_t timeout;
+ const char *jsonrpc = NULL;
+ const char *id = NULL;
+ cJSON *error = NULL;
+ cJSON *result = NULL;
+ const char *sid = NULL;
+ blade_session_t *bs = NULL;
+
ks_assert(bc);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
- // @todo produce jsonrpc compliant message to call method "blade.session.attach"
+ if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
- // @todo add params with nested session-id and session-token if attempting to reconnect as a client, this should probably be passed in from
- // the blade_handle_connect() call and then through the init parameters for the transport (do not directly use the old session, but copy the id and token)
+ bh = blade_connection_handle_get(bc);
+ bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
+ bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
+ pool = blade_connection_pool_get(bc);
+
+
+ // @todo wrapper to build a request and response/error
+ json_req = cJSON_CreateObject();
+ cJSON_AddStringToObject(json_req, "jsonrpc", "2.0");
+ cJSON_AddStringToObject(json_req, "method", "blade.session.attach");
+
+ ks_uuid(&msgid);
+ mid = ks_uuid_str(pool, &msgid);
+ cJSON_AddStringToObject(json_req, "id", mid);
+
+ if (bt_wss_init->session_id) {
+ cJSON *params = cJSON_CreateObject();
+ cJSON_AddStringToObject(params, "session-id", bt_wss_init->session_id);
+ cJSON_AddItemToObject(json_req, "params", params);
+ }
+
+ ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", (bt_wss_init->session_id ? bt_wss_init->session_id : "none"));
+
+ if (blade_transport_wss_write(bt_wss, json_req) != KS_STATUS_SUCCESS) {
+ ks_log(KS_LOG_DEBUG, "Failed to write message\n");
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
- // @todo block while sending message with blade_transport_wss_write(bt_wss, json)
- // @todo block while receiving expected response with blade_transport_wss_read(bt_wss, json)
+ timeout = ks_time_now() + (5 * KS_USEC_PER_SEC);
+ while (blade_transport_wss_read(bt_wss, &json_res) == KS_STATUS_SUCCESS) {
+ if (json_res) break;
+ ks_sleep_ms(250);
+ if (ks_time_now() >= timeout) break;
+ }
+
+ if (!json_res) {
+ ks_log(KS_LOG_DEBUG, "Failed to receive message before timeout\n");
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
+
+ // @todo validation wrapper for request and response/error to confirm jsonrpc and provide enum for output as to which it is
+ jsonrpc = cJSON_GetObjectCstr(json_res, "jsonrpc"); // @todo check for definitions of these keys and fixed values
+ if (!jsonrpc || strcmp(jsonrpc, "2.0")) {
+ ks_log(KS_LOG_DEBUG, "Received message is not the expected protocol\n");
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
- // @todo check for error field, log and return HOOK_DISCONNECT if any errors occur
+ id = cJSON_GetObjectCstr(json_res, "id"); // @todo switch to number if we are not using a uuid for message id
+ if (!id || strcasecmp(mid, id)) {
+ ks_log(KS_LOG_DEBUG, "Received message is missing 'id'\n");
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
- // @todo check for result field, and nested session-id and session-token
+ error = cJSON_GetObjectItem(json_res, "error");
+ if (error) {
+ ks_log(KS_LOG_DEBUG, "Error message ... add the details\n");
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
- // @todo lookup the old session from the blade_handle_t, if it still exists then use this session
+ result = cJSON_GetObjectItem(json_res, "result");
+ if (!result) {
+ ks_log(KS_LOG_DEBUG, "Received message is missing 'result'\n");
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
- // @todo if the old session does not exist, then create a new session and populate with the parameters from the results
+ sid = cJSON_GetObjectCstr(result, "session-id");
+ if (!sid) {
+ ks_log(KS_LOG_DEBUG, "Received message 'result' is missing 'session-id'\n");
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
- // @todo once session is established, associate it to the connection, see attach_inbound for notes regarding universal actions after returning SUCCESS
+ if (sid) {
+ // @todo validate uuid format by parsing, not currently available in uuid functions
+ bs = blade_handle_sessions_get(bh, sid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
+ if (bs) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs));
+ }
+ }
+
+ if (!bs) {
+ blade_session_create(&bs, bh); // @todo let sid be passed to constructor, NULL to generate
+ ks_assert(bs);
+
+ blade_session_id_set(bs, sid);
+
+ ks_log(KS_LOG_DEBUG, "Session (%s) created\n", blade_session_id_get(bs));
+
+ blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise
+
+ if (blade_session_startup(bs) != KS_STATUS_SUCCESS) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", blade_session_id_get(bs));
+ blade_session_read_unlock(bs);
+ blade_session_destroy(&bs);
+ ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
+ goto done;
+ }
+ ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bs));
+ blade_handle_sessions_add(bs);
+ }
+
+ blade_connection_session_set(bc, blade_session_id_get(bs));
- ks_sleep_ms(1000); // @todo temporary testing, remove this and return success once negotiations are done
- return BLADE_CONNECTION_STATE_HOOK_BYPASS;
+ done:
+ if (mid) ks_pool_free(pool, &mid);
+ if (json_req) cJSON_Delete(json_req);
+ if (json_res) cJSON_Delete(json_res);
+ return ret;
}
blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connection_t *bc, blade_connection_state_condition_t condition)
ks_assert(bc);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
-
- ks_sleep(1000);
+
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
- ks_sleep(1000);
+ ks_sleep_ms(1000);
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
*bsP = bs;
+ ks_log(KS_LOG_DEBUG, "Created\n");
+
return KS_STATUS_SUCCESS;
}
ks_pool_free(bs->pool, bsP);
+ ks_log(KS_LOG_DEBUG, "Destroyed\n");
+
return KS_STATUS_SUCCESS;
}
return KS_STATUS_FAIL;
}
+ ks_log(KS_LOG_DEBUG, "Started\n");
+
return KS_STATUS_SUCCESS;
}
list_iterator_stop(&bs->connections);
list_clear(&bs->connections);
+ ks_log(KS_LOG_DEBUG, "Stopped\n");
+
return KS_STATUS_SUCCESS;
}
{
ks_assert(bs);
- if (bs->state != BLADE_SESSION_STATE_HANGUP && bs->state != BLADE_SESSION_STATE_DESTROY)
+ if (bs->state != BLADE_SESSION_STATE_HANGUP && bs->state != BLADE_SESSION_STATE_DESTROY) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) hanging up\n", bs->id);
blade_session_state_set(bs, BLADE_SESSION_STATE_HANGUP);
+ }
+}
+
+KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs)
+{
+ ks_assert(bs);
+
+ return bs->state == BLADE_SESSION_STATE_HANGUP || bs->state == BLADE_SESSION_STATE_DESTROY;
}
KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id)
list_append(&bs->connections, cid);
+ ks_log(KS_LOG_DEBUG, "Session (%s) connection added (%s)\n", bs->id, id);
+
return ret;
}
for (uint32_t i = 0; i < size; ++i) {
const char *cid = (const char *)list_get_at(&bs->connections, i);
if (!strcasecmp(cid, id)) {
+ ks_log(KS_LOG_DEBUG, "Session (%s) connection removed (%s)\n", bs->id, id);
list_delete_at(&bs->connections, i);
ks_pool_free(bs->pool, &cid);
break;
// later there will need to be a way to pick which connection to use
cid = list_get_at(&bs->connections, 0);
if (!cid) {
- // @todo error logging... this shouldn't happen
+ // no connections available
return KS_STATUS_FAIL;
}
if (blade_session_connections_choose(bs, json, &bc) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
// @todo cache the blade_request_t here if it exists to gaurentee it's cached before a response could be received
blade_connection_sending_push(bc, json);
+ blade_connection_read_unlock(bc);
}
return KS_STATUS_SUCCESS;
return ks_q_trypop(bs->sending, (void **)json);
}
-// @todo receive queue push and pop
+KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json)
+{
+ cJSON *json_copy = NULL;
+
+ ks_assert(bs);
+ ks_assert(json);
+
+ json_copy = cJSON_Duplicate(json, 1);
+ return ks_q_push(bs->receiving, json_copy);
+}
+
+KS_DECLARE(ks_status_t) blade_session_receiving_pop(blade_session_t *bs, cJSON **json)
+{
+ ks_assert(bs);
+ ks_assert(json);
+
+ return ks_q_trypop(bs->receiving, (void **)json);
+}
+
void *blade_session_state_thread(ks_thread_t *thread, void *data)
{
if (!list_empty(&bs->connections)) {
while (blade_session_sending_pop(bs, &json) == KS_STATUS_SUCCESS && json) {
blade_connection_t *bc = NULL;
- if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) blade_connection_sending_push(bc, json);
+ if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) {
+ blade_connection_sending_push(bc, json);
+ blade_connection_read_unlock(bc);
+ }
cJSON_Delete(json);
}
}
switch (state) {
case BLADE_SESSION_STATE_DESTROY:
+ ks_log(KS_LOG_DEBUG, "Session (%s) state destroy\n", bs->id);
+ blade_handle_sessions_remove(bs);
+ blade_session_destroy(&bs);
return NULL;
case BLADE_SESSION_STATE_HANGUP:
- // @todo detach from session if this connection is attached
- blade_session_state_set(bs, BLADE_SESSION_STATE_DESTROY);
- break;
+ {
+ ks_log(KS_LOG_DEBUG, "Session (%s) state hangup\n", bs->id);
+
+ list_iterator_start(&bs->connections);
+ while (list_iterator_hasnext(&bs->connections)) {
+ const char *cid = (const char *)list_iterator_next(&bs->connections);
+ blade_connection_t *bc = blade_handle_connections_get(bs->handle, cid);
+ ks_assert(bc);
+
+ blade_connection_disconnect(bc);
+ blade_connection_read_unlock(bc);
+ }
+ list_iterator_stop(&bs->connections);
+
+ while (!list_empty(&bs->connections)) ks_sleep(100);
+
+ blade_session_state_set(bs, BLADE_SESSION_STATE_DESTROY);
+ break;
+ }
case BLADE_SESSION_STATE_CONNECT:
+ ks_log(KS_LOG_DEBUG, "Session (%s) state connect\n", bs->id);
+ ks_sleep_ms(1000);
break;
case BLADE_SESSION_STATE_ATTACH:
+ ks_log(KS_LOG_DEBUG, "Session (%s) state attach\n", bs->id);
+ ks_sleep_ms(1000);
break;
case BLADE_SESSION_STATE_DETACH:
+ ks_log(KS_LOG_DEBUG, "Session (%s) state detach\n", bs->id);
+ ks_sleep_ms(1000);
break;
case BLADE_SESSION_STATE_READY:
- // @todo pop from session receiving queue and pass to blade_protocol_process()
+ ks_log(KS_LOG_DEBUG, "Session (%s) state ready\n", bs->id);
+ // @todo pop from session receiving queue and pass into protocol layer through something like blade_protocol_process()
+ ks_sleep_ms(1000);
break;
default: break;
}