]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9952: More work on the blade service transport layer, now compiles but is untested...
authorShane Bryldt <astaelan@gmail.com>
Mon, 30 Jan 2017 05:02:58 +0000 (05:02 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 22 Mar 2017 21:42:49 +0000 (17:42 -0400)
12 files changed:
libs/libblade/Makefile.am
libs/libblade/src/blade_directory.c [deleted file]
libs/libblade/src/blade_message.c [new file with mode: 0644]
libs/libblade/src/blade_peer.c
libs/libblade/src/blade_service.c
libs/libblade/src/blade_stack.c
libs/libblade/src/include/blade.h
libs/libblade/src/include/blade_message.h [moved from libs/libblade/src/include/blade_directory.h with 76% similarity]
libs/libblade/src/include/blade_peer.h
libs/libblade/src/include/blade_service.h
libs/libblade/src/include/blade_stack.h
libs/libblade/src/include/blade_types.h

index f55b1266bb1ded5c03d9a9a2a05ed5d987af2220..ee5e5677d01cdd941d529159f19da196b3951be1 100644 (file)
@@ -11,13 +11,14 @@ libunqlite_la_CFLAGS    = -DUNQLITE_ENABLE_THREADS
 libunqlite_la_LIBADD    = -lpthread
 
 lib_LTLIBRARIES                = libblade.la
-libblade_la_SOURCES     = src/blade.c src/blade_stack.c src/blade_peer.c src/blade_service.c src/bpcp.c src/blade_datastore.c src/blade_directory.c
+libblade_la_SOURCES     = src/blade.c src/blade_stack.c src/blade_peer.c src/blade_service.c src/bpcp.c src/blade_datastore.c
+libblade_la_SOURCES    += src/blade_message.c
 libblade_la_CFLAGS     = $(AM_CFLAGS) $(AM_CPPFLAGS)
 libblade_la_LDFLAGS     = -version-info 0:1:0 -lncurses -lpthread -lm -lconfig $(AM_LDFLAGS)
 libblade_la_LIBADD      = libunqlite.la
 library_includedir     = $(prefix)/include
 library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/blade_service.h
-library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_directory.h
+library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_message.h
 library_include_HEADERS += src/include/unqlite.h test/tap.h
 
 tests: libblade.la
diff --git a/libs/libblade/src/blade_directory.c b/libs/libblade/src/blade_directory.c
deleted file mode 100644 (file)
index ab24942..0000000
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Copyright (c) 2007-2014, Anthony Minessale II
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * 
- * * Neither the name of the original author; nor the names of any contributors
- * may be used to endorse or promote products derived from this software
- * without specific prior written permission.
- * 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
- * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
- * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
- * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
- * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "blade.h"
-
-
-typedef enum {
-       BD_NONE = 0,
-       BD_MYPOOL = (1 << 0),
-       BD_MYTPOOL = (1 << 1),
-} bdpvt_flag_t;
-
-struct blade_directory_s {
-       bdpvt_flag_t flags;
-       ks_pool_t *pool;
-       ks_thread_pool_t *tpool;
-       
-       config_setting_t *config_service;
-       
-       blade_service_t *service;
-};
-
-
-
-
-KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP)
-{
-       blade_directory_t *bd = NULL;
-       bdpvt_flag_t flags;
-       ks_pool_t *pool;
-
-       ks_assert(bdP);
-
-       bd = *bdP;
-       *bdP = NULL;
-
-       ks_assert(bd);
-
-       flags = bd->flags;
-       pool = bd->pool;
-
-       blade_directory_shutdown(bd);
-
-    if (bd->tpool && (flags & BD_MYTPOOL)) ks_thread_pool_destroy(&bd->tpool);
-       
-       ks_pool_free(bd->pool, &bd);
-
-       if (pool && (flags & BD_MYPOOL)) ks_pool_close(&pool);
-
-       return KS_STATUS_SUCCESS;
-}
-
-KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool)
-{
-       bdpvt_flag_t newflags = BD_NONE;
-       blade_directory_t *bd = NULL;
-
-       if (!pool) {
-               newflags |= BD_MYPOOL;
-               ks_pool_open(&pool);
-               ks_assert(pool);
-       }
-       // @todo: move thread pool creation to startup which allows thread pool to be configurable
-    if (!tpool) {
-               newflags |= BD_MYTPOOL;
-               ks_thread_pool_create(&tpool,
-                                                         BLADE_DIRECTORY_TPOOL_MIN,
-                                                         BLADE_DIRECTORY_TPOOL_MAX,
-                                                         BLADE_DIRECTORY_TPOOL_STACK,
-                                                         KS_PRI_NORMAL,
-                                                         BLADE_DIRECTORY_TPOOL_IDLE);
-               ks_assert(tpool);
-       }
-       
-       bd = ks_pool_alloc(pool, sizeof(*bd));
-       bd->flags = newflags;
-       bd->pool = pool;
-       bd->tpool = tpool;
-       *bdP = bd;
-
-       return KS_STATUS_SUCCESS;
-}
-
-ks_status_t blade_directory_config(blade_directory_t *bd, config_setting_t *config)
-{
-       config_setting_t *service = NULL;
-
-       ks_assert(bd);
-
-       if (!config) return KS_STATUS_FAIL;
-       if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
-       
-       service = config_setting_get_member(config, "service");
-       if (!service) return KS_STATUS_FAIL;
-
-       bd->config_service = service;
-       
-       return KS_STATUS_SUCCESS;
-}
-
-KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config)
-{
-       ks_assert(bd);
-
-       blade_directory_shutdown(bd);
-       
-       if (blade_directory_config(bd, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-
-       blade_service_create(&bd->service, bd->pool, bd->tpool);
-       ks_assert(bd->service);
-       if (blade_service_startup(bd->service, bd->config_service) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-       
-       return KS_STATUS_SUCCESS;
-}
-
-KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd)
-{
-       ks_assert(bd);
-
-       if (bd->service) blade_service_destroy(&bd->service);
-
-       return KS_STATUS_SUCCESS;
-}
-
-
-/* For Emacs:
- * Local Variables:
- * mode:c
- * indent-tabs-mode:t
- * tab-width:4
- * c-basic-offset:4
- * End:
- * For VIM:
- * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
- */
diff --git a/libs/libblade/src/blade_message.c b/libs/libblade/src/blade_message.c
new file mode 100644 (file)
index 0000000..0bf4013
--- /dev/null
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2007-2014, Anthony Minessale II
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "blade.h"
+
+struct blade_message_s {
+       ks_pool_t *pool;
+       blade_handle_t *handle;
+
+       void *data;
+       ks_size_t data_length;
+       ks_size_t data_size;
+};
+
+
+KS_DECLARE(ks_status_t) blade_message_destroy(blade_message_t **bmP)
+{
+       blade_message_t *bm = NULL;
+
+       ks_assert(bmP);
+
+       bm = *bmP;
+       *bmP = NULL;
+
+       ks_assert(bm);
+
+       if (bm->data) ks_pool_free(bm->pool, &bm->data);
+       
+       ks_pool_free(bm->pool, &bm);
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_message_create(blade_message_t **bmP, ks_pool_t *pool, blade_handle_t *handle)
+{
+       blade_message_t *bm = NULL;
+
+       ks_assert(bmP);
+       ks_assert(pool);
+       ks_assert(handle);
+
+       bm = ks_pool_alloc(pool, sizeof(*bm));
+       bm->pool = pool;
+       bm->handle = handle;
+       *bmP = bm;
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_message_discard(blade_message_t **bm)
+{
+       ks_assert(bm);
+       ks_assert(*bm);
+
+       return blade_handle_message_discard((*bm)->handle, bm);
+}
+
+KS_DECLARE(ks_status_t) blade_message_set(blade_message_t *bm, void *data, ks_size_t data_length)
+{
+       ks_assert(bm);
+       ks_assert(data);
+       ks_assert(data_length > 0);
+
+       // @todo fail on a max message size?
+
+       if (data_length > bm->data_size) {
+               // @todo talk to tony about adding flags to ks_pool_resize_ex to prevent the memcpy, don't need to copy old memory here
+               // otherwise switch to a new allocation instead of resizing
+               bm->data = ks_pool_resize(bm->pool, bm->data, data_length);
+               ks_assert(bm->data);
+               bm->data_size = data_length;
+       }
+       memcpy(bm->data, data, data_length);
+       bm->data_length = data_length;
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_message_get(blade_message_t *bm, void **data, ks_size_t *data_length)
+{
+       ks_assert(bm);
+
+       *data = bm->data;
+       *data_length = bm->data_length;
+
+       return KS_STATUS_SUCCESS;
+}
+
+       
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
index 2e794a629b51ce9086c5505230762298f1399283..942841ce64a0094c1f91d89e03147a485882f589 100644 (file)
@@ -45,7 +45,9 @@ struct blade_peer_s {
        ks_thread_pool_t *tpool;
        blade_service_t *service;
 
+       ks_socket_t sock;
        ks_bool_t shutdown;
+       ks_bool_t disconnecting;
        kws_t *kws;
        ks_thread_t *kws_thread;
 
@@ -118,8 +120,10 @@ KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, k
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws)
+KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, ks_socket_t sock)
 {
+       kws_t *kws = NULL;
+       
        ks_assert(bp);
        ks_assert(kws);
 
@@ -127,7 +131,7 @@ KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws)
        
        blade_peer_shutdown(bp);
        
-       bp->kws = kws;
+       bp->sock = sock;
 
     if (ks_thread_create_ex(&bp->kws_thread,
                                                        blade_peer_kws_thread,
@@ -142,6 +146,8 @@ KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws)
 
 KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp)
 {
+       blade_message_t *message = NULL;
+       
        ks_assert(bp);
 
        bp->shutdown = KS_TRUE;
@@ -150,61 +156,125 @@ KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp)
                ks_thread_join(bp->kws_thread);
                ks_pool_free(bp->pool, &bp->kws_thread);
        }
+
+       while (ks_q_trypop(bp->messages_sending, (void **)&message) == KS_STATUS_SUCCESS && message) blade_message_discard(&message);
+       while (ks_q_trypop(bp->messages_receiving, (void **)&message) == KS_STATUS_SUCCESS && message) blade_message_discard(&message);
        
        if (bp->kws) kws_destroy(&bp->kws);
+       else if (bp->sock != KS_SOCK_INVALID) ks_socket_close(&bp->sock);
+       bp->sock = KS_SOCK_INVALID;
        
        bp->shutdown = KS_FALSE;
        return KS_STATUS_SUCCESS;
 }
 
+KS_DECLARE(void) blade_peer_disconnect(blade_peer_t *bp)
+{
+       ks_assert(bp);
+
+       bp->disconnecting = KS_TRUE;
+}
+
+KS_DECLARE(ks_bool_t) blade_peer_disconnecting(blade_peer_t *bp)
+{
+       ks_assert(bp);
+       return bp->disconnecting;
+}
+
+KS_DECLARE(ks_status_t) blade_peer_message_pop(blade_peer_t *peer, blade_message_t **message)
+{
+       ks_assert(peer);
+       ks_assert(message);
+
+       *message = NULL;
+       return ks_q_trypop(peer->messages_receiving, (void **)message);
+}
+
+KS_DECLARE(ks_status_t) blade_peer_message_push(blade_peer_t *peer, void *data, ks_size_t data_length)
+{
+       blade_message_t *message = NULL;
+
+       ks_assert(peer);
+       ks_assert(data);
+       ks_assert(data_length > 0);
+       
+       if (blade_handle_message_claim(blade_service_handle(peer->service), &message, data, data_length) != KS_STATUS_SUCCESS || !message) {
+               // @todo error handling
+               // just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
+               peer->disconnecting = KS_TRUE;
+               return KS_STATUS_FAIL;
+       }
+       ks_q_push(peer->messages_sending, message);
+       return KS_STATUS_SUCCESS;
+}
+
 void *blade_peer_kws_thread(ks_thread_t *thread, void *data)
 {
-       blade_peer_t *peer;
+       blade_peer_t *peer = NULL;
        kws_opcode_t opcode;
-       uint8_t *data;
-       ks_size_t data_len;
-       blade_message_t *message;
+       uint8_t *frame_data = NULL;
+       ks_size_t frame_data_len = 0;
+       blade_message_t *message = NULL;
+       int32_t poll_flags = 0;
 
        ks_assert(thread);
        ks_assert(data);
 
        peer = (blade_peer_t *)data;
 
+       // @todo: SSL init stuffs based on data from peer->service->config_websockets_ssl to pass into kws_init
+       
+       if (kws_init(&peer->kws, peer->sock, NULL, NULL, KWS_BLOCK, peer->pool) != KS_STATUS_SUCCESS) {
+               peer->disconnecting = KS_TRUE;
+               return NULL;
+       }
+       
        while (!peer->shutdown) {
-               // @todo use nonblocking kws mode so that if no data at all is available yet we can still do other things such as sending messages before trying again
-               // or easier alternative, just use ks_poll (or select) to check if there is a POLLIN event pending, but this requires direct access to the socket, or
-               // kws can be updated to add a function to poll the inner socket for events (IE, kws_poll(kws, &inbool, NULL, &errbool, timeout))
-               data_len = kws_read_frame(peer->kws, &opcode, &data);
-
-               if (data_len <= 0) {
-                       // @todo error handling, strerror(ks_errno())
-                       // 0 means socket closed with WS_NONE, which closes websocket with no additional reason
-                       // -1 means socket closed with a general failure
-                       // -2 means nonblocking wait
-                       // other values are based on WS_XXX reasons
-                       // negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and
-                       
-                       // @todo: this way of disconnecting would have the service periodically check the list of connected peers for those that are disconnecting,
-                       // remove them from the connected peer list, and then call peer destroy which will wait for this thread to rejoin which it already will have,
-                       // and then destroy the inner kws and finish any cleanup of the actual socket if neccessary, and can still call an ondisconnected callback
-                       // at the service level
-                       peer->disconnecting = KS_TRUE;
-                       break;
-               }
+               // @todo get exact timeout from service config?
+               poll_flags = ks_wait_sock(peer->sock, 100, KS_POLL_READ | KS_POLL_ERROR);
 
-               // @todo this will check the discarded queue first and realloc if there is not enough space, otherwise allocate a message, and finally copy the data
-               if (blade_handle_message_claim(peer->service->handle, &message, data, data_len) != KS_STATUS_SUCCESS || !message) {
-                       // @todo error handling
-                       // just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
+               if (poll_flags & KS_POLL_ERROR) {
+                       // @todo switch this (and others below) to the enum for the state callback, called during the service connected peer cleanup
                        peer->disconnecting = KS_TRUE;
                        break;
                }
-               
-               ks_q_push(peer->messages_receiving, message);
-               // @todo callback up the stack to indicate a message has been received and can be popped (more efficient than constantly polling by popping)?
 
+               if (poll_flags & KS_POLL_READ) {
+                       frame_data_len = kws_read_frame(peer->kws, &opcode, &frame_data);
+                       
+                       if (frame_data_len <= 0) {
+                               // @todo error handling, strerror(ks_errno())
+                               // 0 means socket closed with WS_NONE, which closes websocket with no additional reason
+                               // -1 means socket closed with a general failure
+                               // -2 means nonblocking wait
+                               // other values are based on WS_XXX reasons
+                               // negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and
+                       
+                               // @todo: this way of disconnecting would have the service periodically check the list of connected peers for those that are disconnecting,
+                               // remove them from the connected peer list, and then call peer destroy which will wait for this thread to rejoin which it already will have,
+                               // and then destroy the inner kws and finish any cleanup of the actual socket if neccessary, and can still call an ondisconnected callback
+                               // at the service level
+                               peer->disconnecting = KS_TRUE;
+                               break;
+                       }
+
+                       if (blade_handle_message_claim(blade_service_handle(peer->service), &message, frame_data, frame_data_len) != KS_STATUS_SUCCESS || !message) {
+                               // @todo error handling
+                               // just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
+                               peer->disconnecting = KS_TRUE;
+                               break;
+                       }
+               
+                       ks_q_push(peer->messages_receiving, message);
+                       // @todo callback up the stack to indicate a message has been received and can be popped (more efficient than constantly polling by popping)?
+                       // might not perfectly fit the traditional state callback, but it could work if it only sends the state temporarily and does NOT actually change
+                       // the internal state to receiving
+               }
 
-               if (ks_q_trypop(peer->messages_sending, &message) == KS_STATUS_SUCCESS) {
+               // @todo consider only sending one message at a time and use shorter polling timeout to prevent any considerable blocking if send buffers get full
+               while (ks_q_trypop(peer->messages_sending, (void **)&message) == KS_STATUS_SUCCESS && message) {
+                       blade_message_get(message, (void **)&frame_data, &frame_data_len);
+                       kws_write_frame(peer->kws, WSOC_TEXT, frame_data, frame_data_len);
                }
        }
        
index e1361bf9dff307a026b8bea71720cb159a87cd68..dc7dcadbf6f820d7629a12df04b81c97adf3d26d 100644 (file)
@@ -45,6 +45,7 @@ struct blade_service_s {
        bspvt_flag_t flags;
        ks_pool_t *pool;
        ks_thread_pool_t *tpool;
+       blade_handle_t *handle;
 
        ks_sockaddr_t config_websockets_endpoints_ipv4[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
        ks_sockaddr_t config_websockets_endpoints_ipv6[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
@@ -97,11 +98,14 @@ KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP)
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool)
+KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_handle_t *handle)
 {
        bspvt_flag_t newflags = BS_NONE;
        blade_service_t *bs = NULL;
 
+       ks_assert(bsP);
+       ks_assert(handle);
+
        if (!pool) {
                newflags |= BS_MYPOOL;
                ks_pool_open(&pool);
@@ -117,12 +121,19 @@ KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *p
        bs->flags = newflags;
        bs->pool = pool;
        bs->tpool = tpool;
+       bs->handle = handle;
        list_init(&bs->connected);
        *bsP = bs;
 
        return KS_STATUS_SUCCESS;
 }
 
+KS_DECLARE(blade_handle_t *) blade_service_handle(blade_service_t *bs)
+{
+       ks_assert(bs);
+       return bs->handle;
+}
+
 ks_status_t blade_service_config(blade_service_t *bs, config_setting_t *config)
 {
        config_setting_t *websockets = NULL;
@@ -325,7 +336,7 @@ ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr)
 
 void *blade_service_listeners_thread(ks_thread_t *thread, void *data)
 {
-       blade_service_t *service;
+       blade_service_t *service = NULL;
 
        ks_assert(thread);
        ks_assert(data);
@@ -333,57 +344,50 @@ void *blade_service_listeners_thread(ks_thread_t *thread, void *data)
        service = (blade_service_t *)data;
        
        while (!service->shutdown) {
+               // @todo take exact timeout from a setting in config_service_endpoints
                if (ks_poll(service->listeners_poll, service->listeners_length, 100) > 0) {
                        for (int32_t index = 0; index < service->listeners_length; ++index) {
-                               ks_socket_t sock;
-                               ks_sockaddr_t raddr;
-                               socklen_t slen = 0;
-                               kws_t *kws = NULL;
+                               ks_socket_t sock = KS_SOCK_INVALID;
                                blade_peer_t *peer = NULL;
 
                                if (!(service->listeners_poll[index].revents & POLLIN)) continue;
                                if (service->listeners_poll[index].revents & POLLERR) {
-                                       // @todo: error handling, just skip the listener for now
+                                       // @todo: error handling, just skip the listener for now, it might recover, could skip X sanity times before closing?
                                        continue;
                                }
 
-                               if (service->listeners_families[index] == AF_INET) {
-                                       slen = sizeof(raddr.v.v4);
-                                       if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v4, &slen)) == KS_SOCK_INVALID) {
-                                               // @todo: error handling, just skip the socket for now
-                                               continue;
-                                       }
-                                       raddr.family = AF_INET;
-                               } else {
-                                       slen = sizeof(raddr.v.v6);
-                                       if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v6, &slen)) == KS_SOCK_INVALID) {
-                                               // @todo: error handling, just skip the socket for now
-                                               continue;
-                                       }
-                                       raddr.family = AF_INET6;
-                               }
-
-                               ks_addr_get_host(&raddr);
-                               ks_addr_get_port(&raddr);
-
-                               // @todo: SSL init stuffs based on data from service->config_websockets_ssl
-                               
-                               if (kws_init(&kws, sock, NULL, NULL, KWS_BLOCK, service->pool) != KS_STATUS_SUCCESS) {
-                                       // @todo: error handling, just close and skip the socket for now
-                                       ks_socket_close(&sock);
+                               if ((sock = accept(service->listeners_poll[index].fd, NULL, NULL)) == KS_SOCK_INVALID) {
+                                       // @todo: error handling, just skip the socket for now as most causes are because the remote side suddenly became unreachable
                                        continue;
                                }
 
-                               blade_peer_create(&peer, service->pool, service->tpool);
+                               // @todo consider a recycle queue of peers per service, and only have to call startup when one is already available
+                               // blade_service_peer_claim(service, &peer);
+                               blade_peer_create(&peer, service->pool, service->tpool, service);
                                ks_assert(peer);
 
-                               // @todo: should probably assign kws before adding to list, in a separate call from startup because it starts the internal worker thread
+                               // @todo call state callback with connecting enum state
                                
-                               list_append(&service->connected, peer);
+                               blade_peer_startup(peer, sock);
                                
-                               blade_peer_startup(peer, kws);
+                               list_append(&service->connected, peer);
+                       }
+               }
+
+               list_iterator_start(&service->connected);
+               while (list_iterator_hasnext(&service->connected)) {
+                       blade_peer_t *peer = (blade_peer_t *)list_iterator_next(&service->connected);
+                       // @todo expose accessor for disconnecting, after changing it into the state callback enum
+                       // ensure that every way kws_close might occur leads back to disconnecting = KS_TRUE for this to universally process disconnects
+                       if (blade_peer_disconnecting(peer)) {
+                               // @todo check if there is an iterator based remove function, or indexed iteration to use list_delete_at()
+                               list_delete(&service->connected, peer);
+                               // @todo call state callback with internal disconnecting enum state (stored in peer to hold specific reason for disconnecting)
+                               // @todo switch to blade_peer_shutdown(&peer) and blade_peer_discard(&peer) after introducing recycling of peers
+                               blade_peer_destroy(&peer);
                        }
                }
+               list_iterator_stop(&service->connected);
        }
        
        return NULL;
index 5f7c8a7d730e27b973569b28be0911f1bd3c51ce..b262336bf9d0729eebe9bb4a5b532f8294b9e4b3 100644 (file)
@@ -44,11 +44,12 @@ struct blade_handle_s {
        ks_pool_t *pool;
        ks_thread_pool_t *tpool;
        
+       config_setting_t *config_service;
        config_setting_t *config_datastore;
-       config_setting_t *config_directory;
-       
-       //blade_peer_t *peer;
-       blade_directory_t *directory;
+
+       ks_q_t *messages_discarded;
+       blade_service_t *service;
+
        blade_datastore_t *datastore;
 };
 
@@ -70,8 +71,12 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP)
        pool = bh->pool;
 
        blade_handle_shutdown(bh);
-       
-       //blade_peer_destroy(&bh->peer);
+
+       if (bh->messages_discarded) {
+               // @todo make sure messages are cleaned up
+               ks_q_destroy(&bh->messages_discarded);
+       }
+
     if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool);
 
        ks_pool_free(bh->pool, &bh);
@@ -104,7 +109,10 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo
        bh->flags = newflags;
        bh->pool = pool;
        bh->tpool = tpool;
-       //blade_peer_create(&bh->peer, bh->pool, bh->tpool);
+
+       // @todo check thresholds from config, for now just ensure it doesn't grow out of control, allow 100 discarded messages
+       ks_q_create(&bh->messages_discarded, bh->pool, 100);
+       ks_assert(bh->messages_discarded);
 
        *bhP = bh;
 
@@ -113,22 +121,24 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo
 
 ks_status_t blade_handle_config(blade_handle_t *bh, config_setting_t *config)
 {
+       config_setting_t *service = NULL;
        config_setting_t *datastore = NULL;
-       config_setting_t *directory = NULL;
        
        ks_assert(bh);
 
        if (!config) return KS_STATUS_FAIL;
     if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
 
+       // @todo config for messages_discarded threshold (ie, message count, message memory, etc)
+
+    service = config_setting_get_member(config, "service");
+       
     datastore = config_setting_get_member(config, "datastore");
     //if (datastore && !config_setting_is_group(datastore)) return KS_STATUS_FAIL;
-       
-    directory = config_setting_get_member(config, "directory");
-    //if (directory && !config_setting_is_group(directory)) return KS_STATUS_FAIL;
 
+
+       bh->config_service = service;
        bh->config_datastore = datastore;
-       bh->config_directory = directory;
        
        return KS_STATUS_SUCCESS;
 }
@@ -138,15 +148,17 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
        ks_assert(bh);
 
     if (blade_handle_config(bh, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       if (bh->config_service && !blade_handle_service_available(bh)) {
+               blade_service_create(&bh->service, bh->pool, bh->tpool, bh);
+               ks_assert(bh->service);
+               if (blade_service_startup(bh->service, bh->config_service) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       }
        
        if (bh->config_datastore && !blade_handle_datastore_available(bh)) {
                blade_datastore_create(&bh->datastore, bh->pool, bh->tpool);
-               blade_datastore_startup(bh->datastore, bh->config_datastore);
-       }
-       
-       if (bh->config_directory && !blade_handle_directory_available(bh)) {
-               blade_directory_create(&bh->directory, bh->pool, bh->tpool);
-               blade_directory_startup(bh->directory, config);
+               ks_assert(bh->datastore);
+               if (blade_datastore_startup(bh->datastore, bh->config_datastore) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
        }
        
        return KS_STATUS_SUCCESS;
@@ -155,26 +167,60 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
 KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
 {
        ks_assert(bh);
-       
-       if (blade_handle_directory_available(bh)) blade_directory_destroy(&bh->directory);
+
+       if (blade_handle_service_available(bh)) blade_service_destroy(&bh->service);
        
        if (blade_handle_datastore_available(bh)) blade_datastore_destroy(&bh->datastore);
        
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh)
+KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length)
 {
+       blade_message_t *msg = NULL;
+       
        ks_assert(bh);
+       ks_assert(message);
+       ks_assert(data);
 
-       return bh->datastore != NULL;
+       *message = NULL;
+       
+       if (ks_q_trypop(bh->messages_discarded, (void **)&msg) != KS_STATUS_SUCCESS || !msg) blade_message_create(&msg, bh->pool, bh);
+       ks_assert(msg);
+
+       if (blade_message_set(msg, data, data_length) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       
+       *message = msg;
+               
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_handle_message_discard(blade_handle_t *bh, blade_message_t **message)
+{
+       ks_assert(bh);
+       ks_assert(message);
+       ks_assert(*message);
+
+       // @todo check thresholds for discarded messages, if the queue is full just destroy the message for now (currently 100 messages)
+       if (ks_q_push(bh->messages_discarded, *message) != KS_STATUS_SUCCESS) blade_message_destroy(message);
+
+       *message = NULL;
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_bool_t) blade_handle_service_available(blade_handle_t *bh)
+{
+       ks_assert(bh);
+
+       return bh->service != NULL;
 }
 
-KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh)
+KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh)
 {
        ks_assert(bh);
 
-       return bh->directory != NULL;
+       return bh->datastore != NULL;
 }
 
 KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length)
index 578d41fee0f72050e30ca82178070fc8052ecd04..f8a3a4459f076bf06d9aadcb6172aa9d229794d9 100644 (file)
@@ -42,8 +42,8 @@
 #include "blade_stack.h"
 #include "blade_peer.h"
 #include "blade_service.h"
+#include "blade_message.h"
 #include "blade_datastore.h"
-#include "blade_directory.h"
 #include "bpcp.h"
 
 KS_BEGIN_EXTERN_C
similarity index 76%
rename from libs/libblade/src/include/blade_directory.h
rename to libs/libblade/src/include/blade_message.h
index e0a66980f3b44204fde1d790064ed6befaab47ba..5db9c60c22633a50f98df847776225e89df37cba 100644 (file)
  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
-#ifndef _BLADE_DIRECTORY_H_
-#define _BLADE_DIRECTORY_H_
+#ifndef _BLADE_MESSAGE_H_
+#define _BLADE_MESSAGE_H_
 #include <blade.h>
 
-#define BLADE_DIRECTORY_TPOOL_MIN 2
-#define BLADE_DIRECTORY_TPOOL_MAX 8
-#define BLADE_DIRECTORY_TPOOL_STACK (1024 * 256)
-#define BLADE_DIRECTORY_TPOOL_IDLE 10
-
 KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool);
-KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP);
-KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config);
-KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd);
+KS_DECLARE(ks_status_t) blade_message_create(blade_message_t **bmP, ks_pool_t *pool, blade_handle_t *handle);
+KS_DECLARE(ks_status_t) blade_message_destroy(blade_message_t **bmP);
+KS_DECLARE(ks_status_t) blade_message_set(blade_message_t *bm, void *data, ks_size_t data_length);
+KS_DECLARE(ks_status_t) blade_message_get(blade_message_t *bm, void **data, ks_size_t *data_length);
+KS_DECLARE(ks_status_t) blade_message_discard(blade_message_t **bm);
 KS_END_EXTERN_C
 
 #endif
index 8a15044ac525a6b0cfb2356f7fbb4306b940dc29..acffc3799828316bc0e80bcf7a6fbd9c9e22ac45 100644 (file)
@@ -31,8 +31,8 @@
  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
-#ifndef _BPCP_H_
-#define _BPCP_H_
+#ifndef _BLADE_PEER_H_
+#define _BLADE_PEER_H_
 #include <blade.h>
 
 #define BLADE_PEER_TPOOL_MIN 2
 #define BLADE_PEER_TPOOL_IDLE 10
 
 KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool);
+KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_service_t *service);
 KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP);
-KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws);
+KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, ks_socket_t sock);
 KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp);
+KS_DECLARE(void) blade_peer_disconnect(blade_peer_t *bp);
+KS_DECLARE(ks_bool_t) blade_peer_disconnecting(blade_peer_t *bp);
+KS_DECLARE(ks_status_t) blade_peer_message_pop(blade_peer_t *peer, blade_message_t **message);
+KS_DECLARE(ks_status_t) blade_peer_message_push(blade_peer_t *peer, void *data, ks_size_t data_length);
 KS_END_EXTERN_C
 
 #endif
index 9ec014f2777cf79edec81c6bf1bb614334560976..d6f1de40b5afdeb1c2501d11e92b0eb4b3453202 100644 (file)
@@ -41,8 +41,9 @@
 #define BLADE_SERVICE_TPOOL_IDLE 10
 
 KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool);
+KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_handle_t *handle);
 KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP);
+KS_DECLARE(blade_handle_t *) blade_service_handle(blade_service_t *bs);
 KS_DECLARE(ks_status_t) blade_service_startup(blade_service_t *bs, config_setting_t *config);
 KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs);
 KS_END_EXTERN_C
index 1eb4229d726cc8d0ad46ee09635f600de507e6b2..507d8a12f75a59eea0b495f6f2f7b941362dda2f 100644 (file)
@@ -46,9 +46,12 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo
 KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config);
 KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh);
 
-KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh);
-KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh);
+KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length);
+KS_DECLARE(ks_status_t) blade_handle_message_discard(blade_handle_t *bh, blade_message_t **message);
+
+KS_DECLARE(ks_bool_t) blade_handle_service_available(blade_handle_t *bh);
 
+KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh);
 KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length);
 KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
                                                                                                         blade_datastore_fetch_callback_t callback,
index 0cb2ec8e3ef6824627d45f9d6b5641738b3cc087..e7484d0f389d8be1a42a0f5273834a9a7aec01c7 100644 (file)
@@ -40,8 +40,8 @@ KS_BEGIN_EXTERN_C
 typedef struct blade_handle_s blade_handle_t;
 typedef struct blade_peer_s blade_peer_t;
 typedef struct blade_service_s blade_service_t;
+typedef struct blade_message_s blade_message_t;
 typedef struct blade_datastore_s blade_datastore_t;
-typedef struct blade_directory_s blade_directory_t;
 
 typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata);