]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9952: Temporary commit for some peer review
authorShane Bryldt <astaelan@gmail.com>
Fri, 27 Jan 2017 19:49:02 +0000 (19:49 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 22 Mar 2017 21:42:49 +0000 (17:42 -0400)
14 files changed:
libs/libblade/Makefile.am
libs/libblade/src/blade_datastore.c
libs/libblade/src/blade_directory.c [new file with mode: 0644]
libs/libblade/src/blade_peer.c
libs/libblade/src/blade_service.c [new file with mode: 0644]
libs/libblade/src/blade_stack.c
libs/libblade/src/include/blade.h
libs/libblade/src/include/blade_datastore.h
libs/libblade/src/include/blade_directory.h [new file with mode: 0644]
libs/libblade/src/include/blade_peer.h
libs/libblade/src/include/blade_service.h [new file with mode: 0644]
libs/libblade/src/include/blade_stack.h
libs/libblade/src/include/blade_types.h
libs/libblade/test/bladec.c

index fe4c96b10857b3d0287614c1ccc88b246f558e45..f55b1266bb1ded5c03d9a9a2a05ed5d987af2220 100644 (file)
@@ -11,13 +11,13 @@ libunqlite_la_CFLAGS    = -DUNQLITE_ENABLE_THREADS
 libunqlite_la_LIBADD    = -lpthread
 
 lib_LTLIBRARIES                = libblade.la
-libblade_la_SOURCES     = src/blade.c src/blade_stack.c src/blade_peer.c src/bpcp.c src/blade_datastore.c
+libblade_la_SOURCES     = src/blade.c src/blade_stack.c src/blade_peer.c src/blade_service.c src/bpcp.c src/blade_datastore.c src/blade_directory.c
 libblade_la_CFLAGS     = $(AM_CFLAGS) $(AM_CPPFLAGS)
-libblade_la_LDFLAGS     = -version-info 0:1:0 -lncurses -lpthread -lm $(AM_LDFLAGS)
+libblade_la_LDFLAGS     = -version-info 0:1:0 -lncurses -lpthread -lm -lconfig $(AM_LDFLAGS)
 libblade_la_LIBADD      = libunqlite.la
 library_includedir     = $(prefix)/include
-library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/bpcp.h
-library_include_HEADERS += src/include/blade_datastore.h
+library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/blade_service.h
+library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_directory.h
 library_include_HEADERS += src/include/unqlite.h test/tap.h
 
 tests: libblade.la
index d97cd9840dba448b3b6567c279f22d950fd91ea5..49c3cbddb64d42eb78b4b6c71fed3b5dd2baf233 100644 (file)
 typedef enum {
        BDS_NONE = 0,
        BDS_MYPOOL = (1 << 0),
+       BDS_MYTPOOL = (1 << 1),
 } bdspvt_flag_t;
 
 struct blade_datastore_s {
        bdspvt_flag_t flags;
        ks_pool_t *pool;
+       ks_thread_pool_t *tpool;
+       
+       const char *config_database_path;
+       //config_setting_t *config_service;
+       
        unqlite *db;
+       //blade_service_t *service;
 };
 
 struct blade_datastore_fetch_userdata_s
@@ -71,11 +78,10 @@ KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP)
        flags = bds->flags;
        pool = bds->pool;
 
-       if (bds->db) {
-               unqlite_close(bds->db);
-               bds->db = NULL;
-       }
+       blade_datastore_shutdown(bds);
 
+    if (bds->tpool && (flags & BDS_MYTPOOL)) ks_thread_pool_destroy(&bds->tpool);
+       
        ks_pool_free(bds->pool, &bds);
 
        if (pool && (flags & BDS_MYPOOL)) ks_pool_close(&pool);
@@ -83,7 +89,7 @@ KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP)
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool)
+KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool, ks_thread_pool_t *tpool)
 {
        bdspvt_flag_t newflags = BDS_NONE;
        blade_datastore_t *bds = NULL;
@@ -93,16 +99,67 @@ KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool
                ks_pool_open(&pool);
                ks_assert(pool);
        }
-
+       // @todo: move thread pool creation to startup which allows thread pool to be configurable
+    if (!tpool) {
+               newflags |= BDS_MYTPOOL;
+               ks_thread_pool_create(&tpool,
+                                                         BLADE_DATASTORE_TPOOL_MIN,
+                                                         BLADE_DATASTORE_TPOOL_MAX,
+                                                         BLADE_DATASTORE_TPOOL_STACK,
+                                                         KS_PRI_NORMAL,
+                                                         BLADE_DATASTORE_TPOOL_IDLE);
+               ks_assert(tpool);
+       }
+       
        bds = ks_pool_alloc(pool, sizeof(*bds));
        bds->flags = newflags;
        bds->pool = pool;
+       bds->tpool = tpool;
        *bdsP = bds;
 
-       if (unqlite_open(&bds->db, NULL, UNQLITE_OPEN_IN_MEMORY) != UNQLITE_OK) {
+       return KS_STATUS_SUCCESS;
+}
+
+ks_status_t blade_datastore_config(blade_datastore_t *bds, config_setting_t *config)
+{
+       config_setting_t *tmp;
+       config_setting_t *database = NULL;
+       //config_setting_t *service = NULL;
+       const char *config_database_path = NULL;
+
+       ks_assert(bds);
+
+       if (!config) return KS_STATUS_FAIL;
+       if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
+
+       database = config_setting_get_member(config, "database");
+       if (!database) return KS_STATUS_FAIL;
+       tmp = config_lookup_from(database, "path");
+       if (!tmp) return KS_STATUS_FAIL;
+       if (config_setting_type(tmp) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL;
+       config_database_path = config_setting_get_string(tmp);
+       //service = config_setting_get_member(config, "service");
+
+       if (bds->config_database_path) ks_pool_free(bds->pool, &bds->config_database_path);
+       bds->config_database_path = ks_pstrdup(bds->pool, config_database_path);
+       //bds->config_service = service;
+       
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_datastore_startup(blade_datastore_t *bds, config_setting_t *config)
+{
+       ks_assert(bds);
+
+       // @todo check if already started
+       
+       if (blade_datastore_config(bds, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       
+       //if (unqlite_open(&bds->db, NULL, UNQLITE_OPEN_IN_MEMORY) != UNQLITE_OK) {
+       if (unqlite_open(&bds->db, bds->config_database_path, UNQLITE_OPEN_CREATE) != UNQLITE_OK) {
                const char *errbuf = NULL;
                blade_datastore_error(bds, &errbuf, NULL);
-               ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
+               ks_log(KS_LOG_ERROR, "BDS Open Error: %s\n", errbuf);
                return KS_STATUS_FAIL;
        }
 
@@ -110,15 +167,31 @@ KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool
 
        // @todo VM init if document store is used (and output consumer callback)
        
+       //blade_service_create(&bds->service, bds->pool, bds->tpool);
+       //ks_assert(bds->service);
+       //blade_service_startup(bds->service, bds->config_service);
+       
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(void) blade_datastore_pulse(blade_datastore_t *bds, int32_t timeout)
+KS_DECLARE(ks_status_t) blade_datastore_shutdown(blade_datastore_t *bds)
 {
        ks_assert(bds);
-       ks_assert(timeout >= 0);
+
+       //if (bds->service) blade_service_destroy(&bds->service);
+
+       if (bds->db) {
+               unqlite_close(bds->db);
+               bds->db = NULL;
+       }
+
+       if (bds->config_database_path) ks_pool_free(bds->pool, &bds->config_database_path);
+       //bds->config_service = NULL;
+       
+       return KS_STATUS_SUCCESS;
 }
 
+
 KS_DECLARE(void) blade_datastore_error(blade_datastore_t *bds, const char **buffer, int32_t *buffer_length)
 {
        ks_assert(bds);
@@ -147,7 +220,7 @@ KS_DECLARE(ks_status_t) blade_datastore_store(blade_datastore_t *bds, const void
                else {
                        const char *errbuf;
                        blade_datastore_error(bds, &errbuf, NULL);
-                       ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
+                       ks_log(KS_LOG_ERROR, "BDS Store Error: %s\n", errbuf);
                        
                        ret = KS_STATUS_FAIL;
                }
@@ -196,10 +269,11 @@ KS_DECLARE(ks_status_t) blade_datastore_fetch(blade_datastore_t *bds,
        
        if (rc != UNQLITE_OK) {
                if (rc == UNQLITE_BUSY) ret = KS_STATUS_TIMEOUT;
+               else if(rc == UNQLITE_NOTFOUND) ret = KS_STATUS_NOT_FOUND;
                else {
                        const char *errbuf;
                        blade_datastore_error(bds, &errbuf, NULL);
-                       ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
+                       ks_log(KS_LOG_ERROR, "BDS Fetch Error: %s\n", errbuf);
                        
                        ret = KS_STATUS_FAIL;
                }
diff --git a/libs/libblade/src/blade_directory.c b/libs/libblade/src/blade_directory.c
new file mode 100644 (file)
index 0000000..ab24942
--- /dev/null
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2007-2014, Anthony Minessale II
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "blade.h"
+
+
+typedef enum {
+       BD_NONE = 0,
+       BD_MYPOOL = (1 << 0),
+       BD_MYTPOOL = (1 << 1),
+} bdpvt_flag_t;
+
+struct blade_directory_s {
+       bdpvt_flag_t flags;
+       ks_pool_t *pool;
+       ks_thread_pool_t *tpool;
+       
+       config_setting_t *config_service;
+       
+       blade_service_t *service;
+};
+
+
+
+
+KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP)
+{
+       blade_directory_t *bd = NULL;
+       bdpvt_flag_t flags;
+       ks_pool_t *pool;
+
+       ks_assert(bdP);
+
+       bd = *bdP;
+       *bdP = NULL;
+
+       ks_assert(bd);
+
+       flags = bd->flags;
+       pool = bd->pool;
+
+       blade_directory_shutdown(bd);
+
+    if (bd->tpool && (flags & BD_MYTPOOL)) ks_thread_pool_destroy(&bd->tpool);
+       
+       ks_pool_free(bd->pool, &bd);
+
+       if (pool && (flags & BD_MYPOOL)) ks_pool_close(&pool);
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool)
+{
+       bdpvt_flag_t newflags = BD_NONE;
+       blade_directory_t *bd = NULL;
+
+       if (!pool) {
+               newflags |= BD_MYPOOL;
+               ks_pool_open(&pool);
+               ks_assert(pool);
+       }
+       // @todo: move thread pool creation to startup which allows thread pool to be configurable
+    if (!tpool) {
+               newflags |= BD_MYTPOOL;
+               ks_thread_pool_create(&tpool,
+                                                         BLADE_DIRECTORY_TPOOL_MIN,
+                                                         BLADE_DIRECTORY_TPOOL_MAX,
+                                                         BLADE_DIRECTORY_TPOOL_STACK,
+                                                         KS_PRI_NORMAL,
+                                                         BLADE_DIRECTORY_TPOOL_IDLE);
+               ks_assert(tpool);
+       }
+       
+       bd = ks_pool_alloc(pool, sizeof(*bd));
+       bd->flags = newflags;
+       bd->pool = pool;
+       bd->tpool = tpool;
+       *bdP = bd;
+
+       return KS_STATUS_SUCCESS;
+}
+
+ks_status_t blade_directory_config(blade_directory_t *bd, config_setting_t *config)
+{
+       config_setting_t *service = NULL;
+
+       ks_assert(bd);
+
+       if (!config) return KS_STATUS_FAIL;
+       if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
+       
+       service = config_setting_get_member(config, "service");
+       if (!service) return KS_STATUS_FAIL;
+
+       bd->config_service = service;
+       
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config)
+{
+       ks_assert(bd);
+
+       blade_directory_shutdown(bd);
+       
+       if (blade_directory_config(bd, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       blade_service_create(&bd->service, bd->pool, bd->tpool);
+       ks_assert(bd->service);
+       if (blade_service_startup(bd->service, bd->config_service) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd)
+{
+       ks_assert(bd);
+
+       if (bd->service) blade_service_destroy(&bd->service);
+
+       return KS_STATUS_SUCCESS;
+}
+
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
index 8bb96ae676f991ca35910b327685b7424c6b7686..2e794a629b51ce9086c5505230762298f1399283 100644 (file)
 
 #include "blade.h"
 
-#define KS_DHT_TPOOL_MIN 2
-#define KS_DHT_TPOOL_MAX 8
-#define KS_DHT_TPOOL_STACK (1024 * 256)
-#define KS_DHT_TPOOL_IDLE 10
-
 typedef enum {
        BP_NONE = 0,
        BP_MYPOOL = (1 << 0),
@@ -48,10 +43,20 @@ struct blade_peer_s {
        bppvt_flag_t flags;
        ks_pool_t *pool;
        ks_thread_pool_t *tpool;
-       ks_dht_t *dht;
+       blade_service_t *service;
+
+       ks_bool_t shutdown;
+       kws_t *kws;
+       ks_thread_t *kws_thread;
+
+       ks_q_t *messages_sending;
+       ks_q_t *messages_receiving;
 };
 
 
+void *blade_peer_kws_thread(ks_thread_t *thread, void *data);
+
+
 KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP)
 {
        blade_peer_t *bp = NULL;
@@ -68,7 +73,11 @@ KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP)
        flags = bp->flags;
        pool = bp->pool;
 
-       if (bp->dht) ks_dht_destroy(&bp->dht);
+       blade_peer_shutdown(bp);
+
+       ks_q_destroy(&bp->messages_sending);
+       ks_q_destroy(&bp->messages_receiving);
+
        if (bp->tpool && (flags & BP_MYTPOOL)) ks_thread_pool_destroy(&bp->tpool);
        
        ks_pool_free(bp->pool, &bp);
@@ -78,11 +87,13 @@ KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP)
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid)
+KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_service_t *service)
 {
        bppvt_flag_t newflags = BP_NONE;
        blade_peer_t *bp = NULL;
-       ks_dht_t *dht = NULL;
+
+       ks_assert(bpP);
+       ks_assert(service);
 
        if (!pool) {
                newflags |= BP_MYPOOL;
@@ -94,50 +105,112 @@ KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, k
                ks_thread_pool_create(&tpool, BLADE_PEER_TPOOL_MIN, BLADE_PEER_TPOOL_MAX, BLADE_PEER_TPOOL_STACK, KS_PRI_NORMAL, BLADE_PEER_TPOOL_IDLE);
                ks_assert(tpool);
        }
-       ks_dht_create(&dht, pool, tpool, nodeid);
-       ks_assert(dht);
 
        bp = ks_pool_alloc(pool, sizeof(*bp));
        bp->flags = newflags;
        bp->pool = pool;
        bp->tpool = tpool;
-       bp->dht = dht;
+       bp->service = service;
+       ks_q_create(&bp->messages_sending, pool, 0);
+       ks_q_create(&bp->messages_receiving, pool, 0);
        *bpP = bp;
 
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_dht_nodeid_t *) blade_peer_myid(blade_peer_t *bp)
+KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws)
 {
        ks_assert(bp);
-       ks_assert(bp->dht);
-
-       return &bp->dht->nodeid;
-}
+       ks_assert(kws);
 
-KS_DECLARE(void) blade_peer_autoroute(blade_peer_t *bp, ks_bool_t autoroute, ks_port_t port)
-{
-       ks_assert(bp);
-
-       ks_dht_autoroute(bp->dht, autoroute, port);
+       // @todo: consider using a recycle queue for blade_peer_t in blade_service_t, just need to call startup then
+       
+       blade_peer_shutdown(bp);
+       
+       bp->kws = kws;
+
+    if (ks_thread_create_ex(&bp->kws_thread,
+                                                       blade_peer_kws_thread,
+                                                       bp,
+                                                       KS_THREAD_FLAG_DEFAULT,
+                                                       KS_THREAD_DEFAULT_STACK,
+                                                       KS_PRI_NORMAL,
+                                                       bp->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       
+       return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_peer_bind(blade_peer_t *bp, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint)
+KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp)
 {
        ks_assert(bp);
-       ks_assert(addr);
 
-       return ks_dht_bind(bp->dht, addr, endpoint);
+       bp->shutdown = KS_TRUE;
+       
+       if (bp->kws_thread) {
+               ks_thread_join(bp->kws_thread);
+               ks_pool_free(bp->pool, &bp->kws_thread);
+       }
+       
+       if (bp->kws) kws_destroy(&bp->kws);
+       
+       bp->shutdown = KS_FALSE;
+       return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(void) blade_peer_pulse(blade_peer_t *bp, int32_t timeout)
+void *blade_peer_kws_thread(ks_thread_t *thread, void *data)
 {
-       ks_assert(bp);
-       ks_assert(timeout >= 0);
-
-       ks_dht_pulse(bp->dht, timeout);
+       blade_peer_t *peer;
+       kws_opcode_t opcode;
+       uint8_t *data;
+       ks_size_t data_len;
+       blade_message_t *message;
+
+       ks_assert(thread);
+       ks_assert(data);
+
+       peer = (blade_peer_t *)data;
+
+       while (!peer->shutdown) {
+               // @todo use nonblocking kws mode so that if no data at all is available yet we can still do other things such as sending messages before trying again
+               // or easier alternative, just use ks_poll (or select) to check if there is a POLLIN event pending, but this requires direct access to the socket, or
+               // kws can be updated to add a function to poll the inner socket for events (IE, kws_poll(kws, &inbool, NULL, &errbool, timeout))
+               data_len = kws_read_frame(peer->kws, &opcode, &data);
+
+               if (data_len <= 0) {
+                       // @todo error handling, strerror(ks_errno())
+                       // 0 means socket closed with WS_NONE, which closes websocket with no additional reason
+                       // -1 means socket closed with a general failure
+                       // -2 means nonblocking wait
+                       // other values are based on WS_XXX reasons
+                       // negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and
+                       
+                       // @todo: this way of disconnecting would have the service periodically check the list of connected peers for those that are disconnecting,
+                       // remove them from the connected peer list, and then call peer destroy which will wait for this thread to rejoin which it already will have,
+                       // and then destroy the inner kws and finish any cleanup of the actual socket if neccessary, and can still call an ondisconnected callback
+                       // at the service level
+                       peer->disconnecting = KS_TRUE;
+                       break;
+               }
+
+               // @todo this will check the discarded queue first and realloc if there is not enough space, otherwise allocate a message, and finally copy the data
+               if (blade_handle_message_claim(peer->service->handle, &message, data, data_len) != KS_STATUS_SUCCESS || !message) {
+                       // @todo error handling
+                       // just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
+                       peer->disconnecting = KS_TRUE;
+                       break;
+               }
+               
+               ks_q_push(peer->messages_receiving, message);
+               // @todo callback up the stack to indicate a message has been received and can be popped (more efficient than constantly polling by popping)?
+
+
+               if (ks_q_trypop(peer->messages_sending, &message) == KS_STATUS_SUCCESS) {
+               }
+       }
+       
+       return NULL;
 }
-
+       
 /* For Emacs:
  * Local Variables:
  * mode:c
diff --git a/libs/libblade/src/blade_service.c b/libs/libblade/src/blade_service.c
new file mode 100644 (file)
index 0000000..e1361bf
--- /dev/null
@@ -0,0 +1,401 @@
+/*
+ * Copyright (c) 2007-2014, Anthony Minessale II
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "blade.h"
+
+typedef enum {
+       BS_NONE = 0,
+       BS_MYPOOL = (1 << 0),
+       BS_MYTPOOL = (1 << 1)
+} bspvt_flag_t;
+
+#define BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX 16
+
+struct blade_service_s {
+       bspvt_flag_t flags;
+       ks_pool_t *pool;
+       ks_thread_pool_t *tpool;
+
+       ks_sockaddr_t config_websockets_endpoints_ipv4[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
+       ks_sockaddr_t config_websockets_endpoints_ipv6[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
+       int32_t config_websockets_endpoints_ipv4_length;
+       int32_t config_websockets_endpoints_ipv6_length;
+       int32_t config_websockets_endpoints_backlog;
+
+       ks_bool_t shutdown;
+       
+       struct pollfd *listeners_poll;
+       int32_t *listeners_families;
+       int32_t listeners_size;
+       int32_t listeners_length;
+       ks_thread_t *listeners_thread;
+       
+       list_t connected;
+};
+
+
+void *blade_service_listeners_thread(ks_thread_t *thread, void *data);
+ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr);
+
+
+KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP)
+{
+       blade_service_t *bs = NULL;
+       bspvt_flag_t flags;
+       ks_pool_t *pool;
+
+       ks_assert(bsP);
+
+       bs = *bsP;
+       *bsP = NULL;
+
+       ks_assert(bs);
+
+       flags = bs->flags;
+       pool = bs->pool;
+
+       blade_service_shutdown(bs);
+       
+       list_destroy(&bs->connected);
+
+       if (bs->tpool && (flags & BS_MYTPOOL)) ks_thread_pool_destroy(&bs->tpool);
+       
+       ks_pool_free(bs->pool, &bs);
+
+       if (pool && (flags & BS_MYPOOL)) ks_pool_close(&pool);
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool)
+{
+       bspvt_flag_t newflags = BS_NONE;
+       blade_service_t *bs = NULL;
+
+       if (!pool) {
+               newflags |= BS_MYPOOL;
+               ks_pool_open(&pool);
+               ks_assert(pool);
+       }
+       if (!tpool) {
+               newflags |= BS_MYTPOOL;
+               ks_thread_pool_create(&tpool, BLADE_SERVICE_TPOOL_MIN, BLADE_SERVICE_TPOOL_MAX, BLADE_SERVICE_TPOOL_STACK, KS_PRI_NORMAL, BLADE_SERVICE_TPOOL_IDLE);
+               ks_assert(tpool);
+       }
+
+       bs = ks_pool_alloc(pool, sizeof(*bs));
+       bs->flags = newflags;
+       bs->pool = pool;
+       bs->tpool = tpool;
+       list_init(&bs->connected);
+       *bsP = bs;
+
+       return KS_STATUS_SUCCESS;
+}
+
+ks_status_t blade_service_config(blade_service_t *bs, config_setting_t *config)
+{
+       config_setting_t *websockets = NULL;
+       config_setting_t *websockets_endpoints = NULL;
+       config_setting_t *websockets_endpoints_ipv4 = NULL;
+       config_setting_t *websockets_endpoints_ipv6 = NULL;
+       config_setting_t *websockets_ssl = NULL;
+    config_setting_t *element;
+       config_setting_t *tmp1;
+       config_setting_t *tmp2;
+       ks_sockaddr_t config_websockets_endpoints_ipv4[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
+       ks_sockaddr_t config_websockets_endpoints_ipv6[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
+       int32_t config_websockets_endpoints_ipv4_length = 0;
+       int32_t config_websockets_endpoints_ipv6_length = 0;
+       int32_t config_websockets_endpoints_backlog = 8;
+
+       ks_assert(bs);
+
+       if (!config) return KS_STATUS_FAIL;
+       if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
+
+       websockets = config_setting_get_member(config, "websockets");
+       if (!websockets) return KS_STATUS_FAIL;
+       websockets_endpoints = config_setting_get_member(config, "endpoints");
+       if (!websockets_endpoints) return KS_STATUS_FAIL;
+       websockets_endpoints_ipv4 = config_lookup_from(websockets_endpoints, "ipv4");
+       websockets_endpoints_ipv6 = config_lookup_from(websockets_endpoints, "ipv6");
+       if (websockets_endpoints_ipv4) {
+               if (config_setting_type(websockets_endpoints_ipv4) != CONFIG_TYPE_LIST) return KS_STATUS_FAIL;
+               if ((config_websockets_endpoints_ipv4_length = config_setting_length(websockets_endpoints_ipv4)) > BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX)
+                       return KS_STATUS_FAIL;
+               
+               for (int32_t index = 0; index < config_websockets_endpoints_ipv4_length; ++index) {
+                       element = config_setting_get_elem(websockets_endpoints_ipv4, index);
+            tmp1 = config_lookup_from(element, "address");
+            tmp2 = config_lookup_from(element, "port");
+                       if (!tmp1 || !tmp2) return KS_STATUS_FAIL;
+                       if (config_setting_type(tmp1) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL;
+                       if (config_setting_type(tmp2) != CONFIG_TYPE_INT) return KS_STATUS_FAIL;
+                       
+                       if (ks_addr_set(&config_websockets_endpoints_ipv4[index],
+                                                       config_setting_get_string(tmp1),
+                                                       config_setting_get_int(tmp2),
+                                                       AF_INET) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+               }
+       }
+       if (websockets_endpoints_ipv6) {
+               if (config_setting_type(websockets_endpoints_ipv6) != CONFIG_TYPE_LIST) return KS_STATUS_FAIL;
+               if ((config_websockets_endpoints_ipv6_length = config_setting_length(websockets_endpoints_ipv6)) > BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX)
+                       return KS_STATUS_FAIL;
+               
+               for (int32_t index = 0; index < config_websockets_endpoints_ipv6_length; ++index) {
+                       element = config_setting_get_elem(websockets_endpoints_ipv6, index);
+            tmp1 = config_lookup_from(element, "address");
+            tmp2 = config_lookup_from(element, "port");
+                       if (!tmp1 || !tmp2) return KS_STATUS_FAIL;
+                       if (config_setting_type(tmp1) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL;
+                       if (config_setting_type(tmp2) != CONFIG_TYPE_INT) return KS_STATUS_FAIL;
+                       
+                       if (ks_addr_set(&config_websockets_endpoints_ipv6[index],
+                                                       config_setting_get_string(tmp1),
+                                                       config_setting_get_int(tmp2),
+                                                       AF_INET6) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+               }
+       }
+       if (config_websockets_endpoints_ipv4_length + config_websockets_endpoints_ipv6_length <= 0) return KS_STATUS_FAIL;
+       tmp1 = config_lookup_from(websockets_endpoints, "backlog");
+       if (tmp1) {
+               if (config_setting_type(tmp1) != CONFIG_TYPE_INT) return KS_STATUS_FAIL;
+               config_websockets_endpoints_backlog = config_setting_get_int(tmp1);
+       }
+       websockets_ssl = config_setting_get_member(websockets, "ssl");
+       if (websockets_ssl) {
+               // @todo: SSL stuffs from websockets_ssl into config_websockets_ssl envelope
+       }
+
+
+       // Configuration is valid, now assign it to the variables that are used
+       // If the configuration was invalid, then this does not get changed from the current config when reloading a new config
+       for (int32_t index = 0; index < config_websockets_endpoints_ipv4_length; ++index)
+               bs->config_websockets_endpoints_ipv4[index] = config_websockets_endpoints_ipv4[index];
+       for (int32_t index = 0; index < config_websockets_endpoints_ipv6_length; ++index)
+               bs->config_websockets_endpoints_ipv6[index] = config_websockets_endpoints_ipv6[index];
+       bs->config_websockets_endpoints_ipv4_length = config_websockets_endpoints_ipv4_length;
+       bs->config_websockets_endpoints_ipv6_length = config_websockets_endpoints_ipv6_length;
+       bs->config_websockets_endpoints_backlog = config_websockets_endpoints_backlog;
+       //bs->config_websockets_ssl = config_websockets_ssl;
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_service_startup(blade_service_t *bs, config_setting_t *config)
+{
+       ks_assert(bs);
+
+       blade_service_shutdown(bs);
+
+       // @todo: If the configuration is invalid, and this is a case of reloading a new config, then the service shutdown shouldn't occur
+       // but the service may use configuration that changes before we shutdown if it is read successfully, may require a config reader/writer mutex?
+       
+    if (blade_service_config(bs, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       for (int32_t index = 0; index < bs->config_websockets_endpoints_ipv4_length; ++index) {
+               if (blade_service_listen(bs, &bs->config_websockets_endpoints_ipv4[index]) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       }
+       for (int32_t index = 0; index < bs->config_websockets_endpoints_ipv6_length; ++index) {
+               if (blade_service_listen(bs, &bs->config_websockets_endpoints_ipv6[index]) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       }
+
+       if (ks_thread_create_ex(&bs->listeners_thread,
+                                                       blade_service_listeners_thread,
+                                                       bs,
+                                                       KS_THREAD_FLAG_DEFAULT,
+                                                       KS_THREAD_DEFAULT_STACK,
+                                                       KS_PRI_NORMAL,
+                                                       bs->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs)
+{
+       ks_assert(bs);
+
+       bs->shutdown = KS_TRUE;
+       
+       if (bs->listeners_thread) {
+               ks_thread_join(bs->listeners_thread);
+               ks_pool_free(bs->pool, &bs->listeners_thread);
+       }
+
+       for (int32_t index = 0; index < bs->listeners_length; ++index) {
+               ks_socket_t sock = bs->listeners_poll[index].fd;
+               ks_socket_shutdown(sock, SHUT_RDWR);
+               ks_socket_close(&sock);
+       }
+       bs->listeners_length = 0;
+
+       list_iterator_start(&bs->connected);
+       while (list_iterator_hasnext(&bs->connected)) {
+               blade_peer_t *peer = (blade_peer_t *)list_iterator_next(&bs->connected);
+               blade_peer_destroy(&peer);
+       }
+       list_iterator_stop(&bs->connected);
+       list_clear(&bs->connected);
+
+       bs->shutdown = KS_FALSE;
+       return KS_STATUS_SUCCESS;
+}
+
+ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr)
+{
+       ks_socket_t listener = KS_SOCK_INVALID;
+       int32_t listener_index = -1;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       
+       ks_assert(bs);
+       ks_assert(addr);
+
+       if ((listener = socket(addr->family, SOCK_STREAM, IPPROTO_TCP)) == KS_SOCK_INVALID) {
+               ret = KS_STATUS_FAIL;
+               goto done;
+       }
+
+       ks_socket_option(listener, SO_REUSEADDR, KS_TRUE);
+       ks_socket_option(listener, TCP_NODELAY, KS_TRUE);
+       // @todo make sure v6 does not automatically map to a v4 using socket option IPV6_V6ONLY?
+
+       if (ks_addr_bind(listener, addr) != KS_STATUS_SUCCESS) {
+               ret = KS_STATUS_FAIL;
+               goto done;
+       }
+
+       if (listen(listener, bs->config_websockets_endpoints_backlog) != 0) {
+               ret = KS_STATUS_FAIL;
+               goto done;
+       }
+
+       listener_index = bs->listeners_length++;
+       if (bs->listeners_length > bs->listeners_size) {
+               bs->listeners_size = bs->listeners_length;
+               bs->listeners_poll = (struct pollfd *)ks_pool_resize(bs->pool, bs->listeners_poll, sizeof(struct pollfd) * bs->listeners_size);
+               ks_assert(bs->listeners_poll);
+               bs->listeners_families = (int32_t *)ks_pool_resize(bs->pool, bs->listeners_families, sizeof(int32_t) * bs->listeners_size);
+               ks_assert(bs->listeners_families);
+       }
+       bs->listeners_poll[listener_index].fd = listener;
+       bs->listeners_poll[listener_index].events = POLLIN | POLLERR;
+       bs->listeners_families[listener_index] = addr->family;
+
+ done:
+       if (ret != KS_STATUS_SUCCESS) {
+               if (listener != KS_SOCK_INVALID) {
+                       ks_socket_shutdown(listener, SHUT_RDWR);
+                       ks_socket_close(&listener);
+               }
+       }
+       return ret;
+}
+
+void *blade_service_listeners_thread(ks_thread_t *thread, void *data)
+{
+       blade_service_t *service;
+
+       ks_assert(thread);
+       ks_assert(data);
+
+       service = (blade_service_t *)data;
+       
+       while (!service->shutdown) {
+               if (ks_poll(service->listeners_poll, service->listeners_length, 100) > 0) {
+                       for (int32_t index = 0; index < service->listeners_length; ++index) {
+                               ks_socket_t sock;
+                               ks_sockaddr_t raddr;
+                               socklen_t slen = 0;
+                               kws_t *kws = NULL;
+                               blade_peer_t *peer = NULL;
+
+                               if (!(service->listeners_poll[index].revents & POLLIN)) continue;
+                               if (service->listeners_poll[index].revents & POLLERR) {
+                                       // @todo: error handling, just skip the listener for now
+                                       continue;
+                               }
+
+                               if (service->listeners_families[index] == AF_INET) {
+                                       slen = sizeof(raddr.v.v4);
+                                       if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v4, &slen)) == KS_SOCK_INVALID) {
+                                               // @todo: error handling, just skip the socket for now
+                                               continue;
+                                       }
+                                       raddr.family = AF_INET;
+                               } else {
+                                       slen = sizeof(raddr.v.v6);
+                                       if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v6, &slen)) == KS_SOCK_INVALID) {
+                                               // @todo: error handling, just skip the socket for now
+                                               continue;
+                                       }
+                                       raddr.family = AF_INET6;
+                               }
+
+                               ks_addr_get_host(&raddr);
+                               ks_addr_get_port(&raddr);
+
+                               // @todo: SSL init stuffs based on data from service->config_websockets_ssl
+                               
+                               if (kws_init(&kws, sock, NULL, NULL, KWS_BLOCK, service->pool) != KS_STATUS_SUCCESS) {
+                                       // @todo: error handling, just close and skip the socket for now
+                                       ks_socket_close(&sock);
+                                       continue;
+                               }
+
+                               blade_peer_create(&peer, service->pool, service->tpool);
+                               ks_assert(peer);
+
+                               // @todo: should probably assign kws before adding to list, in a separate call from startup because it starts the internal worker thread
+                               
+                               list_append(&service->connected, peer);
+                               
+                               blade_peer_startup(peer, kws);
+                       }
+               }
+       }
+       
+       return NULL;
+}
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
index d7cf76235c0d0531015e6278cc0b0534ce4e4df4..5f7c8a7d730e27b973569b28be0911f1bd3c51ce 100644 (file)
@@ -43,7 +43,12 @@ struct blade_handle_s {
        bhpvt_flag_t flags;
        ks_pool_t *pool;
        ks_thread_pool_t *tpool;
-       blade_peer_t *peer;
+       
+       config_setting_t *config_datastore;
+       config_setting_t *config_directory;
+       
+       //blade_peer_t *peer;
+       blade_directory_t *directory;
        blade_datastore_t *datastore;
 };
 
@@ -64,9 +69,9 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP)
        flags = bh->flags;
        pool = bh->pool;
 
-       if (bh->datastore) blade_datastore_destroy(&bh->datastore);
-
-       blade_peer_destroy(&bh->peer);
+       blade_handle_shutdown(bh);
+       
+       //blade_peer_destroy(&bh->peer);
     if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool);
 
        ks_pool_free(bh->pool, &bh);
@@ -78,14 +83,12 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP)
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool, const char *nodeid)
+KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool)
 {
        bhpvt_flag_t newflags = BH_NONE;
        blade_handle_t *bh = NULL;
-       ks_dht_nodeid_t nid;
 
-       ks_assert(nodeid);
-       ks_assert(strlen(nodeid) == (KS_DHT_NODEID_SIZE * 2));
+       ks_assert(bhP);
 
        if (!pool) {
                newflags |= BH_MYPOOL;
@@ -101,75 +104,88 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo
        bh->flags = newflags;
        bh->pool = pool;
        bh->tpool = tpool;
-       ks_dht_dehex(nid.id, nodeid, KS_DHT_NODEID_SIZE);
-       blade_peer_create(&bh->peer, bh->pool, bh->tpool, &nid);
+       //blade_peer_create(&bh->peer, bh->pool, bh->tpool);
 
        *bhP = bh;
 
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(void) blade_handle_myid(blade_handle_t *bh, char *buffer)
+ks_status_t blade_handle_config(blade_handle_t *bh, config_setting_t *config)
 {
-       ks_dht_nodeid_t *nodeid = NULL;
-
+       config_setting_t *datastore = NULL;
+       config_setting_t *directory = NULL;
+       
        ks_assert(bh);
-       ks_assert(bh->peer);
 
-       nodeid = blade_peer_myid(bh->peer);
-       ks_dht_hex(nodeid->id, buffer, KS_DHT_NODEID_SIZE);
+       if (!config) return KS_STATUS_FAIL;
+    if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
+
+    datastore = config_setting_get_member(config, "datastore");
+    //if (datastore && !config_setting_is_group(datastore)) return KS_STATUS_FAIL;
+       
+    directory = config_setting_get_member(config, "directory");
+    //if (directory && !config_setting_is_group(directory)) return KS_STATUS_FAIL;
+
+       bh->config_datastore = datastore;
+       bh->config_directory = directory;
+       
+       return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(void) blade_handle_autoroute(blade_handle_t *bh, ks_bool_t autoroute, ks_port_t port)
+KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config)
 {
        ks_assert(bh);
-       ks_assert(bh->peer);
 
-       blade_peer_autoroute(bh->peer, autoroute, port);
+    if (blade_handle_config(bh, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       
+       if (bh->config_datastore && !blade_handle_datastore_available(bh)) {
+               blade_datastore_create(&bh->datastore, bh->pool, bh->tpool);
+               blade_datastore_startup(bh->datastore, bh->config_datastore);
+       }
+       
+       if (bh->config_directory && !blade_handle_directory_available(bh)) {
+               blade_directory_create(&bh->directory, bh->pool, bh->tpool);
+               blade_directory_startup(bh->directory, config);
+       }
+       
+       return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) blade_handle_bind(blade_handle_t *bh, const char *ip, ks_port_t port, ks_dht_endpoint_t **endpoint)
+KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
 {
-       ks_sockaddr_t addr;
-       int family = AF_INET;
-       
        ks_assert(bh);
-       ks_assert(ip);
-       ks_assert(port);
-
-       if (ip[1] != '.' && ip[2] != '.' && ip[3] != '.') family = AF_INET6;
        
-       ks_addr_set(&addr, ip, port, family);
-       return blade_peer_bind(bh->peer, &addr, endpoint);
+       if (blade_handle_directory_available(bh)) blade_directory_destroy(&bh->directory);
+       
+       if (blade_handle_datastore_available(bh)) blade_datastore_destroy(&bh->datastore);
+       
+       return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(void) blade_handle_pulse(blade_handle_t *bh, int32_t timeout)
+KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh)
 {
        ks_assert(bh);
-       ks_assert(timeout >= 0);
 
-       blade_peer_pulse(bh->peer, timeout);
-       if (bh->datastore) blade_datastore_pulse(bh->datastore, timeout);
+       return bh->datastore != NULL;
 }
 
-
-KS_DECLARE(void) blade_handle_datastore_start(blade_handle_t *bh)
+KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh)
 {
        ks_assert(bh);
 
-       if (bh->datastore) return;
-
-       blade_datastore_create(&bh->datastore, bh->pool);
+       return bh->directory != NULL;
 }
 
 KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length)
 {
        ks_assert(bh);
-       ks_assert(bh->datastore);
        ks_assert(key);
        ks_assert(key_length > 0);
        ks_assert(data);
        ks_assert(data_length > 0);
+
+       if (!blade_handle_datastore_available(bh)) return KS_STATUS_INACTIVE;
        
        return blade_datastore_store(bh->datastore, key, key_length, data, data_length);
 }
@@ -181,11 +197,12 @@ KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
                                                                                                         void *userdata)
 {
        ks_assert(bh);
-       ks_assert(bh->datastore);
        ks_assert(callback);
        ks_assert(key);
        ks_assert(key_length > 0);
        
+       if (!blade_handle_datastore_available(bh)) return KS_STATUS_INACTIVE;
+       
        return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata);
 }
 
index 4d44ea07fbb730a8a36731131a4e2cd1c01f74f2..578d41fee0f72050e30ca82178070fc8052ecd04 100644 (file)
 #include <ks.h>
 #include <ks_dht.h>
 #include <sodium.h>
+#include <libconfig.h>
 #include "unqlite.h"
 #include "blade_types.h"
 #include "blade_stack.h"
 #include "blade_peer.h"
+#include "blade_service.h"
 #include "blade_datastore.h"
+#include "blade_directory.h"
 #include "bpcp.h"
 
 KS_BEGIN_EXTERN_C
index 62918a3a41727c353b0a66cdcefd23cd183cdea7..641eccbd55c701c19b0e7b12a35911b0d609bf77 100644 (file)
 #define _BLADE_DATASTORE_H_
 #include <blade.h>
 
+#define BLADE_DATASTORE_TPOOL_MIN 2
+#define BLADE_DATASTORE_TPOOL_MAX 8
+#define BLADE_DATASTORE_TPOOL_STACK (1024 * 256)
+#define BLADE_DATASTORE_TPOOL_IDLE 10
+
 KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool);
+KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool, ks_thread_pool_t *tpool);
 KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP);
-KS_DECLARE(void) blade_datastore_pulse(blade_datastore_t *bds, int32_t timeout);
+KS_DECLARE(ks_status_t) blade_datastore_startup(blade_datastore_t *bds, config_setting_t *config);
+KS_DECLARE(ks_status_t) blade_datastore_shutdown(blade_datastore_t *bds);
+
 KS_DECLARE(void) blade_datastore_error(blade_datastore_t *bds, const char **buffer, int32_t *buffer_length);
 KS_DECLARE(ks_status_t) blade_datastore_store(blade_datastore_t *bds, const void *key, int32_t key_length, const void *data, int64_t data_length);
 KS_DECLARE(ks_status_t) blade_datastore_fetch(blade_datastore_t *bds,
diff --git a/libs/libblade/src/include/blade_directory.h b/libs/libblade/src/include/blade_directory.h
new file mode 100644 (file)
index 0000000..e0a6698
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2007-2014, Anthony Minessale II
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _BLADE_DIRECTORY_H_
+#define _BLADE_DIRECTORY_H_
+#include <blade.h>
+
+#define BLADE_DIRECTORY_TPOOL_MIN 2
+#define BLADE_DIRECTORY_TPOOL_MAX 8
+#define BLADE_DIRECTORY_TPOOL_STACK (1024 * 256)
+#define BLADE_DIRECTORY_TPOOL_IDLE 10
+
+KS_BEGIN_EXTERN_C
+KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool);
+KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP);
+KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config);
+KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd);
+KS_END_EXTERN_C
+
+#endif
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
index 3c0bb5925dbf2139d082d63fdf9c822468bdb979..8a15044ac525a6b0cfb2356f7fbb4306b940dc29 100644 (file)
 #define BLADE_PEER_TPOOL_IDLE 10
 
 KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid);
+KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool);
 KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP);
-KS_DECLARE(ks_dht_nodeid_t *) blade_peer_myid(blade_peer_t *bp);
-KS_DECLARE(void) blade_peer_autoroute(blade_peer_t *bp, ks_bool_t autoroute, ks_port_t port);
-KS_DECLARE(ks_status_t) blade_peer_bind(blade_peer_t *bp, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint);
-KS_DECLARE(void) blade_peer_pulse(blade_peer_t *bp, int32_t timeout);
+KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws);
+KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp);
 KS_END_EXTERN_C
 
 #endif
diff --git a/libs/libblade/src/include/blade_service.h b/libs/libblade/src/include/blade_service.h
new file mode 100644 (file)
index 0000000..9ec014f
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2007-2014, Anthony Minessale II
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _BLADE_SERVICE_H_
+#define _BLADE_SERVICE_H_
+#include <blade.h>
+
+#define BLADE_SERVICE_TPOOL_MIN 2
+#define BLADE_SERVICE_TPOOL_MAX 8
+#define BLADE_SERVICE_TPOOL_STACK (1024 * 256)
+#define BLADE_SERVICE_TPOOL_IDLE 10
+
+KS_BEGIN_EXTERN_C
+KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool);
+KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP);
+KS_DECLARE(ks_status_t) blade_service_startup(blade_service_t *bs, config_setting_t *config);
+KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs);
+KS_END_EXTERN_C
+
+#endif
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
index 81411fabdc164a3abb16578e95eee6e0190487fa..1eb4229d726cc8d0ad46ee09635f600de507e6b2 100644 (file)
 
 KS_BEGIN_EXTERN_C
 KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP);
-KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool, const char *nodeid);
-KS_DECLARE(void) blade_handle_myid(blade_handle_t *bh, char *buffer);
-KS_DECLARE(void) blade_handle_autoroute(blade_handle_t *bh, ks_bool_t autoroute, ks_port_t port);
-KS_DECLARE(ks_status_t) blade_handle_bind(blade_handle_t *bh, const char *ip, ks_port_t port, ks_dht_endpoint_t **endpoint);
-KS_DECLARE(void) blade_handle_pulse(blade_handle_t *bh, int32_t timeout);
-KS_DECLARE(void) blade_handle_datastore_start(blade_handle_t *bh);
+KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool);
+KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config);
+KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh);
+
+KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh);
+KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh);
+
 KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length);
 KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
                                                                                                         blade_datastore_fetch_callback_t callback,
index 0bb43a4c07150d8e86df84337ce74adfa1b9c1bc..0cb2ec8e3ef6824627d45f9d6b5641738b3cc087 100644 (file)
@@ -39,7 +39,9 @@ KS_BEGIN_EXTERN_C
 
 typedef struct blade_handle_s blade_handle_t;
 typedef struct blade_peer_s blade_peer_t;
+typedef struct blade_service_s blade_service_t;
 typedef struct blade_datastore_s blade_datastore_t;
+typedef struct blade_directory_s blade_directory_t;
 
 typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata);
 
index 478b614bc0acdb930f470003ba3836bc31f4a3a3..09e821edc218da9d01fac6375ee78ad1a5787e20 100644 (file)
@@ -28,16 +28,12 @@ struct command_def_s {
 
 void command_test(blade_handle_t *bh, char *args);
 void command_quit(blade_handle_t *bh, char *args);
-void command_myid(blade_handle_t *bh, char *args);
-void command_bind(blade_handle_t *bh, char *args);
 void command_store(blade_handle_t *bh, char *args);
 void command_fetch(blade_handle_t *bh, char *args);
 
 static const struct command_def_s command_defs[] = {
        { "test", command_test },
        { "quit", command_quit },
-       { "myid", command_myid },
-       { "bind", command_bind },
        { "store", command_store },
        { "fetch", command_fetch },
        
@@ -48,19 +44,12 @@ static const struct command_def_s command_defs[] = {
 int main(int argc, char **argv)
 {
        blade_handle_t *bh = NULL;
-       const char *nodeid;
-
-       ks_assert(argc >= 2);
-
-       nodeid = argv[1];
 
        ks_global_set_default_logger(KS_LOG_LEVEL_DEBUG);
        
        blade_init();
 
-       blade_handle_create(&bh, NULL, NULL, nodeid);
-
-       blade_handle_autoroute(bh, KS_TRUE, KS_DHT_DEFAULT_PORT);
+       blade_handle_create(&bh, NULL, NULL);
 
        loop(bh);
        
@@ -121,7 +110,7 @@ void loop(blade_handle_t *bh)
                        // @todo lines must not exceed 512 bytes, treat as error and ignore buffer until next new line?
                        ks_assert(0);
                }
-               blade_handle_pulse(bh, 1);
+               blade_handle_pulse(bh);
        }
 }
 
@@ -179,34 +168,6 @@ void command_quit(blade_handle_t *bh, char *args)
        g_shutdown = KS_TRUE;
 }
 
-void command_myid(blade_handle_t *bh, char *args)
-{
-       char buf[KS_DHT_NODEID_SIZE * 2 + 1];
-
-       ks_assert(bh);
-       ks_assert(args);
-
-       blade_handle_myid(bh, buf);
-
-       ks_log(KS_LOG_INFO, "%s\n", buf);
-}
-
-void command_bind(blade_handle_t *bh, char *args)
-{
-       char *ip = NULL;
-       char *port = NULL;
-       ks_port_t p;
-
-       ks_assert(args);
-
-       parse_argument(&args, &ip, ' ');
-       parse_argument(&args, &port, ' ');
-
-       p = atoi(port); // @todo use strtol for error handling
-
-       blade_handle_bind(bh, ip, p, NULL);
-}
-
 void command_store(blade_handle_t *bh, char *args)
 {
        char *key;
@@ -214,7 +175,7 @@ void command_store(blade_handle_t *bh, char *args)
 
        ks_assert(args);
 
-       blade_handle_datastore_start(bh);
+       blade_handle_datastore_startup(bh, NULL);
        
        parse_argument(&args, &key, ' ');
        parse_argument(&args, &data, ' ');
@@ -234,7 +195,7 @@ void command_fetch(blade_handle_t *bh, char *args)
 
        ks_assert(args);
 
-       blade_handle_datastore_start(bh);
+       blade_handle_datastore_startup(bh, NULL);
        
        parse_argument(&args, &key, ' ');