libunqlite_la_LIBADD = -lpthread
lib_LTLIBRARIES = libblade.la
-libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/blade_service.c src/bpcp.c src/blade_datastore.c src/blade_directory.c
+libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/blade_service.c src/bpcp.c src/blade_datastore.c
+libblade_la_SOURCES += src/blade_message.c
libblade_la_CFLAGS = $(AM_CFLAGS) $(AM_CPPFLAGS)
libblade_la_LDFLAGS = -version-info 0:1:0 -lncurses -lpthread -lm -lconfig $(AM_LDFLAGS)
libblade_la_LIBADD = libunqlite.la
library_includedir = $(prefix)/include
library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/blade_service.h
-library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_directory.h
+library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_message.h
library_include_HEADERS += src/include/unqlite.h test/tap.h
tests: libblade.la
+++ /dev/null
-/*
- * Copyright (c) 2007-2014, Anthony Minessale II
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * * Neither the name of the original author; nor the names of any contributors
- * may be used to endorse or promote products derived from this software
- * without specific prior written permission.
- *
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
- * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
- * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
- * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
- * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "blade.h"
-
-
-typedef enum {
- BD_NONE = 0,
- BD_MYPOOL = (1 << 0),
- BD_MYTPOOL = (1 << 1),
-} bdpvt_flag_t;
-
-struct blade_directory_s {
- bdpvt_flag_t flags;
- ks_pool_t *pool;
- ks_thread_pool_t *tpool;
-
- config_setting_t *config_service;
-
- blade_service_t *service;
-};
-
-
-
-
-KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP)
-{
- blade_directory_t *bd = NULL;
- bdpvt_flag_t flags;
- ks_pool_t *pool;
-
- ks_assert(bdP);
-
- bd = *bdP;
- *bdP = NULL;
-
- ks_assert(bd);
-
- flags = bd->flags;
- pool = bd->pool;
-
- blade_directory_shutdown(bd);
-
- if (bd->tpool && (flags & BD_MYTPOOL)) ks_thread_pool_destroy(&bd->tpool);
-
- ks_pool_free(bd->pool, &bd);
-
- if (pool && (flags & BD_MYPOOL)) ks_pool_close(&pool);
-
- return KS_STATUS_SUCCESS;
-}
-
-KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool)
-{
- bdpvt_flag_t newflags = BD_NONE;
- blade_directory_t *bd = NULL;
-
- if (!pool) {
- newflags |= BD_MYPOOL;
- ks_pool_open(&pool);
- ks_assert(pool);
- }
- // @todo: move thread pool creation to startup which allows thread pool to be configurable
- if (!tpool) {
- newflags |= BD_MYTPOOL;
- ks_thread_pool_create(&tpool,
- BLADE_DIRECTORY_TPOOL_MIN,
- BLADE_DIRECTORY_TPOOL_MAX,
- BLADE_DIRECTORY_TPOOL_STACK,
- KS_PRI_NORMAL,
- BLADE_DIRECTORY_TPOOL_IDLE);
- ks_assert(tpool);
- }
-
- bd = ks_pool_alloc(pool, sizeof(*bd));
- bd->flags = newflags;
- bd->pool = pool;
- bd->tpool = tpool;
- *bdP = bd;
-
- return KS_STATUS_SUCCESS;
-}
-
-ks_status_t blade_directory_config(blade_directory_t *bd, config_setting_t *config)
-{
- config_setting_t *service = NULL;
-
- ks_assert(bd);
-
- if (!config) return KS_STATUS_FAIL;
- if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
-
- service = config_setting_get_member(config, "service");
- if (!service) return KS_STATUS_FAIL;
-
- bd->config_service = service;
-
- return KS_STATUS_SUCCESS;
-}
-
-KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config)
-{
- ks_assert(bd);
-
- blade_directory_shutdown(bd);
-
- if (blade_directory_config(bd, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-
- blade_service_create(&bd->service, bd->pool, bd->tpool);
- ks_assert(bd->service);
- if (blade_service_startup(bd->service, bd->config_service) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-
- return KS_STATUS_SUCCESS;
-}
-
-KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd)
-{
- ks_assert(bd);
-
- if (bd->service) blade_service_destroy(&bd->service);
-
- return KS_STATUS_SUCCESS;
-}
-
-
-/* For Emacs:
- * Local Variables:
- * mode:c
- * indent-tabs-mode:t
- * tab-width:4
- * c-basic-offset:4
- * End:
- * For VIM:
- * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
- */
--- /dev/null
+/*
+ * Copyright (c) 2007-2014, Anthony Minessale II
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "blade.h"
+
+// A reusable message buffer owned by a blade_handle_t; recycled through the
+// handle's messages_discarded queue to avoid per-message allocation.
+struct blade_message_s {
+	ks_pool_t *pool;          // allocator for this message and its data buffer
+	blade_handle_t *handle;   // owning handle, used by blade_message_discard() for recycling
+
+	void *data;               // payload buffer (allocated lazily by blade_message_set)
+	ks_size_t data_length;    // bytes currently in use
+	ks_size_t data_size;      // allocated capacity; never shrinks, so recycled messages avoid realloc
+};
+
+
+// Destroy a message outright, freeing its payload buffer and the message
+// itself back to the pool. Clears the caller's pointer. Use
+// blade_message_discard() instead when the message should be recycled.
+KS_DECLARE(ks_status_t) blade_message_destroy(blade_message_t **bmP)
+{
+	blade_message_t *bm = NULL;
+
+	ks_assert(bmP);
+
+	// take ownership and clear the caller's pointer before tearing down
+	bm = *bmP;
+	*bmP = NULL;
+
+	ks_assert(bm);
+
+	if (bm->data) ks_pool_free(bm->pool, &bm->data);
+
+	ks_pool_free(bm->pool, &bm);
+
+	return KS_STATUS_SUCCESS;
+}
+
+// Allocate an empty message from the given pool. The payload buffer is
+// allocated lazily on the first blade_message_set(); handle is retained so
+// blade_message_discard() can return the message to the handle's recycle queue.
+// NOTE(review): data/data_length/data_size are never explicitly initialized —
+// this relies on ks_pool_alloc() returning zeroed memory; confirm that guarantee.
+KS_DECLARE(ks_status_t) blade_message_create(blade_message_t **bmP, ks_pool_t *pool, blade_handle_t *handle)
+{
+	blade_message_t *bm = NULL;
+
+	ks_assert(bmP);
+	ks_assert(pool);
+	ks_assert(handle);
+
+	bm = ks_pool_alloc(pool, sizeof(*bm));
+	bm->pool = pool;
+	bm->handle = handle;
+	*bmP = bm;
+
+	return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_message_discard(blade_message_t **bm)
+{
+ ks_assert(bm);
+ ks_assert(*bm);
+
+ return blade_handle_message_discard((*bm)->handle, bm);
+}
+
+// Copy a payload into the message, growing the internal buffer only when the
+// new payload exceeds the current capacity (the buffer never shrinks, which is
+// what makes recycled messages cheap). The caller retains ownership of `data`.
+KS_DECLARE(ks_status_t) blade_message_set(blade_message_t *bm, void *data, ks_size_t data_length)
+{
+	ks_assert(bm);
+	ks_assert(data);
+	ks_assert(data_length > 0);
+
+	// @todo fail on a max message size?
+
+	if (data_length > bm->data_size) {
+		// @todo talk to tony about adding flags to ks_pool_resize_ex to prevent the memcpy, don't need to copy old memory here
+		// otherwise switch to a new allocation instead of resizing
+		// NOTE(review): on a freshly created message bm->data is NULL here —
+		// confirm ks_pool_resize() treats a NULL pointer as a plain allocation.
+		bm->data = ks_pool_resize(bm->pool, bm->data, data_length);
+		ks_assert(bm->data);
+		bm->data_size = data_length;
+	}
+	memcpy(bm->data, data, data_length);
+	bm->data_length = data_length;
+
+	return KS_STATUS_SUCCESS;
+}
+
+// Expose the message's internal buffer and its in-use length. This is a view,
+// not a copy: the pointer is valid only until the next blade_message_set(),
+// blade_message_discard(), or blade_message_destroy() on this message.
+KS_DECLARE(ks_status_t) blade_message_get(blade_message_t *bm, void **data, ks_size_t *data_length)
+{
+	ks_assert(bm);
+
+	*data = bm->data;
+	*data_length = bm->data_length;
+
+	return KS_STATUS_SUCCESS;
+}
+
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
ks_thread_pool_t *tpool;
blade_service_t *service;
+ ks_socket_t sock;
ks_bool_t shutdown;
+ ks_bool_t disconnecting;
kws_t *kws;
ks_thread_t *kws_thread;
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws)
+KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, ks_socket_t sock)
 {
+	kws_t *kws = NULL;
+
 	ks_assert(bp);
-	ks_assert(kws);
+	// kws is now created inside blade_peer_kws_thread via kws_init(); the local
+	// kws above is NULL at this point, so asserting it would fire unconditionally.
+	// Validate the argument that was actually passed in instead.
+	ks_assert(sock != KS_SOCK_INVALID);
 	blade_peer_shutdown(bp);
-	bp->kws = kws;
+	bp->sock = sock;
if (ks_thread_create_ex(&bp->kws_thread,
blade_peer_kws_thread,
KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp)
{
+ blade_message_t *message = NULL;
+
ks_assert(bp);
bp->shutdown = KS_TRUE;
ks_thread_join(bp->kws_thread);
ks_pool_free(bp->pool, &bp->kws_thread);
}
+
+ while (ks_q_trypop(bp->messages_sending, (void **)&message) == KS_STATUS_SUCCESS && message) blade_message_discard(&message);
+ while (ks_q_trypop(bp->messages_receiving, (void **)&message) == KS_STATUS_SUCCESS && message) blade_message_discard(&message);
if (bp->kws) kws_destroy(&bp->kws);
+ else if (bp->sock != KS_SOCK_INVALID) ks_socket_close(&bp->sock);
+ bp->sock = KS_SOCK_INVALID;
bp->shutdown = KS_FALSE;
return KS_STATUS_SUCCESS;
}
+KS_DECLARE(void) blade_peer_disconnect(blade_peer_t *bp)
+{
+ ks_assert(bp);
+
+ bp->disconnecting = KS_TRUE;
+}
+
+KS_DECLARE(ks_bool_t) blade_peer_disconnecting(blade_peer_t *bp)
+{
+ ks_assert(bp);
+ return bp->disconnecting;
+}
+
+KS_DECLARE(ks_status_t) blade_peer_message_pop(blade_peer_t *peer, blade_message_t **message)
+{
+ ks_assert(peer);
+ ks_assert(message);
+
+ *message = NULL;
+ return ks_q_trypop(peer->messages_receiving, (void **)message);
+}
+
+// Queue a payload for sending to this peer. The data is copied into a claimed
+// (possibly recycled) message via blade_handle_message_claim(), so the caller
+// keeps ownership of `data`. On claim failure the peer is flagged for
+// disconnect and KS_STATUS_FAIL is returned.
+KS_DECLARE(ks_status_t) blade_peer_message_push(blade_peer_t *peer, void *data, ks_size_t data_length)
+{
+	blade_message_t *message = NULL;
+
+	ks_assert(peer);
+	ks_assert(data);
+	ks_assert(data_length > 0);
+
+	if (blade_handle_message_claim(blade_service_handle(peer->service), &message, data, data_length) != KS_STATUS_SUCCESS || !message) {
+		// @todo error handling
+		// just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
+		peer->disconnecting = KS_TRUE;
+		return KS_STATUS_FAIL;
+	}
+	ks_q_push(peer->messages_sending, message);
+	return KS_STATUS_SUCCESS;
+}
+
void *blade_peer_kws_thread(ks_thread_t *thread, void *data)
{
- blade_peer_t *peer;
+ blade_peer_t *peer = NULL;
kws_opcode_t opcode;
- uint8_t *data;
- ks_size_t data_len;
- blade_message_t *message;
+ uint8_t *frame_data = NULL;
+ ks_size_t frame_data_len = 0;
+ blade_message_t *message = NULL;
+ int32_t poll_flags = 0;
ks_assert(thread);
ks_assert(data);
peer = (blade_peer_t *)data;
+ // @todo: SSL init stuffs based on data from peer->service->config_websockets_ssl to pass into kws_init
+
+ if (kws_init(&peer->kws, peer->sock, NULL, NULL, KWS_BLOCK, peer->pool) != KS_STATUS_SUCCESS) {
+ peer->disconnecting = KS_TRUE;
+ return NULL;
+ }
+
while (!peer->shutdown) {
- // @todo use nonblocking kws mode so that if no data at all is available yet we can still do other things such as sending messages before trying again
- // or easier alternative, just use ks_poll (or select) to check if there is a POLLIN event pending, but this requires direct access to the socket, or
- // kws can be updated to add a function to poll the inner socket for events (IE, kws_poll(kws, &inbool, NULL, &errbool, timeout))
- data_len = kws_read_frame(peer->kws, &opcode, &data);
-
- if (data_len <= 0) {
- // @todo error handling, strerror(ks_errno())
- // 0 means socket closed with WS_NONE, which closes websocket with no additional reason
- // -1 means socket closed with a general failure
- // -2 means nonblocking wait
- // other values are based on WS_XXX reasons
- // negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and
-
- // @todo: this way of disconnecting would have the service periodically check the list of connected peers for those that are disconnecting,
- // remove them from the connected peer list, and then call peer destroy which will wait for this thread to rejoin which it already will have,
- // and then destroy the inner kws and finish any cleanup of the actual socket if neccessary, and can still call an ondisconnected callback
- // at the service level
- peer->disconnecting = KS_TRUE;
- break;
- }
+ // @todo get exact timeout from service config?
+ poll_flags = ks_wait_sock(peer->sock, 100, KS_POLL_READ | KS_POLL_ERROR);
- // @todo this will check the discarded queue first and realloc if there is not enough space, otherwise allocate a message, and finally copy the data
- if (blade_handle_message_claim(peer->service->handle, &message, data, data_len) != KS_STATUS_SUCCESS || !message) {
- // @todo error handling
- // just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
+ if (poll_flags & KS_POLL_ERROR) {
+ // @todo switch this (and others below) to the enum for the state callback, called during the service connected peer cleanup
peer->disconnecting = KS_TRUE;
break;
}
-
- ks_q_push(peer->messages_receiving, message);
- // @todo callback up the stack to indicate a message has been received and can be popped (more efficient than constantly polling by popping)?
+ if (poll_flags & KS_POLL_READ) {
+ frame_data_len = kws_read_frame(peer->kws, &opcode, &frame_data);
+
+ if (frame_data_len <= 0) {
+ // @todo error handling, strerror(ks_errno())
+ // 0 means socket closed with WS_NONE, which closes websocket with no additional reason
+ // -1 means socket closed with a general failure
+ // -2 means nonblocking wait
+ // other values are based on WS_XXX reasons
+ // negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and
+
+ // @todo: this way of disconnecting would have the service periodically check the list of connected peers for those that are disconnecting,
+ // remove them from the connected peer list, and then call peer destroy which will wait for this thread to rejoin which it already will have,
+ // and then destroy the inner kws and finish any cleanup of the actual socket if neccessary, and can still call an ondisconnected callback
+ // at the service level
+ peer->disconnecting = KS_TRUE;
+ break;
+ }
+
+ if (blade_handle_message_claim(blade_service_handle(peer->service), &message, frame_data, frame_data_len) != KS_STATUS_SUCCESS || !message) {
+ // @todo error handling
+ // just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
+ peer->disconnecting = KS_TRUE;
+ break;
+ }
+
+ ks_q_push(peer->messages_receiving, message);
+ // @todo callback up the stack to indicate a message has been received and can be popped (more efficient than constantly polling by popping)?
+ // might not perfectly fit the traditional state callback, but it could work if it only sends the state temporarily and does NOT actually change
+ // the internal state to receiving
+ }
-	if (ks_q_trypop(peer->messages_sending, &message) == KS_STATUS_SUCCESS) {
+	// @todo consider only sending one message at a time and use shorter polling timeout to prevent any considerable blocking if send buffers get full
+	while (ks_q_trypop(peer->messages_sending, (void **)&message) == KS_STATUS_SUCCESS && message) {
+		blade_message_get(message, (void **)&frame_data, &frame_data_len);
+		kws_write_frame(peer->kws, WSOC_TEXT, frame_data, frame_data_len);
+		// recycle the message once written; without this every sent message is
+		// leaked (neither returned to the discard queue nor destroyed).
+		// @todo check the kws_write_frame result and mark the peer disconnecting on failure
+		blade_message_discard(&message);
+	}
}
bspvt_flag_t flags;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
+ blade_handle_t *handle;
ks_sockaddr_t config_websockets_endpoints_ipv4[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
ks_sockaddr_t config_websockets_endpoints_ipv6[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool)
+KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_handle_t *handle)
{
bspvt_flag_t newflags = BS_NONE;
blade_service_t *bs = NULL;
+ ks_assert(bsP);
+ ks_assert(handle);
+
if (!pool) {
newflags |= BS_MYPOOL;
ks_pool_open(&pool);
bs->flags = newflags;
bs->pool = pool;
bs->tpool = tpool;
+ bs->handle = handle;
list_init(&bs->connected);
*bsP = bs;
return KS_STATUS_SUCCESS;
}
+KS_DECLARE(blade_handle_t *) blade_service_handle(blade_service_t *bs)
+{
+ ks_assert(bs);
+ return bs->handle;
+}
+
ks_status_t blade_service_config(blade_service_t *bs, config_setting_t *config)
{
config_setting_t *websockets = NULL;
void *blade_service_listeners_thread(ks_thread_t *thread, void *data)
{
- blade_service_t *service;
+ blade_service_t *service = NULL;
ks_assert(thread);
ks_assert(data);
service = (blade_service_t *)data;
while (!service->shutdown) {
+ // @todo take exact timeout from a setting in config_service_endpoints
if (ks_poll(service->listeners_poll, service->listeners_length, 100) > 0) {
for (int32_t index = 0; index < service->listeners_length; ++index) {
- ks_socket_t sock;
- ks_sockaddr_t raddr;
- socklen_t slen = 0;
- kws_t *kws = NULL;
+ ks_socket_t sock = KS_SOCK_INVALID;
blade_peer_t *peer = NULL;
if (!(service->listeners_poll[index].revents & POLLIN)) continue;
if (service->listeners_poll[index].revents & POLLERR) {
- // @todo: error handling, just skip the listener for now
+ // @todo: error handling, just skip the listener for now, it might recover, could skip X sanity times before closing?
continue;
}
- if (service->listeners_families[index] == AF_INET) {
- slen = sizeof(raddr.v.v4);
- if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v4, &slen)) == KS_SOCK_INVALID) {
- // @todo: error handling, just skip the socket for now
- continue;
- }
- raddr.family = AF_INET;
- } else {
- slen = sizeof(raddr.v.v6);
- if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v6, &slen)) == KS_SOCK_INVALID) {
- // @todo: error handling, just skip the socket for now
- continue;
- }
- raddr.family = AF_INET6;
- }
-
- ks_addr_get_host(&raddr);
- ks_addr_get_port(&raddr);
-
- // @todo: SSL init stuffs based on data from service->config_websockets_ssl
-
- if (kws_init(&kws, sock, NULL, NULL, KWS_BLOCK, service->pool) != KS_STATUS_SUCCESS) {
- // @todo: error handling, just close and skip the socket for now
- ks_socket_close(&sock);
+ if ((sock = accept(service->listeners_poll[index].fd, NULL, NULL)) == KS_SOCK_INVALID) {
+ // @todo: error handling, just skip the socket for now as most causes are because the remote side suddenly became unreachable
continue;
}
- blade_peer_create(&peer, service->pool, service->tpool);
+ // @todo consider a recycle queue of peers per service, and only have to call startup when one is already available
+ // blade_service_peer_claim(service, &peer);
+ blade_peer_create(&peer, service->pool, service->tpool, service);
ks_assert(peer);
- // @todo: should probably assign kws before adding to list, in a separate call from startup because it starts the internal worker thread
+ // @todo call state callback with connecting enum state
- list_append(&service->connected, peer);
+ blade_peer_startup(peer, sock);
- blade_peer_startup(peer, kws);
+ list_append(&service->connected, peer);
+ }
+ }
+
+	// Remove disconnecting peers without deleting from the list while an
+	// iterator is active: simclist's list_delete() invalidates the iteration
+	// state, so stop the iterator before deleting and restart the scan after
+	// each removal. Disconnects are rare, so the restart cost is negligible.
+	for (ks_bool_t removed = KS_TRUE; removed;) {
+		removed = KS_FALSE;
+		list_iterator_start(&service->connected);
+		while (list_iterator_hasnext(&service->connected)) {
+			blade_peer_t *peer = (blade_peer_t *)list_iterator_next(&service->connected);
+			// @todo expose accessor for disconnecting, after changing it into the state callback enum
+			// ensure that every way kws_close might occur leads back to disconnecting = KS_TRUE for this to universally process disconnects
+			if (blade_peer_disconnecting(peer)) {
+				list_iterator_stop(&service->connected);
+				list_delete(&service->connected, peer);
+				// @todo call state callback with internal disconnecting enum state (stored in peer to hold specific reason for disconnecting)
+				// @todo switch to blade_peer_shutdown(&peer) and blade_peer_discard(&peer) after introducing recycling of peers
+				blade_peer_destroy(&peer);
+				removed = KS_TRUE;
+				break;
+			}
+		}
+		if (!removed) list_iterator_stop(&service->connected);
+	}
}
return NULL;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
+ config_setting_t *config_service;
config_setting_t *config_datastore;
- config_setting_t *config_directory;
-
- //blade_peer_t *peer;
- blade_directory_t *directory;
+
+ ks_q_t *messages_discarded;
+ blade_service_t *service;
+
blade_datastore_t *datastore;
};
pool = bh->pool;
blade_handle_shutdown(bh);
-
- //blade_peer_destroy(&bh->peer);
+
+ if (bh->messages_discarded) {
+ // @todo make sure messages are cleaned up
+ ks_q_destroy(&bh->messages_discarded);
+ }
+
if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool);
ks_pool_free(bh->pool, &bh);
bh->flags = newflags;
bh->pool = pool;
bh->tpool = tpool;
- //blade_peer_create(&bh->peer, bh->pool, bh->tpool);
+
+ // @todo check thresholds from config, for now just ensure it doesn't grow out of control, allow 100 discarded messages
+ ks_q_create(&bh->messages_discarded, bh->pool, 100);
+ ks_assert(bh->messages_discarded);
*bhP = bh;
ks_status_t blade_handle_config(blade_handle_t *bh, config_setting_t *config)
{
+ config_setting_t *service = NULL;
config_setting_t *datastore = NULL;
- config_setting_t *directory = NULL;
ks_assert(bh);
if (!config) return KS_STATUS_FAIL;
if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
+ // @todo config for messages_discarded threshold (ie, message count, message memory, etc)
+
+ service = config_setting_get_member(config, "service");
+
datastore = config_setting_get_member(config, "datastore");
//if (datastore && !config_setting_is_group(datastore)) return KS_STATUS_FAIL;
-
- directory = config_setting_get_member(config, "directory");
- //if (directory && !config_setting_is_group(directory)) return KS_STATUS_FAIL;
+
+ bh->config_service = service;
bh->config_datastore = datastore;
- bh->config_directory = directory;
return KS_STATUS_SUCCESS;
}
ks_assert(bh);
if (blade_handle_config(bh, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ if (bh->config_service && !blade_handle_service_available(bh)) {
+ blade_service_create(&bh->service, bh->pool, bh->tpool, bh);
+ ks_assert(bh->service);
+ if (blade_service_startup(bh->service, bh->config_service) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ }
if (bh->config_datastore && !blade_handle_datastore_available(bh)) {
blade_datastore_create(&bh->datastore, bh->pool, bh->tpool);
- blade_datastore_startup(bh->datastore, bh->config_datastore);
- }
-
- if (bh->config_directory && !blade_handle_directory_available(bh)) {
- blade_directory_create(&bh->directory, bh->pool, bh->tpool);
- blade_directory_startup(bh->directory, config);
+ ks_assert(bh->datastore);
+ if (blade_datastore_startup(bh->datastore, bh->config_datastore) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
}
return KS_STATUS_SUCCESS;
KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
{
ks_assert(bh);
-
- if (blade_handle_directory_available(bh)) blade_directory_destroy(&bh->directory);
+
+ if (blade_handle_service_available(bh)) blade_service_destroy(&bh->service);
if (blade_handle_datastore_available(bh)) blade_datastore_destroy(&bh->datastore);
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh)
+KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length)
{
+ blade_message_t *msg = NULL;
+
ks_assert(bh);
+ ks_assert(message);
+ ks_assert(data);
- return bh->datastore != NULL;
+ *message = NULL;
+
+ if (ks_q_trypop(bh->messages_discarded, (void **)&msg) != KS_STATUS_SUCCESS || !msg) blade_message_create(&msg, bh->pool, bh);
+ ks_assert(msg);
+
+ if (blade_message_set(msg, data, data_length) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ *message = msg;
+
+ return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_handle_message_discard(blade_handle_t *bh, blade_message_t **message)
+{
+ ks_assert(bh);
+ ks_assert(message);
+ ks_assert(*message);
+
+ // @todo check thresholds for discarded messages, if the queue is full just destroy the message for now (currently 100 messages)
+ if (ks_q_push(bh->messages_discarded, *message) != KS_STATUS_SUCCESS) blade_message_destroy(message);
+
+ *message = NULL;
+
+ return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_bool_t) blade_handle_service_available(blade_handle_t *bh)
+{
+ ks_assert(bh);
+
+ return bh->service != NULL;
}
-KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh)
+KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh)
{
ks_assert(bh);
- return bh->directory != NULL;
+ return bh->datastore != NULL;
}
KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length)
#include "blade_stack.h"
#include "blade_peer.h"
#include "blade_service.h"
+#include "blade_message.h"
#include "blade_datastore.h"
-#include "blade_directory.h"
#include "bpcp.h"
KS_BEGIN_EXTERN_C
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
-#ifndef _BLADE_DIRECTORY_H_
-#define _BLADE_DIRECTORY_H_
+#ifndef _BLADE_MESSAGE_H_
+#define _BLADE_MESSAGE_H_
#include <blade.h>
-#define BLADE_DIRECTORY_TPOOL_MIN 2
-#define BLADE_DIRECTORY_TPOOL_MAX 8
-#define BLADE_DIRECTORY_TPOOL_STACK (1024 * 256)
-#define BLADE_DIRECTORY_TPOOL_IDLE 10
-
KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool);
-KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP);
-KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config);
-KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd);
+KS_DECLARE(ks_status_t) blade_message_create(blade_message_t **bmP, ks_pool_t *pool, blade_handle_t *handle);
+KS_DECLARE(ks_status_t) blade_message_destroy(blade_message_t **bmP);
+KS_DECLARE(ks_status_t) blade_message_set(blade_message_t *bm, void *data, ks_size_t data_length);
+KS_DECLARE(ks_status_t) blade_message_get(blade_message_t *bm, void **data, ks_size_t *data_length);
+KS_DECLARE(ks_status_t) blade_message_discard(blade_message_t **bm);
KS_END_EXTERN_C
#endif
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
-#ifndef _BPCP_H_
-#define _BPCP_H_
+#ifndef _BLADE_PEER_H_
+#define _BLADE_PEER_H_
#include <blade.h>
#define BLADE_PEER_TPOOL_MIN 2
#define BLADE_PEER_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool);
+KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_service_t *service);
KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP);
-KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws);
+KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, ks_socket_t sock);
KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp);
+KS_DECLARE(void) blade_peer_disconnect(blade_peer_t *bp);
+KS_DECLARE(ks_bool_t) blade_peer_disconnecting(blade_peer_t *bp);
+KS_DECLARE(ks_status_t) blade_peer_message_pop(blade_peer_t *peer, blade_message_t **message);
+KS_DECLARE(ks_status_t) blade_peer_message_push(blade_peer_t *peer, void *data, ks_size_t data_length);
KS_END_EXTERN_C
#endif
#define BLADE_SERVICE_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool);
+KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_handle_t *handle);
KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP);
+KS_DECLARE(blade_handle_t *) blade_service_handle(blade_service_t *bs);
KS_DECLARE(ks_status_t) blade_service_startup(blade_service_t *bs, config_setting_t *config);
KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs);
KS_END_EXTERN_C
KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config);
KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh);
-KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh);
-KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh);
+KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length);
+KS_DECLARE(ks_status_t) blade_handle_message_discard(blade_handle_t *bh, blade_message_t **message);
+
+KS_DECLARE(ks_bool_t) blade_handle_service_available(blade_handle_t *bh);
+KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length);
KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
blade_datastore_fetch_callback_t callback,
typedef struct blade_handle_s blade_handle_t;
typedef struct blade_peer_s blade_peer_t;
typedef struct blade_service_s blade_service_t;
+typedef struct blade_message_s blade_message_t;
typedef struct blade_datastore_s blade_datastore_t;
-typedef struct blade_directory_s blade_directory_t;
typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata);