libunqlite_la_LIBADD = -lpthread
lib_LTLIBRARIES = libblade.la
-libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/bpcp.c src/blade_datastore.c
+libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/blade_service.c src/bpcp.c src/blade_datastore.c src/blade_directory.c
libblade_la_CFLAGS = $(AM_CFLAGS) $(AM_CPPFLAGS)
-libblade_la_LDFLAGS = -version-info 0:1:0 -lncurses -lpthread -lm $(AM_LDFLAGS)
+libblade_la_LDFLAGS = -version-info 0:1:0 -lncurses -lpthread -lm -lconfig $(AM_LDFLAGS)
libblade_la_LIBADD = libunqlite.la
library_includedir = $(prefix)/include
-library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/bpcp.h
-library_include_HEADERS += src/include/blade_datastore.h
+library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/blade_service.h
+library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_directory.h
library_include_HEADERS += src/include/unqlite.h test/tap.h
tests: libblade.la
typedef enum {
BDS_NONE = 0,
BDS_MYPOOL = (1 << 0),
+ BDS_MYTPOOL = (1 << 1),
} bdspvt_flag_t;
struct blade_datastore_s {
bdspvt_flag_t flags;
ks_pool_t *pool;
+ ks_thread_pool_t *tpool;
+
+ const char *config_database_path;
+ //config_setting_t *config_service;
+
unqlite *db;
+ //blade_service_t *service;
};
struct blade_datastore_fetch_userdata_s
flags = bds->flags;
pool = bds->pool;
- if (bds->db) {
- unqlite_close(bds->db);
- bds->db = NULL;
- }
+ blade_datastore_shutdown(bds);
+ if (bds->tpool && (flags & BDS_MYTPOOL)) ks_thread_pool_destroy(&bds->tpool);
+
ks_pool_free(bds->pool, &bds);
if (pool && (flags & BDS_MYPOOL)) ks_pool_close(&pool);
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool)
+KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool, ks_thread_pool_t *tpool)
{
bdspvt_flag_t newflags = BDS_NONE;
blade_datastore_t *bds = NULL;
ks_pool_open(&pool);
ks_assert(pool);
}
-
+ // @todo: move thread pool creation to startup which allows thread pool to be configurable
+ if (!tpool) {
+ newflags |= BDS_MYTPOOL;
+ ks_thread_pool_create(&tpool,
+ BLADE_DATASTORE_TPOOL_MIN,
+ BLADE_DATASTORE_TPOOL_MAX,
+ BLADE_DATASTORE_TPOOL_STACK,
+ KS_PRI_NORMAL,
+ BLADE_DATASTORE_TPOOL_IDLE);
+ ks_assert(tpool);
+ }
+
bds = ks_pool_alloc(pool, sizeof(*bds));
bds->flags = newflags;
bds->pool = pool;
+ bds->tpool = tpool;
*bdsP = bds;
- if (unqlite_open(&bds->db, NULL, UNQLITE_OPEN_IN_MEMORY) != UNQLITE_OK) {
+ return KS_STATUS_SUCCESS;
+}
+
+ks_status_t blade_datastore_config(blade_datastore_t *bds, config_setting_t *config)
+{
+ config_setting_t *tmp;
+ config_setting_t *database = NULL;
+ //config_setting_t *service = NULL;
+ const char *config_database_path = NULL;
+
+ ks_assert(bds);
+
+ if (!config) return KS_STATUS_FAIL;
+ if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
+
+ database = config_setting_get_member(config, "database");
+ if (!database) return KS_STATUS_FAIL;
+ tmp = config_lookup_from(database, "path");
+ if (!tmp) return KS_STATUS_FAIL;
+ if (config_setting_type(tmp) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL;
+ config_database_path = config_setting_get_string(tmp);
+ //service = config_setting_get_member(config, "service");
+
+ if (bds->config_database_path) ks_pool_free(bds->pool, &bds->config_database_path);
+ bds->config_database_path = ks_pstrdup(bds->pool, config_database_path);
+ //bds->config_service = service;
+
+ return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_datastore_startup(blade_datastore_t *bds, config_setting_t *config)
+{
+ ks_assert(bds);
+
+ // @todo check if already started
+
+ if (blade_datastore_config(bds, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ //if (unqlite_open(&bds->db, NULL, UNQLITE_OPEN_IN_MEMORY) != UNQLITE_OK) {
+ if (unqlite_open(&bds->db, bds->config_database_path, UNQLITE_OPEN_CREATE) != UNQLITE_OK) {
const char *errbuf = NULL;
blade_datastore_error(bds, &errbuf, NULL);
- ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
+ ks_log(KS_LOG_ERROR, "BDS Open Error: %s\n", errbuf);
return KS_STATUS_FAIL;
}
// @todo VM init if document store is used (and output consumer callback)
+ //blade_service_create(&bds->service, bds->pool, bds->tpool);
+ //ks_assert(bds->service);
+ //blade_service_startup(bds->service, bds->config_service);
+
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(void) blade_datastore_pulse(blade_datastore_t *bds, int32_t timeout)
+KS_DECLARE(ks_status_t) blade_datastore_shutdown(blade_datastore_t *bds)
{
ks_assert(bds);
- ks_assert(timeout >= 0);
+
+ //if (bds->service) blade_service_destroy(&bds->service);
+
+ if (bds->db) {
+ unqlite_close(bds->db);
+ bds->db = NULL;
+ }
+
+ if (bds->config_database_path) ks_pool_free(bds->pool, &bds->config_database_path);
+ //bds->config_service = NULL;
+
+ return KS_STATUS_SUCCESS;
}
+
KS_DECLARE(void) blade_datastore_error(blade_datastore_t *bds, const char **buffer, int32_t *buffer_length)
{
ks_assert(bds);
else {
const char *errbuf;
blade_datastore_error(bds, &errbuf, NULL);
- ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
+ ks_log(KS_LOG_ERROR, "BDS Store Error: %s\n", errbuf);
ret = KS_STATUS_FAIL;
}
if (rc != UNQLITE_OK) {
if (rc == UNQLITE_BUSY) ret = KS_STATUS_TIMEOUT;
+ else if(rc == UNQLITE_NOTFOUND) ret = KS_STATUS_NOT_FOUND;
else {
const char *errbuf;
blade_datastore_error(bds, &errbuf, NULL);
- ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
+ ks_log(KS_LOG_ERROR, "BDS Fetch Error: %s\n", errbuf);
ret = KS_STATUS_FAIL;
}
--- /dev/null
+/*
+ * Copyright (c) 2007-2014, Anthony Minessale II
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "blade.h"
+
+
+typedef enum {
+ BD_NONE = 0,
+ BD_MYPOOL = (1 << 0),
+ BD_MYTPOOL = (1 << 1),
+} bdpvt_flag_t;
+
+struct blade_directory_s {
+ bdpvt_flag_t flags;
+ ks_pool_t *pool;
+ ks_thread_pool_t *tpool;
+
+ config_setting_t *config_service;
+
+ blade_service_t *service;
+};
+
+
+
+
+KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP)
+{
+ blade_directory_t *bd = NULL;
+ bdpvt_flag_t flags;
+ ks_pool_t *pool;
+
+ ks_assert(bdP);
+
+ bd = *bdP;
+ *bdP = NULL;
+
+ ks_assert(bd);
+
+ flags = bd->flags;
+ pool = bd->pool;
+
+ blade_directory_shutdown(bd);
+
+ if (bd->tpool && (flags & BD_MYTPOOL)) ks_thread_pool_destroy(&bd->tpool);
+
+ ks_pool_free(bd->pool, &bd);
+
+ if (pool && (flags & BD_MYPOOL)) ks_pool_close(&pool);
+
+ return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool)
+{
+ bdpvt_flag_t newflags = BD_NONE;
+ blade_directory_t *bd = NULL;
+
+ if (!pool) {
+ newflags |= BD_MYPOOL;
+ ks_pool_open(&pool);
+ ks_assert(pool);
+ }
+ // @todo: move thread pool creation to startup which allows thread pool to be configurable
+ if (!tpool) {
+ newflags |= BD_MYTPOOL;
+ ks_thread_pool_create(&tpool,
+ BLADE_DIRECTORY_TPOOL_MIN,
+ BLADE_DIRECTORY_TPOOL_MAX,
+ BLADE_DIRECTORY_TPOOL_STACK,
+ KS_PRI_NORMAL,
+ BLADE_DIRECTORY_TPOOL_IDLE);
+ ks_assert(tpool);
+ }
+
+ bd = ks_pool_alloc(pool, sizeof(*bd));
+ bd->flags = newflags;
+ bd->pool = pool;
+ bd->tpool = tpool;
+ *bdP = bd;
+
+ return KS_STATUS_SUCCESS;
+}
+
+ks_status_t blade_directory_config(blade_directory_t *bd, config_setting_t *config)
+{
+ config_setting_t *service = NULL;
+
+ ks_assert(bd);
+
+ if (!config) return KS_STATUS_FAIL;
+ if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
+
+ service = config_setting_get_member(config, "service");
+ if (!service) return KS_STATUS_FAIL;
+
+ bd->config_service = service;
+
+ return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config)
+{
+ ks_assert(bd);
+
+ blade_directory_shutdown(bd);
+
+ if (blade_directory_config(bd, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ blade_service_create(&bd->service, bd->pool, bd->tpool);
+ ks_assert(bd->service);
+ if (blade_service_startup(bd->service, bd->config_service) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd)
+{
+ ks_assert(bd);
+
+ if (bd->service) blade_service_destroy(&bd->service);
+
+ return KS_STATUS_SUCCESS;
+}
+
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
#include "blade.h"
-#define KS_DHT_TPOOL_MIN 2
-#define KS_DHT_TPOOL_MAX 8
-#define KS_DHT_TPOOL_STACK (1024 * 256)
-#define KS_DHT_TPOOL_IDLE 10
-
typedef enum {
BP_NONE = 0,
BP_MYPOOL = (1 << 0),
bppvt_flag_t flags;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
- ks_dht_t *dht;
+ blade_service_t *service;
+
+ ks_bool_t shutdown;
+ kws_t *kws;
+ ks_thread_t *kws_thread;
+
+ ks_q_t *messages_sending;
+ ks_q_t *messages_receiving;
};
+void *blade_peer_kws_thread(ks_thread_t *thread, void *data);
+
+
KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP)
{
blade_peer_t *bp = NULL;
flags = bp->flags;
pool = bp->pool;
- if (bp->dht) ks_dht_destroy(&bp->dht);
+ blade_peer_shutdown(bp);
+
+ ks_q_destroy(&bp->messages_sending);
+ ks_q_destroy(&bp->messages_receiving);
+
if (bp->tpool && (flags & BP_MYTPOOL)) ks_thread_pool_destroy(&bp->tpool);
ks_pool_free(bp->pool, &bp);
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid)
+KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_service_t *service)
{
bppvt_flag_t newflags = BP_NONE;
blade_peer_t *bp = NULL;
- ks_dht_t *dht = NULL;
+
+ ks_assert(bpP);
+ ks_assert(service);
if (!pool) {
newflags |= BP_MYPOOL;
ks_thread_pool_create(&tpool, BLADE_PEER_TPOOL_MIN, BLADE_PEER_TPOOL_MAX, BLADE_PEER_TPOOL_STACK, KS_PRI_NORMAL, BLADE_PEER_TPOOL_IDLE);
ks_assert(tpool);
}
- ks_dht_create(&dht, pool, tpool, nodeid);
- ks_assert(dht);
bp = ks_pool_alloc(pool, sizeof(*bp));
bp->flags = newflags;
bp->pool = pool;
bp->tpool = tpool;
- bp->dht = dht;
+ bp->service = service;
+ ks_q_create(&bp->messages_sending, pool, 0);
+ ks_q_create(&bp->messages_receiving, pool, 0);
*bpP = bp;
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_dht_nodeid_t *) blade_peer_myid(blade_peer_t *bp)
+KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws)
{
ks_assert(bp);
- ks_assert(bp->dht);
-
- return &bp->dht->nodeid;
-}
+ ks_assert(kws);
-KS_DECLARE(void) blade_peer_autoroute(blade_peer_t *bp, ks_bool_t autoroute, ks_port_t port)
-{
- ks_assert(bp);
-
- ks_dht_autoroute(bp->dht, autoroute, port);
+ // @todo: consider using a recycle queue for blade_peer_t in blade_service_t, just need to call startup then
+
+ blade_peer_shutdown(bp);
+
+ bp->kws = kws;
+
+ if (ks_thread_create_ex(&bp->kws_thread,
+ blade_peer_kws_thread,
+ bp,
+ KS_THREAD_FLAG_DEFAULT,
+ KS_THREAD_DEFAULT_STACK,
+ KS_PRI_NORMAL,
+ bp->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_status_t) blade_peer_bind(blade_peer_t *bp, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint)
+KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp)
{
ks_assert(bp);
- ks_assert(addr);
- return ks_dht_bind(bp->dht, addr, endpoint);
+ bp->shutdown = KS_TRUE;
+
+ if (bp->kws_thread) {
+ ks_thread_join(bp->kws_thread);
+ ks_pool_free(bp->pool, &bp->kws_thread);
+ }
+
+ if (bp->kws) kws_destroy(&bp->kws);
+
+ bp->shutdown = KS_FALSE;
+ return KS_STATUS_SUCCESS;
}
-KS_DECLARE(void) blade_peer_pulse(blade_peer_t *bp, int32_t timeout)
+void *blade_peer_kws_thread(ks_thread_t *thread, void *data)
{
- ks_assert(bp);
- ks_assert(timeout >= 0);
-
- ks_dht_pulse(bp->dht, timeout);
+ blade_peer_t *peer;
+ kws_opcode_t opcode;
+ uint8_t *data;
+ ks_size_t data_len;
+ blade_message_t *message;
+
+ ks_assert(thread);
+ ks_assert(data);
+
+ peer = (blade_peer_t *)data;
+
+ while (!peer->shutdown) {
+ // @todo use nonblocking kws mode so that if no data at all is available yet we can still do other things such as sending messages before trying again
+ // or easier alternative, just use ks_poll (or select) to check if there is a POLLIN event pending, but this requires direct access to the socket, or
+ // kws can be updated to add a function to poll the inner socket for events (IE, kws_poll(kws, &inbool, NULL, &errbool, timeout))
+ data_len = kws_read_frame(peer->kws, &opcode, &data);
+
+ if (data_len <= 0) {
+ // @todo error handling, strerror(ks_errno())
+ // 0 means socket closed with WS_NONE, which closes websocket with no additional reason
+ // -1 means socket closed with a general failure
+ // -2 means nonblocking wait
+ // other values are based on WS_XXX reasons
+ // negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and
+
+ // @todo: this way of disconnecting would have the service periodically check the list of connected peers for those that are disconnecting,
+ // remove them from the connected peer list, and then call peer destroy which will wait for this thread to rejoin which it already will have,
+ // and then destroy the inner kws and finish any cleanup of the actual socket if neccessary, and can still call an ondisconnected callback
+ // at the service level
+ peer->disconnecting = KS_TRUE;
+ break;
+ }
+
+ // @todo this will check the discarded queue first and realloc if there is not enough space, otherwise allocate a message, and finally copy the data
+ if (blade_handle_message_claim(peer->service->handle, &message, data, data_len) != KS_STATUS_SUCCESS || !message) {
+ // @todo error handling
+ // just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
+ peer->disconnecting = KS_TRUE;
+ break;
+ }
+
+ ks_q_push(peer->messages_receiving, message);
+ // @todo callback up the stack to indicate a message has been received and can be popped (more efficient than constantly polling by popping)?
+
+
+ if (ks_q_trypop(peer->messages_sending, &message) == KS_STATUS_SUCCESS) {
+ }
+ }
+
+ return NULL;
}
-
+
/* For Emacs:
* Local Variables:
* mode:c
--- /dev/null
+/*
+ * Copyright (c) 2007-2014, Anthony Minessale II
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "blade.h"
+
+typedef enum {
+ BS_NONE = 0,
+ BS_MYPOOL = (1 << 0),
+ BS_MYTPOOL = (1 << 1)
+} bspvt_flag_t;
+
+#define BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX 16
+
+struct blade_service_s {
+ bspvt_flag_t flags;
+ ks_pool_t *pool;
+ ks_thread_pool_t *tpool;
+
+ ks_sockaddr_t config_websockets_endpoints_ipv4[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
+ ks_sockaddr_t config_websockets_endpoints_ipv6[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
+ int32_t config_websockets_endpoints_ipv4_length;
+ int32_t config_websockets_endpoints_ipv6_length;
+ int32_t config_websockets_endpoints_backlog;
+
+ ks_bool_t shutdown;
+
+ struct pollfd *listeners_poll;
+ int32_t *listeners_families;
+ int32_t listeners_size;
+ int32_t listeners_length;
+ ks_thread_t *listeners_thread;
+
+ list_t connected;
+};
+
+
+void *blade_service_listeners_thread(ks_thread_t *thread, void *data);
+ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr);
+
+
+KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP)
+{
+ blade_service_t *bs = NULL;
+ bspvt_flag_t flags;
+ ks_pool_t *pool;
+
+ ks_assert(bsP);
+
+ bs = *bsP;
+ *bsP = NULL;
+
+ ks_assert(bs);
+
+ flags = bs->flags;
+ pool = bs->pool;
+
+ blade_service_shutdown(bs);
+
+ list_destroy(&bs->connected);
+
+ if (bs->tpool && (flags & BS_MYTPOOL)) ks_thread_pool_destroy(&bs->tpool);
+
+ ks_pool_free(bs->pool, &bs);
+
+ if (pool && (flags & BS_MYPOOL)) ks_pool_close(&pool);
+
+ return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool)
+{
+ bspvt_flag_t newflags = BS_NONE;
+ blade_service_t *bs = NULL;
+
+ if (!pool) {
+ newflags |= BS_MYPOOL;
+ ks_pool_open(&pool);
+ ks_assert(pool);
+ }
+ if (!tpool) {
+ newflags |= BS_MYTPOOL;
+ ks_thread_pool_create(&tpool, BLADE_SERVICE_TPOOL_MIN, BLADE_SERVICE_TPOOL_MAX, BLADE_SERVICE_TPOOL_STACK, KS_PRI_NORMAL, BLADE_SERVICE_TPOOL_IDLE);
+ ks_assert(tpool);
+ }
+
+ bs = ks_pool_alloc(pool, sizeof(*bs));
+ bs->flags = newflags;
+ bs->pool = pool;
+ bs->tpool = tpool;
+ list_init(&bs->connected);
+ *bsP = bs;
+
+ return KS_STATUS_SUCCESS;
+}
+
+ks_status_t blade_service_config(blade_service_t *bs, config_setting_t *config)
+{
+ config_setting_t *websockets = NULL;
+ config_setting_t *websockets_endpoints = NULL;
+ config_setting_t *websockets_endpoints_ipv4 = NULL;
+ config_setting_t *websockets_endpoints_ipv6 = NULL;
+ config_setting_t *websockets_ssl = NULL;
+ config_setting_t *element;
+ config_setting_t *tmp1;
+ config_setting_t *tmp2;
+ ks_sockaddr_t config_websockets_endpoints_ipv4[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
+ ks_sockaddr_t config_websockets_endpoints_ipv6[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
+ int32_t config_websockets_endpoints_ipv4_length = 0;
+ int32_t config_websockets_endpoints_ipv6_length = 0;
+ int32_t config_websockets_endpoints_backlog = 8;
+
+ ks_assert(bs);
+
+ if (!config) return KS_STATUS_FAIL;
+ if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
+
+ websockets = config_setting_get_member(config, "websockets");
+ if (!websockets) return KS_STATUS_FAIL;
+ websockets_endpoints = config_setting_get_member(config, "endpoints");
+ if (!websockets_endpoints) return KS_STATUS_FAIL;
+ websockets_endpoints_ipv4 = config_lookup_from(websockets_endpoints, "ipv4");
+ websockets_endpoints_ipv6 = config_lookup_from(websockets_endpoints, "ipv6");
+ if (websockets_endpoints_ipv4) {
+ if (config_setting_type(websockets_endpoints_ipv4) != CONFIG_TYPE_LIST) return KS_STATUS_FAIL;
+ if ((config_websockets_endpoints_ipv4_length = config_setting_length(websockets_endpoints_ipv4)) > BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX)
+ return KS_STATUS_FAIL;
+
+ for (int32_t index = 0; index < config_websockets_endpoints_ipv4_length; ++index) {
+ element = config_setting_get_elem(websockets_endpoints_ipv4, index);
+ tmp1 = config_lookup_from(element, "address");
+ tmp2 = config_lookup_from(element, "port");
+ if (!tmp1 || !tmp2) return KS_STATUS_FAIL;
+ if (config_setting_type(tmp1) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL;
+ if (config_setting_type(tmp2) != CONFIG_TYPE_INT) return KS_STATUS_FAIL;
+
+ if (ks_addr_set(&config_websockets_endpoints_ipv4[index],
+ config_setting_get_string(tmp1),
+ config_setting_get_int(tmp2),
+ AF_INET) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ }
+ }
+ if (websockets_endpoints_ipv6) {
+ if (config_setting_type(websockets_endpoints_ipv6) != CONFIG_TYPE_LIST) return KS_STATUS_FAIL;
+ if ((config_websockets_endpoints_ipv6_length = config_setting_length(websockets_endpoints_ipv6)) > BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX)
+ return KS_STATUS_FAIL;
+
+ for (int32_t index = 0; index < config_websockets_endpoints_ipv6_length; ++index) {
+ element = config_setting_get_elem(websockets_endpoints_ipv6, index);
+ tmp1 = config_lookup_from(element, "address");
+ tmp2 = config_lookup_from(element, "port");
+ if (!tmp1 || !tmp2) return KS_STATUS_FAIL;
+ if (config_setting_type(tmp1) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL;
+ if (config_setting_type(tmp2) != CONFIG_TYPE_INT) return KS_STATUS_FAIL;
+
+ if (ks_addr_set(&config_websockets_endpoints_ipv6[index],
+ config_setting_get_string(tmp1),
+ config_setting_get_int(tmp2),
+ AF_INET6) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ }
+ }
+ if (config_websockets_endpoints_ipv4_length + config_websockets_endpoints_ipv6_length <= 0) return KS_STATUS_FAIL;
+ tmp1 = config_lookup_from(websockets_endpoints, "backlog");
+ if (tmp1) {
+ if (config_setting_type(tmp1) != CONFIG_TYPE_INT) return KS_STATUS_FAIL;
+ config_websockets_endpoints_backlog = config_setting_get_int(tmp1);
+ }
+ websockets_ssl = config_setting_get_member(websockets, "ssl");
+ if (websockets_ssl) {
+ // @todo: SSL stuffs from websockets_ssl into config_websockets_ssl envelope
+ }
+
+
+ // Configuration is valid, now assign it to the variables that are used
+ // If the configuration was invalid, then this does not get changed from the current config when reloading a new config
+ for (int32_t index = 0; index < config_websockets_endpoints_ipv4_length; ++index)
+ bs->config_websockets_endpoints_ipv4[index] = config_websockets_endpoints_ipv4[index];
+ for (int32_t index = 0; index < config_websockets_endpoints_ipv6_length; ++index)
+ bs->config_websockets_endpoints_ipv6[index] = config_websockets_endpoints_ipv6[index];
+ bs->config_websockets_endpoints_ipv4_length = config_websockets_endpoints_ipv4_length;
+ bs->config_websockets_endpoints_ipv6_length = config_websockets_endpoints_ipv6_length;
+ bs->config_websockets_endpoints_backlog = config_websockets_endpoints_backlog;
+ //bs->config_websockets_ssl = config_websockets_ssl;
+
+ return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_service_startup(blade_service_t *bs, config_setting_t *config)
+{
+ ks_assert(bs);
+
+ blade_service_shutdown(bs);
+
+ // @todo: If the configuration is invalid, and this is a case of reloading a new config, then the service shutdown shouldn't occur
+ // but the service may use configuration that changes before we shutdown if it is read successfully, may require a config reader/writer mutex?
+
+ if (blade_service_config(bs, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ for (int32_t index = 0; index < bs->config_websockets_endpoints_ipv4_length; ++index) {
+ if (blade_service_listen(bs, &bs->config_websockets_endpoints_ipv4[index]) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ }
+ for (int32_t index = 0; index < bs->config_websockets_endpoints_ipv6_length; ++index) {
+ if (blade_service_listen(bs, &bs->config_websockets_endpoints_ipv6[index]) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ }
+
+ if (ks_thread_create_ex(&bs->listeners_thread,
+ blade_service_listeners_thread,
+ bs,
+ KS_THREAD_FLAG_DEFAULT,
+ KS_THREAD_DEFAULT_STACK,
+ KS_PRI_NORMAL,
+ bs->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs)
+{
+ ks_assert(bs);
+
+ bs->shutdown = KS_TRUE;
+
+ if (bs->listeners_thread) {
+ ks_thread_join(bs->listeners_thread);
+ ks_pool_free(bs->pool, &bs->listeners_thread);
+ }
+
+ for (int32_t index = 0; index < bs->listeners_length; ++index) {
+ ks_socket_t sock = bs->listeners_poll[index].fd;
+ ks_socket_shutdown(sock, SHUT_RDWR);
+ ks_socket_close(&sock);
+ }
+ bs->listeners_length = 0;
+
+ list_iterator_start(&bs->connected);
+ while (list_iterator_hasnext(&bs->connected)) {
+ blade_peer_t *peer = (blade_peer_t *)list_iterator_next(&bs->connected);
+ blade_peer_destroy(&peer);
+ }
+ list_iterator_stop(&bs->connected);
+ list_clear(&bs->connected);
+
+ bs->shutdown = KS_FALSE;
+ return KS_STATUS_SUCCESS;
+}
+
+ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr)
+{
+ ks_socket_t listener = KS_SOCK_INVALID;
+ int32_t listener_index = -1;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(bs);
+ ks_assert(addr);
+
+ if ((listener = socket(addr->family, SOCK_STREAM, IPPROTO_TCP)) == KS_SOCK_INVALID) {
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
+
+ ks_socket_option(listener, SO_REUSEADDR, KS_TRUE);
+ ks_socket_option(listener, TCP_NODELAY, KS_TRUE);
+ // @todo make sure v6 does not automatically map to a v4 using socket option IPV6_V6ONLY?
+
+ if (ks_addr_bind(listener, addr) != KS_STATUS_SUCCESS) {
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
+
+ if (listen(listener, bs->config_websockets_endpoints_backlog) != 0) {
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
+
+ listener_index = bs->listeners_length++;
+ if (bs->listeners_length > bs->listeners_size) {
+ bs->listeners_size = bs->listeners_length;
+ bs->listeners_poll = (struct pollfd *)ks_pool_resize(bs->pool, bs->listeners_poll, sizeof(struct pollfd) * bs->listeners_size);
+ ks_assert(bs->listeners_poll);
+ bs->listeners_families = (int32_t *)ks_pool_resize(bs->pool, bs->listeners_families, sizeof(int32_t) * bs->listeners_size);
+ ks_assert(bs->listeners_families);
+ }
+ bs->listeners_poll[listener_index].fd = listener;
+ bs->listeners_poll[listener_index].events = POLLIN | POLLERR;
+ bs->listeners_families[listener_index] = addr->family;
+
+ done:
+ if (ret != KS_STATUS_SUCCESS) {
+ if (listener != KS_SOCK_INVALID) {
+ ks_socket_shutdown(listener, SHUT_RDWR);
+ ks_socket_close(&listener);
+ }
+ }
+ return ret;
+}
+
+void *blade_service_listeners_thread(ks_thread_t *thread, void *data)
+{
+ blade_service_t *service;
+
+ ks_assert(thread);
+ ks_assert(data);
+
+ service = (blade_service_t *)data;
+
+ while (!service->shutdown) {
+ if (ks_poll(service->listeners_poll, service->listeners_length, 100) > 0) {
+ for (int32_t index = 0; index < service->listeners_length; ++index) {
+ ks_socket_t sock;
+ ks_sockaddr_t raddr;
+ socklen_t slen = 0;
+ kws_t *kws = NULL;
+ blade_peer_t *peer = NULL;
+
+ if (!(service->listeners_poll[index].revents & POLLIN)) continue;
+ if (service->listeners_poll[index].revents & POLLERR) {
+ // @todo: error handling, just skip the listener for now
+ continue;
+ }
+
+ if (service->listeners_families[index] == AF_INET) {
+ slen = sizeof(raddr.v.v4);
+ if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v4, &slen)) == KS_SOCK_INVALID) {
+ // @todo: error handling, just skip the socket for now
+ continue;
+ }
+ raddr.family = AF_INET;
+ } else {
+ slen = sizeof(raddr.v.v6);
+ if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v6, &slen)) == KS_SOCK_INVALID) {
+ // @todo: error handling, just skip the socket for now
+ continue;
+ }
+ raddr.family = AF_INET6;
+ }
+
+ ks_addr_get_host(&raddr);
+ ks_addr_get_port(&raddr);
+
+ // @todo: SSL init stuffs based on data from service->config_websockets_ssl
+
+ if (kws_init(&kws, sock, NULL, NULL, KWS_BLOCK, service->pool) != KS_STATUS_SUCCESS) {
+ // @todo: error handling, just close and skip the socket for now
+ ks_socket_close(&sock);
+ continue;
+ }
+
+ blade_peer_create(&peer, service->pool, service->tpool);
+ ks_assert(peer);
+
+ // @todo: should probably assign kws before adding to list, in a separate call from startup because it starts the internal worker thread
+
+ list_append(&service->connected, peer);
+
+ blade_peer_startup(peer, kws);
+ }
+ }
+ }
+
+ return NULL;
+}
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
bhpvt_flag_t flags;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
- blade_peer_t *peer;
+
+ config_setting_t *config_datastore;
+ config_setting_t *config_directory;
+
+ //blade_peer_t *peer;
+ blade_directory_t *directory;
blade_datastore_t *datastore;
};
flags = bh->flags;
pool = bh->pool;
- if (bh->datastore) blade_datastore_destroy(&bh->datastore);
-
- blade_peer_destroy(&bh->peer);
+ blade_handle_shutdown(bh);
+
+ //blade_peer_destroy(&bh->peer);
if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool);
ks_pool_free(bh->pool, &bh);
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool, const char *nodeid)
+KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool)
{
bhpvt_flag_t newflags = BH_NONE;
blade_handle_t *bh = NULL;
- ks_dht_nodeid_t nid;
- ks_assert(nodeid);
- ks_assert(strlen(nodeid) == (KS_DHT_NODEID_SIZE * 2));
+ ks_assert(bhP);
if (!pool) {
newflags |= BH_MYPOOL;
bh->flags = newflags;
bh->pool = pool;
bh->tpool = tpool;
- ks_dht_dehex(nid.id, nodeid, KS_DHT_NODEID_SIZE);
- blade_peer_create(&bh->peer, bh->pool, bh->tpool, &nid);
+ //blade_peer_create(&bh->peer, bh->pool, bh->tpool);
*bhP = bh;
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(void) blade_handle_myid(blade_handle_t *bh, char *buffer)
+ks_status_t blade_handle_config(blade_handle_t *bh, config_setting_t *config)
{
- ks_dht_nodeid_t *nodeid = NULL;
-
+ config_setting_t *datastore = NULL;
+ config_setting_t *directory = NULL;
+
ks_assert(bh);
- ks_assert(bh->peer);
- nodeid = blade_peer_myid(bh->peer);
- ks_dht_hex(nodeid->id, buffer, KS_DHT_NODEID_SIZE);
+ if (!config) return KS_STATUS_FAIL;
+ if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
+
+ datastore = config_setting_get_member(config, "datastore");
+ //if (datastore && !config_setting_is_group(datastore)) return KS_STATUS_FAIL;
+
+ directory = config_setting_get_member(config, "directory");
+ //if (directory && !config_setting_is_group(directory)) return KS_STATUS_FAIL;
+
+ bh->config_datastore = datastore;
+ bh->config_directory = directory;
+
+ return KS_STATUS_SUCCESS;
}
-KS_DECLARE(void) blade_handle_autoroute(blade_handle_t *bh, ks_bool_t autoroute, ks_port_t port)
+KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config)
{
ks_assert(bh);
- ks_assert(bh->peer);
- blade_peer_autoroute(bh->peer, autoroute, port);
+ if (blade_handle_config(bh, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ if (bh->config_datastore && !blade_handle_datastore_available(bh)) {
+ blade_datastore_create(&bh->datastore, bh->pool, bh->tpool);
+ blade_datastore_startup(bh->datastore, bh->config_datastore);
+ }
+
+ if (bh->config_directory && !blade_handle_directory_available(bh)) {
+ blade_directory_create(&bh->directory, bh->pool, bh->tpool);
+ blade_directory_startup(bh->directory, config);
+ }
+
+ return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_status_t) blade_handle_bind(blade_handle_t *bh, const char *ip, ks_port_t port, ks_dht_endpoint_t **endpoint)
+KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
{
- ks_sockaddr_t addr;
- int family = AF_INET;
-
ks_assert(bh);
- ks_assert(ip);
- ks_assert(port);
-
- if (ip[1] != '.' && ip[2] != '.' && ip[3] != '.') family = AF_INET6;
- ks_addr_set(&addr, ip, port, family);
- return blade_peer_bind(bh->peer, &addr, endpoint);
+ if (blade_handle_directory_available(bh)) blade_directory_destroy(&bh->directory);
+
+ if (blade_handle_datastore_available(bh)) blade_datastore_destroy(&bh->datastore);
+
+ return KS_STATUS_SUCCESS;
}
-KS_DECLARE(void) blade_handle_pulse(blade_handle_t *bh, int32_t timeout)
+KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh)
{
ks_assert(bh);
- ks_assert(timeout >= 0);
- blade_peer_pulse(bh->peer, timeout);
- if (bh->datastore) blade_datastore_pulse(bh->datastore, timeout);
+ return bh->datastore != NULL;
}
-
-KS_DECLARE(void) blade_handle_datastore_start(blade_handle_t *bh)
+KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh)
{
ks_assert(bh);
- if (bh->datastore) return;
-
- blade_datastore_create(&bh->datastore, bh->pool);
+ return bh->directory != NULL;
}
KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length)
{
ks_assert(bh);
- ks_assert(bh->datastore);
ks_assert(key);
ks_assert(key_length > 0);
ks_assert(data);
ks_assert(data_length > 0);
+
+ if (!blade_handle_datastore_available(bh)) return KS_STATUS_INACTIVE;
return blade_datastore_store(bh->datastore, key, key_length, data, data_length);
}
void *userdata)
{
ks_assert(bh);
- ks_assert(bh->datastore);
ks_assert(callback);
ks_assert(key);
ks_assert(key_length > 0);
+ if (!blade_handle_datastore_available(bh)) return KS_STATUS_INACTIVE;
+
return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata);
}
#include <ks.h>
#include <ks_dht.h>
#include <sodium.h>
+#include <libconfig.h>
#include "unqlite.h"
#include "blade_types.h"
#include "blade_stack.h"
#include "blade_peer.h"
+#include "blade_service.h"
#include "blade_datastore.h"
+#include "blade_directory.h"
#include "bpcp.h"
KS_BEGIN_EXTERN_C
#define _BLADE_DATASTORE_H_
#include <blade.h>
+#define BLADE_DATASTORE_TPOOL_MIN 2
+#define BLADE_DATASTORE_TPOOL_MAX 8
+#define BLADE_DATASTORE_TPOOL_STACK (1024 * 256)
+#define BLADE_DATASTORE_TPOOL_IDLE 10
+
KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool);
+KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool, ks_thread_pool_t *tpool);
KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP);
-KS_DECLARE(void) blade_datastore_pulse(blade_datastore_t *bds, int32_t timeout);
+KS_DECLARE(ks_status_t) blade_datastore_startup(blade_datastore_t *bds, config_setting_t *config);
+KS_DECLARE(ks_status_t) blade_datastore_shutdown(blade_datastore_t *bds);
+
KS_DECLARE(void) blade_datastore_error(blade_datastore_t *bds, const char **buffer, int32_t *buffer_length);
KS_DECLARE(ks_status_t) blade_datastore_store(blade_datastore_t *bds, const void *key, int32_t key_length, const void *data, int64_t data_length);
KS_DECLARE(ks_status_t) blade_datastore_fetch(blade_datastore_t *bds,
--- /dev/null
+/*
+ * Copyright (c) 2007-2014, Anthony Minessale II
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _BLADE_DIRECTORY_H_
+#define _BLADE_DIRECTORY_H_
+#include <blade.h>
+
+#define BLADE_DIRECTORY_TPOOL_MIN 2
+#define BLADE_DIRECTORY_TPOOL_MAX 8
+#define BLADE_DIRECTORY_TPOOL_STACK (1024 * 256)
+#define BLADE_DIRECTORY_TPOOL_IDLE 10
+
+KS_BEGIN_EXTERN_C
+KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool);
+KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP);
+KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config);
+KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd);
+KS_END_EXTERN_C
+
+#endif
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
#define BLADE_PEER_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C
-KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid);
+KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool);
KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP);
-KS_DECLARE(ks_dht_nodeid_t *) blade_peer_myid(blade_peer_t *bp);
-KS_DECLARE(void) blade_peer_autoroute(blade_peer_t *bp, ks_bool_t autoroute, ks_port_t port);
-KS_DECLARE(ks_status_t) blade_peer_bind(blade_peer_t *bp, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint);
-KS_DECLARE(void) blade_peer_pulse(blade_peer_t *bp, int32_t timeout);
+KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws);
+KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp);
KS_END_EXTERN_C
#endif
--- /dev/null
+/*
+ * Copyright (c) 2007-2014, Anthony Minessale II
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _BLADE_SERVICE_H_
+#define _BLADE_SERVICE_H_
+#include <blade.h>
+
+#define BLADE_SERVICE_TPOOL_MIN 2
+#define BLADE_SERVICE_TPOOL_MAX 8
+#define BLADE_SERVICE_TPOOL_STACK (1024 * 256)
+#define BLADE_SERVICE_TPOOL_IDLE 10
+
+KS_BEGIN_EXTERN_C
+KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool);
+KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP);
+KS_DECLARE(ks_status_t) blade_service_startup(blade_service_t *bs, config_setting_t *config);
+KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs);
+KS_END_EXTERN_C
+
+#endif
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP);
-KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool, const char *nodeid);
-KS_DECLARE(void) blade_handle_myid(blade_handle_t *bh, char *buffer);
-KS_DECLARE(void) blade_handle_autoroute(blade_handle_t *bh, ks_bool_t autoroute, ks_port_t port);
-KS_DECLARE(ks_status_t) blade_handle_bind(blade_handle_t *bh, const char *ip, ks_port_t port, ks_dht_endpoint_t **endpoint);
-KS_DECLARE(void) blade_handle_pulse(blade_handle_t *bh, int32_t timeout);
-KS_DECLARE(void) blade_handle_datastore_start(blade_handle_t *bh);
+KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool);
+KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config);
+KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh);
+
+KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh);
+KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh);
+
KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length);
KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
blade_datastore_fetch_callback_t callback,
typedef struct blade_handle_s blade_handle_t;
typedef struct blade_peer_s blade_peer_t;
+typedef struct blade_service_s blade_service_t;
typedef struct blade_datastore_s blade_datastore_t;
+typedef struct blade_directory_s blade_directory_t;
typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata);
void command_test(blade_handle_t *bh, char *args);
void command_quit(blade_handle_t *bh, char *args);
-void command_myid(blade_handle_t *bh, char *args);
-void command_bind(blade_handle_t *bh, char *args);
void command_store(blade_handle_t *bh, char *args);
void command_fetch(blade_handle_t *bh, char *args);
static const struct command_def_s command_defs[] = {
{ "test", command_test },
{ "quit", command_quit },
- { "myid", command_myid },
- { "bind", command_bind },
{ "store", command_store },
{ "fetch", command_fetch },
int main(int argc, char **argv)
{
blade_handle_t *bh = NULL;
- const char *nodeid;
-
- ks_assert(argc >= 2);
-
- nodeid = argv[1];
ks_global_set_default_logger(KS_LOG_LEVEL_DEBUG);
blade_init();
- blade_handle_create(&bh, NULL, NULL, nodeid);
-
- blade_handle_autoroute(bh, KS_TRUE, KS_DHT_DEFAULT_PORT);
+ blade_handle_create(&bh, NULL, NULL);
loop(bh);
// @todo lines must not exceed 512 bytes, treat as error and ignore buffer until next new line?
ks_assert(0);
}
- blade_handle_pulse(bh, 1);
+ blade_handle_pulse(bh);
}
}
g_shutdown = KS_TRUE;
}
-void command_myid(blade_handle_t *bh, char *args)
-{
- char buf[KS_DHT_NODEID_SIZE * 2 + 1];
-
- ks_assert(bh);
- ks_assert(args);
-
- blade_handle_myid(bh, buf);
-
- ks_log(KS_LOG_INFO, "%s\n", buf);
-}
-
-void command_bind(blade_handle_t *bh, char *args)
-{
- char *ip = NULL;
- char *port = NULL;
- ks_port_t p;
-
- ks_assert(args);
-
- parse_argument(&args, &ip, ' ');
- parse_argument(&args, &port, ' ');
-
- p = atoi(port); // @todo use strtol for error handling
-
- blade_handle_bind(bh, ip, p, NULL);
-}
-
void command_store(blade_handle_t *bh, char *args)
{
char *key;
ks_assert(args);
- blade_handle_datastore_start(bh);
+ blade_handle_datastore_startup(bh, NULL);
parse_argument(&args, &key, ' ');
parse_argument(&args, &data, ' ');
ks_assert(args);
- blade_handle_datastore_start(bh);
+ blade_handle_datastore_startup(bh, NULL);
parse_argument(&args, &key, ' ');