blade_handle_t *handle;
ks_pool_t *pool;
+ void *transport_init_data;
void *transport_data;
blade_transport_callbacks_t *transport_callbacks;
ks_bool_t shutdown;
// @todo add auto generated UUID
+ blade_connection_direction_t direction;
ks_thread_t *state_thread;
blade_connection_state_t state;
ks_q_t *sending;
- ks_q_t *receiving;
+ //ks_q_t *receiving;
};
void *blade_connection_state_thread(ks_thread_t *thread, void *data);
KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
blade_handle_t *bh,
- void *transport_data,
+ void *transport_init_data,
blade_transport_callbacks_t *transport_callbacks)
{
blade_connection_t *bc = NULL;
ks_assert(bcP);
ks_assert(bh);
- ks_assert(transport_data);
ks_assert(transport_callbacks);
pool = blade_handle_pool_get(bh);
bc = ks_pool_alloc(pool, sizeof(blade_connection_t));
bc->handle = bh;
bc->pool = pool;
- bc->transport_data = transport_data;
+ bc->transport_init_data = transport_init_data;
bc->transport_callbacks = transport_callbacks;
ks_q_create(&bc->sending, pool, 0);
- ks_q_create(&bc->receiving, pool, 0);
+ //ks_q_create(&bc->receiving, pool, 0);
*bcP = bc;
return KS_STATUS_SUCCESS;
blade_connection_shutdown(bc);
ks_q_destroy(&bc->sending);
- ks_q_destroy(&bc->receiving);
+ //ks_q_destroy(&bc->receiving);
ks_pool_free(bc->pool, bcP);
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc)
+KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction)
{
ks_assert(bc);
+ bc->direction = direction;
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NONE);
if (ks_thread_create_ex(&bc->state_thread,
KS_PRI_NORMAL,
bc->pool) != KS_STATUS_SUCCESS) {
// @todo error logging
- blade_connection_disconnect(bc);
return KS_STATUS_FAIL;
}
return KS_STATUS_SUCCESS;
}
+KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc)
+{
+ ks_assert(bc);
+
+ return bc->transport_init_data;
+}
+
KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc)
{
ks_assert(bc);
return bc->transport_data;
}
+KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data)
+{
+ ks_assert(bc);
+
+ bc->transport_data = transport_data;
+}
+
+blade_transport_state_callback_t blade_connection_state_callback_lookup(blade_connection_t *bc, blade_connection_state_t state)
+{
+ blade_transport_state_callback_t callback = NULL;
+
+ ks_assert(bc);
+
+ switch (state) {
+ case BLADE_CONNECTION_STATE_DISCONNECT:
+ if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_disconnect_inbound;
+ else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_disconnect_outbound;
+ break;
+ case BLADE_CONNECTION_STATE_NEW:
+ if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_new_inbound;
+ else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_new_outbound;
+ break;
+ case BLADE_CONNECTION_STATE_CONNECT:
+ if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_connect_inbound;
+ else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_connect_outbound;
+ break;
+ case BLADE_CONNECTION_STATE_ATTACH:
+ if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_attach_inbound;
+ else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_attach_outbound;
+ break;
+ case BLADE_CONNECTION_STATE_DETACH:
+ if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_detach_inbound;
+ else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_detach_outbound;
+ break;
+ case BLADE_CONNECTION_STATE_READY:
+ if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_ready_inbound;
+ else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_ready_outbound;
+ break;
+ default: break;
+ }
+
+ return callback;
+}
+
KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state)
{
+ blade_transport_state_callback_t callback = NULL;
+ blade_connection_state_hook_t hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
ks_assert(bc);
- bc->transport_callbacks->onstate(bc, state, BLADE_CONNECTION_STATE_CONDITION_PRE);
+ callback = blade_connection_state_callback_lookup(bc, state);
+
+ if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_PRE);
+
bc->state = state;
+
+ if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc);
}
KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc)
{
ks_assert(bc);
- blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT);
+ if (bc->state != BLADE_CONNECTION_STATE_DETACH && bc->state != BLADE_CONNECTION_STATE_DISCONNECT)
+ blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DETACH);
}
KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, blade_identity_t *target, cJSON *json)
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_status_t) blade_connection_receiving_push(blade_connection_t *bc, cJSON *json)
-{
- ks_assert(bc);
- ks_assert(json);
+// @todo may not need receiving queue on connection, by the time we are queueing we should have a session to receive into
+//KS_DECLARE(ks_status_t) blade_connection_receiving_push(blade_connection_t *bc, cJSON *json)
+//{
+// ks_assert(bc);
+// ks_assert(json);
- return ks_q_push(bc->receiving, json);
-}
+// return ks_q_push(bc->receiving, json);
+//}
-KS_DECLARE(ks_status_t) blade_connection_receiving_pop(blade_connection_t *bc, cJSON **json)
-{
- ks_assert(bc);
- ks_assert(json);
+//KS_DECLARE(ks_status_t) blade_connection_receiving_pop(blade_connection_t *bc, cJSON **json)
+//{
+// ks_assert(bc);
+// ks_assert(json);
- return ks_q_trypop(bc->receiving, (void **)json);
-}
+// return ks_q_trypop(bc->receiving, (void **)json);
+//}
void *blade_connection_state_thread(ks_thread_t *thread, void *data)
{
blade_connection_t *bc = NULL;
- blade_connection_state_hook_t hook;
+ blade_connection_state_t state;
+ blade_transport_state_callback_t callback = NULL;
+ blade_connection_state_hook_t hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+ cJSON *json = NULL;
ks_assert(thread);
ks_assert(data);
bc = (blade_connection_t *)data;
while (!bc->shutdown) {
- // @todo need to get messages from the transport into receiving queue, and pop messages from sending queue to write out using transport
- // sending is relatively easy, but receiving cannot occur universally due to cases like kws_init() blocking and expecting data to be on the wire
- // and other transports may have similar behaviours, but CONNECTIN, ATTACH, and READY require async message passing into application layer
- // and sending whenever the response hits the queue
+
+ // @todo pop from connection sending queue and call transport callback to write one message (passing target identity too)
+ // and delete the cJSON object here after returning from callback
+
- // @todo it's possible that onstate could handle receiving and sending messages during the appropriate states, but this means some states
- // like CONNECTIN which may send and receive multiple messages require BYPASSing until the application layer updates the state or disconnects
+ // @todo seems like connection will not need a receiving queue as the session will exist prior to async transmissions
+
+ state = bc->state;
+ hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+ callback = blade_connection_state_callback_lookup(bc, state);
+
+ // @todo should this just go in the ready state callback? it's generalized here, so the callback for READY doesn't really
+ // need to do anything
+ if (state == BLADE_CONNECTION_STATE_READY && bc->transport_callbacks->onreceive(bc, &json) == KS_STATUS_SUCCESS && json) {
+ // @todo push json to session receiving queue
+
+ }
- hook = bc->transport_callbacks->onstate(bc, bc->state, BLADE_CONNECTION_STATE_CONDITION_POST);
- if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT)
- blade_connection_disconnect(bc);
+ if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST);
+
+ if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT && (state == BLADE_CONNECTION_STATE_DETACH || state == BLADE_CONNECTION_STATE_DISCONNECT))
+ hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
+ if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc);
else if (hook == BLADE_CONNECTION_STATE_HOOK_SUCCESS) {
- // @todo pop from sending queue, and pass to transport callback to send out
- switch (bc->state) {
+ switch (state) {
+ case BLADE_CONNECTION_STATE_DISCONNECT:
+ return NULL;
case BLADE_CONNECTION_STATE_NEW:
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_CONNECT);
break;
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_ATTACH);
break;
case BLADE_CONNECTION_STATE_ATTACH:
+ // @todo receive message with nullable session id for reconnect and some sort of secure token for a reconnect challenge?
+ // determine how much of session management is handled here... do we process these session negotiation messages without
+ // passing it up to the application layer? or does the application layer give back a session and build the response?
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY);
break;
case BLADE_CONNECTION_STATE_DETACH:
- blade_connection_disconnect(bc);
+ // @todo detach from session if this connection is attached
+ blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT);
break;
default: break;
}
typedef struct blade_module_wss_s blade_module_wss_t;
typedef struct blade_transport_wss_s blade_transport_wss_t;
+typedef struct blade_transport_wss_init_s blade_transport_wss_init_t;
struct blade_module_wss_s {
blade_handle_t *handle;
kws_t *kws;
};
+struct blade_transport_wss_init_s {
+ blade_module_wss_t *module;
+ ks_pool_t *pool;
+
+ ks_socket_t sock;
+};
+
ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t *bh);
ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP);
-ks_status_t blade_module_wss_onload(blade_module_t **bmP, blade_handle_t *bh);
-ks_status_t blade_module_wss_onunload(blade_module_t *bm);
-ks_status_t blade_module_wss_onstartup(blade_module_t *bm, config_setting_t *config);
-ks_status_t blade_module_wss_onshutdown(blade_module_t *bm);
+ks_status_t blade_module_wss_on_load(blade_module_t **bmP, blade_handle_t *bh);
+ks_status_t blade_module_wss_on_unload(blade_module_t *bm);
+ks_status_t blade_module_wss_on_startup(blade_module_t *bm, config_setting_t *config);
+ks_status_t blade_module_wss_on_shutdown(blade_module_t *bm);
ks_status_t blade_module_wss_listen(blade_module_wss_t *bm, ks_sockaddr_t *addr);
void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data);
+
ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_module_wss_t *bm_wss, ks_socket_t sock);
ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP);
-ks_status_t blade_transport_wss_onconnect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target);
-blade_connection_rank_t blade_transport_wss_onrank(blade_connection_t *bc, blade_identity_t *target);
-blade_connection_state_hook_t blade_transport_wss_onstate(blade_connection_t *bc, blade_connection_state_t state, blade_connection_state_condition_t condition);
+ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target);
+blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target);
+
+ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t *target, cJSON *json);
+ks_status_t blade_transport_wss_on_receive(blade_connection_t *bc, cJSON **json);
+
+blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_new_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_new_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_connect_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_connect_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connection_t *bc, blade_connection_state_condition_t condition);
+blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connection_t *bc, blade_connection_state_condition_t condition);
+
+
+
+ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock);
+ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP);
static blade_module_callbacks_t g_module_wss_callbacks =
{
- blade_module_wss_onload,
- blade_module_wss_onunload,
- blade_module_wss_onstartup,
- blade_module_wss_onshutdown,
+ blade_module_wss_on_load,
+ blade_module_wss_on_unload,
+ blade_module_wss_on_startup,
+ blade_module_wss_on_shutdown,
};
static blade_transport_callbacks_t g_transport_wss_callbacks =
{
- blade_transport_wss_onconnect,
- blade_transport_wss_onrank,
- blade_transport_wss_onstate,
+ blade_transport_wss_on_connect,
+ blade_transport_wss_on_rank,
+ blade_transport_wss_on_send,
+ blade_transport_wss_on_receive,
+
+ blade_transport_wss_on_state_disconnect,
+ blade_transport_wss_on_state_disconnect,
+ blade_transport_wss_on_state_new_inbound,
+ blade_transport_wss_on_state_new_outbound,
+ blade_transport_wss_on_state_connect_inbound,
+ blade_transport_wss_on_state_connect_outbound,
+ blade_transport_wss_on_state_attach_inbound,
+ blade_transport_wss_on_state_attach_outbound,
+ blade_transport_wss_on_state_detach,
+ blade_transport_wss_on_state_detach,
+ blade_transport_wss_on_state_ready,
+ blade_transport_wss_on_state_ready,
};
bm_wss = *bm_wssP;
- blade_module_wss_onshutdown(bm_wss->module);
+ blade_module_wss_on_shutdown(bm_wss->module);
blade_module_destroy(&bm_wss->module);
return KS_STATUS_SUCCESS;
}
-ks_status_t blade_module_wss_onload(blade_module_t **bmP, blade_handle_t *bh)
+ks_status_t blade_module_wss_on_load(blade_module_t **bmP, blade_handle_t *bh)
{
blade_module_wss_t *bm_wss = NULL;
return KS_STATUS_SUCCESS;
}
-ks_status_t blade_module_wss_onunload(blade_module_t *bm)
+ks_status_t blade_module_wss_on_unload(blade_module_t *bm)
{
blade_module_wss_t *bm_wss = NULL;
return KS_STATUS_SUCCESS;
}
-ks_status_t blade_module_wss_onstartup(blade_module_t *bm, config_setting_t *config)
+ks_status_t blade_module_wss_on_startup(blade_module_t *bm, config_setting_t *config)
{
blade_module_wss_t *bm_wss = NULL;
return KS_STATUS_SUCCESS;
}
-ks_status_t blade_module_wss_onshutdown(blade_module_t *bm)
+ks_status_t blade_module_wss_on_shutdown(blade_module_t *bm)
{
blade_module_wss_t *bm_wss = NULL;
blade_transport_wss_t *bt_wss = NULL;
void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
{
blade_module_wss_t *bm_wss = NULL;
+ blade_transport_wss_init_t *bt_wss_init = NULL;
blade_transport_wss_t *bt_wss = NULL;
blade_connection_t *bc = NULL;
continue;
}
- blade_transport_wss_create(&bt_wss, bm_wss, sock);
- ks_assert(bt_wss);
+ blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock);
+ ks_assert(bt_wss_init);
- blade_connection_create(&bc, bm_wss->handle, bt_wss, bm_wss->transport_callbacks);
+ blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks);
ks_assert(bc);
- blade_connection_startup(bc);
-
+ if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_INBOUND) != KS_STATUS_SUCCESS) {
+ blade_connection_destroy(&bc);
+ blade_transport_wss_init_destroy(&bt_wss_init);
+ ks_socket_close(&sock);
+ continue;
+ }
list_append(&bm_wss->connected, bc);
-
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW);
}
}
while (ks_q_trypop(bm_wss->disconnected, (void **)&bc) == KS_STATUS_SUCCESS) {
+ bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
list_delete(&bm_wss->connected, bc);
+ if (bt_wss_init) blade_transport_wss_init_destroy(&bt_wss_init);
blade_connection_destroy(&bc);
- blade_transport_wss_destroy(&bt_wss);
+ if (bt_wss) blade_transport_wss_destroy(&bt_wss);
}
}
return KS_STATUS_SUCCESS;
}
-ks_status_t blade_transport_wss_onconnect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target)
+ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target)
{
ks_assert(bcP);
ks_assert(bm);
return KS_STATUS_SUCCESS;
}
-blade_connection_rank_t blade_transport_wss_onrank(blade_connection_t *bc, blade_identity_t *target)
+blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target)
{
ks_assert(bc);
ks_assert(target);
+ return BLADE_CONNECTION_RANK_POOR;
+}
+
+ks_status_t blade_transport_wss_write(blade_transport_wss_t *bt_wss, cJSON *json)
+{
+ char *json_str = cJSON_PrintUnformatted(json);
+ ks_size_t json_str_len = 0;
+ if (!json_str) {
+ // @todo error logging
+ return KS_STATUS_FAIL;
+ }
+ json_str_len = strlen(json_str) + 1; // @todo determine if WSOC_TEXT null terminates when read_frame is called, or if it's safe to include like this
+ kws_write_frame(bt_wss->kws, WSOC_TEXT, json_str, json_str_len);
+
+ free(json_str);
+
return KS_STATUS_SUCCESS;
}
+ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t *target, cJSON *json)
+{
+ blade_transport_wss_t *bt_wss = NULL;
+
+ ks_assert(bc);
+ ks_assert(json);
+
+ ks_log(KS_LOG_DEBUG, "Send Callback\n");
+
+ bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
+
+ return blade_transport_wss_write(bt_wss, json);
+}
+
ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json)
{
// @todo get exact timeout from service config?
return KS_STATUS_FAIL;
}
- //if (blade_handle_message_claim(blade_service_handle(peer->service), &message, frame_data, frame_data_len) != KS_STATUS_SUCCESS || !message) {
- // @todo error logging
- // return KS_STATUS_FAIL;
- //}
-
- // @todo convert frame_data to cJSON safely, make sure data is null-terminated at frame_data_len
if (!(*json = cJSON_Parse((char *)frame_data))) {
return KS_STATUS_FAIL;
}
return KS_STATUS_SUCCESS;
}
-ks_status_t blade_transport_wss_write(blade_transport_wss_t *bt_wss, cJSON *json)
+ks_status_t blade_transport_wss_on_receive(blade_connection_t *bc, cJSON **json)
{
- //blade_message_get(message, &target, &json);
- char *json_str = cJSON_PrintUnformatted(json);
- ks_size_t json_str_len = 0;
- if (!json_str) {
- // @todo error logging
- return KS_STATUS_FAIL;
- }
- json_str_len = strlen(json_str) + 1;
- kws_write_frame(bt_wss->kws, WSOC_TEXT, json_str, json_str_len);
+ blade_transport_wss_t *bt_wss = NULL;
- return KS_STATUS_SUCCESS;
+ ks_assert(bc);
+ ks_assert(json);
+
+ ks_log(KS_LOG_DEBUG, "Receive Callback\n");
+
+ bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
+
+ return blade_transport_wss_read(bt_wss, json);
}
-blade_connection_state_hook_t blade_transport_wss_onstate(blade_connection_t *bc, blade_connection_state_t state, blade_connection_state_condition_t condition)
+blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
blade_transport_wss_t *bt_wss = NULL;
- //cJSON *json = NULL;
ks_assert(bc);
+ ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+ if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
- switch (state) {
- case BLADE_CONNECTION_STATE_DISCONNECT:
- {
- if (condition == BLADE_CONNECTION_STATE_CONDITION_POST) {
- ks_q_push(bt_wss->module->disconnected, bc);
- blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NONE);
- }
- break;
- }
- case BLADE_CONNECTION_STATE_NEW:
- {
- if (condition == BLADE_CONNECTION_STATE_CONDITION_POST) {
- // @todo: SSL init stuffs based on data from peer->service->config_websockets_ssl to pass into kws_init
- if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, NULL, KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) {
- // @todo error logging
- return BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
- }
- }
- break;
- }
- case BLADE_CONNECTION_STATE_CONNECT:
- {
- // @todo abstract read message and write message, so these can be called from connection and processed from there
-
- //if (blade_transport_wss_read(bt_wss, &json) != KS_STATUS_SUCCESS) return BLADE_CONNECTION_STATEHOOK_DISCONNECT;
-
- //if (json) {
- // @todo processing connectin messages for identity registration
- // cJSON_Delete(json);
- //blade_connection_receiving_push(conn, json);
- //}
-
- // @todo wrap identity + json into an envelope for queueing through the connection
- //while (blade_connection_sending_pop(bc, (void **)&json) == KS_STATUS_SUCCESS && json) {
- // ks_status_t ret = blade_transport_wss_write(bt_wss, json);
- // cJSON_Delete(json);
- // if (ret != KS_STATUS_SUCCESS) return BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
- //}
- return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
- //break;
- }
- default: break;
+ ks_q_push(bt_wss->module->disconnected, bc);
+
+ return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_new_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+ blade_transport_wss_t *bt_wss = NULL;
+ blade_transport_wss_init_t *bt_wss_init = NULL;
+
+ ks_assert(bc);
+
+ ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+ if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
+ bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
+
+ blade_transport_wss_create(&bt_wss, bt_wss_init->module, bt_wss_init->sock);
+ ks_assert(bt_wss);
+
+ blade_connection_transport_set(bc, bt_wss);
+
+ return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_new_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+ ks_assert(bc);
+
+ ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+ if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
+ return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_connect_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+ blade_transport_wss_t *bt_wss = NULL;
+
+ ks_assert(bc);
+
+ ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+ if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+
+ bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
+
+ // @todo: SSL init stuffs based on data from config to pass into kws_init
+ if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, NULL, KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) {
+ // @todo error logging
+ return BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
}
-
+
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
+blade_connection_state_hook_t blade_transport_wss_on_state_connect_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+ ks_assert(bc);
+
+ ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+ return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+ ks_assert(bc);
+
+ ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+ // @todo Establish sessid and discover existing session or create and register new session through BLADE commands
+ // Set session state to CONNECT if its new or RECONNECT if existing
+ // start session and its thread if its new
+
+ return BLADE_CONNECTION_STATE_HOOK_BYPASS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+ ks_assert(bc);
+
+ ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+ return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+ ks_assert(bc);
+
+ ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+ return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connection_t *bc, blade_connection_state_condition_t condition)
+{
+ ks_assert(bc);
+
+ ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
+
+ return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
+}
+
+ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock)
+{
+ blade_transport_wss_init_t *bt_wssi = NULL;
+
+ ks_assert(bt_wssiP);
+ ks_assert(bm_wss);
+ ks_assert(sock != KS_SOCK_INVALID);
+
+ bt_wssi = ks_pool_alloc(bm_wss->pool, sizeof(blade_transport_wss_init_t));
+ bt_wssi->module = bm_wss;
+ bt_wssi->pool = bm_wss->pool;
+ bt_wssi->sock = sock;
+
+ *bt_wssiP = bt_wssi;
+
+ return KS_STATUS_SUCCESS;
+}
+
+ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP)
+{
+ blade_transport_wss_init_t *bt_wssi = NULL;
+
+ ks_assert(bt_wssiP);
+ ks_assert(*bt_wssiP);
+
+ bt_wssi = *bt_wssiP;
+
+ ks_pool_free(bt_wssi->pool, bt_wssiP);
+
+ return KS_STATUS_SUCCESS;
+}
+
/* For Emacs:
* Local Variables:
* mode:c