struct blade_datastore_s {
bdspvt_flag_t flags;
ks_pool_t *pool;
+ unqlite *db;
};
+struct blade_datastore_fetch_userdata_s
+{
+ blade_datastore_t *bds;
+ blade_datastore_fetch_callback_t callback;
+ void *userdata;
+};
+typedef struct blade_datastore_fetch_userdata_s blade_datastore_fetch_userdata_t;
+
+
KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP)
{
flags = bds->flags;
pool = bds->pool;
+ if (bds->db) {
+ unqlite_close(bds->db);
+ bds->db = NULL;
+ }
+
ks_pool_free(bds->pool, &bds);
if (pool && (flags & BDS_MYPOOL)) ks_pool_close(&pool);
bds->pool = pool;
*bdsP = bds;
+ if (unqlite_open(&bds->db, NULL, UNQLITE_OPEN_IN_MEMORY) != UNQLITE_OK) {
+ const char *errbuf = NULL;
+ blade_datastore_error(bds, &errbuf, NULL);
+ ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
+ return KS_STATUS_FAIL;
+ }
+
+ // @todo unqlite_lib_config(UNQLITE_LIB_CONFIG_MEM_ERR_CALLBACK)
+
+ // @todo VM init if document store is used (and output consumer callback)
+
return KS_STATUS_SUCCESS;
}
ks_assert(timeout >= 0);
}
+KS_DECLARE(void) blade_datastore_error(blade_datastore_t *bds, const char **buffer, int32_t *buffer_length)
+{
+ ks_assert(bds);
+ ks_assert(bds->db);
+ ks_assert(buffer);
+
+ unqlite_config(bds->db, UNQLITE_CONFIG_ERR_LOG, buffer, buffer_length);
+}
+
+KS_DECLARE(ks_status_t) blade_datastore_store(blade_datastore_t *bds, const void *key, int32_t key_length, const void *data, int64_t data_length)
+{
+ int32_t rc;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(bds);
+ ks_assert(bds->db);
+ ks_assert(key);
+ ks_assert(key_length > 0);
+ ks_assert(data);
+ ks_assert(data_length > 0);
+
+ rc = unqlite_begin(bds->db);
+
+ if (rc != UNQLITE_OK) {
+ if (rc == UNQLITE_BUSY) ret = KS_STATUS_TIMEOUT;
+ else {
+ const char *errbuf;
+ blade_datastore_error(bds, &errbuf, NULL);
+ ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
+
+ ret = KS_STATUS_FAIL;
+ }
+ } else if (unqlite_kv_store(bds->db, key, key_length, data, data_length) == UNQLITE_OK) unqlite_commit(bds->db);
+ else unqlite_rollback(bds->db);
+
+ return ret;
+}
+
+int blade_datastore_fetch_callback(const void *data, unsigned int data_length, void *userdata)
+{
+ int rc = UNQLITE_OK;
+ blade_datastore_fetch_userdata_t *ud = NULL;
+
+ ks_assert(data);
+ ks_assert(data_length > 0);
+ ks_assert(userdata);
+
+ ud = (blade_datastore_fetch_userdata_t *)userdata;
+ if (!ud->callback(ud->bds, data, data_length, ud->userdata)) rc = UNQLITE_ABORT;
+
+ return rc;
+}
+
+KS_DECLARE(ks_status_t) blade_datastore_fetch(blade_datastore_t *bds,
+ blade_datastore_fetch_callback_t callback,
+ const void *key,
+ int32_t key_length,
+ void *userdata)
+{
+ int32_t rc;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ blade_datastore_fetch_userdata_t ud;
+
+ ks_assert(bds);
+ ks_assert(bds->db);
+ ks_assert(callback);
+ ks_assert(key);
+ ks_assert(key_length > 0);
+
+ ud.bds = bds;
+ ud.callback = callback;
+ ud.userdata = userdata;
+
+ rc = unqlite_kv_fetch_callback(bds->db, key, key_length, blade_datastore_fetch_callback, &ud);
+
+ if (rc != UNQLITE_OK) {
+ if (rc == UNQLITE_BUSY) ret = KS_STATUS_TIMEOUT;
+ else {
+ const char *errbuf;
+ blade_datastore_error(bds, &errbuf, NULL);
+ ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
+
+ ret = KS_STATUS_FAIL;
+ }
+ }
+
+ return ret;
+}
+
/* For Emacs:
* Local Variables:
* mode:c
blade_datastore_create(&bh->datastore, bh->pool);
}
+KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length)
+{
+ ks_assert(bh);
+ ks_assert(bh->datastore);
+ ks_assert(key);
+ ks_assert(key_length > 0);
+ ks_assert(data);
+ ks_assert(data_length > 0);
+
+ return blade_datastore_store(bh->datastore, key, key_length, data, data_length);
+}
+
+KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
+ blade_datastore_fetch_callback_t callback,
+ const void *key,
+ int32_t key_length,
+ void *userdata)
+{
+ ks_assert(bh);
+ ks_assert(bh->datastore);
+ ks_assert(callback);
+ ks_assert(key);
+ ks_assert(key_length > 0);
+
+ return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata);
+}
+
/* For Emacs:
* Local Variables:
KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool);
KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP);
KS_DECLARE(void) blade_datastore_pulse(blade_datastore_t *bds, int32_t timeout);
+KS_DECLARE(void) blade_datastore_error(blade_datastore_t *bds, const char **buffer, int32_t *buffer_length);
+KS_DECLARE(ks_status_t) blade_datastore_store(blade_datastore_t *bds, const void *key, int32_t key_length, const void *data, int64_t data_length);
+KS_DECLARE(ks_status_t) blade_datastore_fetch(blade_datastore_t *bds,
+ blade_datastore_fetch_callback_t callback,
+ const void *key,
+ int32_t key_length,
+ void *userdata);
KS_END_EXTERN_C
#endif
KS_DECLARE(void) blade_handle_autoroute(blade_handle_t *bh, ks_bool_t autoroute, ks_port_t port);
KS_DECLARE(ks_status_t) blade_handle_bind(blade_handle_t *bh, const char *ip, ks_port_t port, ks_dht_endpoint_t **endpoint);
KS_DECLARE(void) blade_handle_pulse(blade_handle_t *bh, int32_t timeout);
+KS_DECLARE(void) blade_handle_datastore_start(blade_handle_t *bh);
+KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length);
+KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
+ blade_datastore_fetch_callback_t callback,
+ const void *key,
+ int32_t key_length,
+ void *userdata);
KS_END_EXTERN_C
#endif
typedef struct blade_peer_s blade_peer_t;
typedef struct blade_datastore_s blade_datastore_t;
+typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata);
+
KS_END_EXTERN_C
#endif
void command_quit(blade_handle_t *bh, char *args);
void command_myid(blade_handle_t *bh, char *args);
void command_bind(blade_handle_t *bh, char *args);
+void command_store(blade_handle_t *bh, char *args);
+void command_fetch(blade_handle_t *bh, char *args);
static const struct command_def_s command_defs[] = {
{ "test", command_test },
{ "quit", command_quit },
{ "myid", command_myid },
{ "bind", command_bind },
+ { "store", command_store },
+ { "fetch", command_fetch },
{ NULL, NULL }
};
blade_handle_bind(bh, ip, p, NULL);
}
+
+void command_store(blade_handle_t *bh, char *args)
+{
+ char *key;
+ char *data;
+
+ ks_assert(args);
+
+ blade_handle_datastore_start(bh);
+
+ parse_argument(&args, &key, ' ');
+ parse_argument(&args, &data, ' ');
+
+ blade_handle_datastore_store(bh, key, strlen(key), data, strlen(data) + 1);
+}
+
+ks_bool_t blade_datastore_fetch_callback(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata)
+{
+ ks_log(KS_LOG_INFO, "%s\n", data);
+ return KS_TRUE;
+}
+
+void command_fetch(blade_handle_t *bh, char *args)
+{
+ char *key;
+
+ ks_assert(args);
+
+ blade_handle_datastore_start(bh);
+
+ parse_argument(&args, &key, ' ');
+
+ blade_handle_datastore_fetch(bh, blade_datastore_fetch_callback, key, strlen(key), bh);
+}