#include "ks_dht-int.h"
#include "sodium.h"
+void ks_dht_endpoint_destructor(void *ptr) { ks_dht_endpoint_destroy((ks_dht_endpoint_t **)&ptr); }
+
+void ks_dht_transaction_destructor(void *ptr) { ks_dht_transaction_destroy((ks_dht_transaction_t **)&ptr); }
+
+void ks_dht_storageitem_destructor(void *ptr) { ks_dht_storageitem_destroy((ks_dht_storageitem_t **)&ptr); }
+
KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread_pool_t *tpool)
{
ks_bool_t pool_alloc = !pool;
/**
* Create the message type registry.
*/
- ks_hash_create(&d->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+ ks_hash_create(&d->registry_type, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
ks_assert(d->registry_type);
/**
* Create the message query registry.
*/
- ks_hash_create(&d->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+ ks_hash_create(&d->registry_query, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
ks_assert(d->registry_query);
/**
* Create the message error registry.
*/
- ks_hash_create(&d->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+ ks_hash_create(&d->registry_error, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
ks_assert(d->registry_error);
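+ // Error codes follow BEP5/BEP44 conventions: 201 generic, 202 server error, 203 protocol
+ // error (malformed or invalid arguments), 204 method unknown, and 205-207/301/302 for
+ // BEP44 put/get handling.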
// @todo register 301 error for internal get/put CAS hash mismatch retry handler
* The endpoints and endpoints_poll arrays are maintained in parallel to optimize polling.
*/
d->endpoints = NULL;
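+ // endpoints_length tracks how many endpoints are currently bound; endpoints_size is the
+ // allocated (high-water) capacity of the parallel endpoints/endpoints_poll arrays.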
+ d->endpoints_length = 0;
d->endpoints_size = 0;
d->endpoints_poll = NULL;
* This also provides the basis for autorouting to find unbound interfaces and bind them at runtime.
* This hash uses the host ip string concatenated with a colon and the port, i.e. "123.123.123.123:123" or the ipv6 equivalent
*/
- ks_hash_create(&d->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, d->pool);
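+ // Created with KS_HASH_FLAG_NOLOCK and a destructor: callers take the hash read/write
+ // locks explicitly around lookups and inserts (see the bind and autoroute checks), and
+ // removing an entry or destroying the hash releases the endpoint via the destructor.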
+ ks_hash_create_ex(&d->endpoints_hash,
+ 2,
+ NULL,
+ NULL,
+ KS_HASH_MODE_CASE_INSENSITIVE,
+ KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK,
+ ks_dht_endpoint_destructor,
+ d->pool);
ks_assert(d->endpoints_hash);
/**
- * Default expirations to not be checked for one pulse.
+ * Default the transactions pulse so expirations are not checked for the first pulse interval.
*/
- d->pulse_expirations = ks_time_now() + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC);
+ d->transactions_pulse = ks_time_now() + ((ks_time_t)KS_DHT_TRANSACTIONS_PULSE * KS_USEC_PER_SEC);
/**
* Create the queue for outgoing messages; this ensures sending remains async and can be throttled when system buffers are full.
/**
* Initialize the transaction id mutex; @todo use an atomic increment instead
*/
- ks_mutex_create(&d->tid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
- ks_assert(d->tid_mutex);
+ ks_mutex_create(&d->transactionid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
+ ks_assert(d->transactionid_mutex);
/**
* Initialize the first transaction id randomly; the starting value doesn't really matter.
* Create the hash to track transactions for queries that are awaiting responses.
* It should be impossible to receive a duplicate transaction id in the hash before it expires, but if one does occur an error is preferred.
*/
- ks_hash_create(&d->transactions_hash, KS_HASH_MODE_INT, KS_HASH_FLAG_RWLOCK, d->pool);
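+ // As with the other hashes, locking is now explicit at the call sites and the registered
+ // destructor frees a transaction when its entry is removed or the hash is destroyed.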
+ ks_hash_create_ex(&d->transactions_hash,
+ 16,
+ NULL,
+ NULL,
+ KS_HASH_MODE_INT,
+ KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK,
+ ks_dht_transaction_destructor,
+ d->pool);
ks_assert(d->transactions_hash);
/**
* The opaque write tokens are generated from a secret that changes periodically; tokens generated with either of the last two secrets are accepted.
*/
d->token_secret_current = d->token_secret_previous = rand();
- d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
+ d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKEN_EXPIRATION * KS_USEC_PER_SEC);
/**
* Create the hash to store arbitrary data for BEP44.
*/
- ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
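+ // Storage items are keyed by their target id; the destructor lets ks_hash_destroy() (and
+ // ks_hash_remove() on failure paths) release the items, replacing the manual iteration
+ // that used to run in the destroy path.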
+ ks_hash_create_ex(&d->storageitems_hash,
+ 16,
+ NULL,
+ NULL,
+ KS_HASH_MODE_ARBITRARY,
+ KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK,
+ ks_dht_storageitem_destructor,
+ d->pool);
ks_assert(d->storageitems_hash);
/**
ks_dht_t *d = NULL;
ks_pool_t *pool = NULL;
ks_bool_t pool_alloc = KS_FALSE;
- ks_hash_iterator_t *it = NULL;
ks_assert(dht);
ks_assert(*dht);
/**
* Cleanup the storageitems hash and its contents if it is allocated.
*/
- if (d->storageitems_hash) {
- for (it = ks_hash_first(d->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- const void *key = NULL;
- ks_dht_storageitem_t *val = NULL;
- ks_hash_this(it, &key, NULL, (void **)&val);
- ks_dht_storageitem_destroy(&val);
- }
- ks_hash_destroy(&d->storageitems_hash);
- }
+ if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash);
/**
* Zero out the opaque write token variables.
/**
* Cleanup the route tables if they are allocated.
- * @todo check if endpoints need to be destroyed first to release the readlock on their node
*/
if (d->rt_ipv4) ks_dhtrt_deinitroute(&d->rt_ipv4);
if (d->rt_ipv6) ks_dhtrt_deinitroute(&d->rt_ipv6);
* Cleanup the transactions mutex and hash if they are allocated.
*/
d->transactionid_next = 0;
- if (d->tid_mutex) ks_mutex_destroy(&d->tid_mutex);
+ if (d->transactionid_mutex) ks_mutex_destroy(&d->transactionid_mutex);
if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash);
/**
* Reset the transactions pulse timestamp; probably not strictly needed during destroy
*/
- d->pulse_expirations = 0;
+ d->transactions_pulse = 0;
- /**
- * Cleanup any endpoints that have been allocated.
- */
- for (int32_t i = 0; i < d->endpoints_size; ++i) {
- ks_dht_endpoint_t *ep = d->endpoints[i];
- ks_dht_endpoint_destroy(&ep);
- }
+ d->endpoints_length = 0;
d->endpoints_size = 0;
/**
if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll);
/**
- * Cleanup the endpoints hash if it is allocated.
+ * Cleanup the endpoints hash if it is allocated; its destructor releases any endpoints that are still allocated.
*/
if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash);
/**
* Check if the endpoint has already been bound for the address we want to route through.
*/
- ep = ks_hash_search(dht->endpoints_hash, ip, KS_READLOCKED);
- if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret;
+ ks_hash_read_lock(dht->endpoints_hash);
+ ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED);
+ ks_hash_read_unlock(dht->endpoints_hash);
/**
* If the endpoint has not been bound and autorouting is enabled, then try to bind the new address.
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
+KS_DECLARE(void) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
{
ks_assert(dht);
ks_assert(value);
ks_assert(callback);
- return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback);
+ ks_hash_write_lock(dht->registry_type);
+ ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback);
+ ks_hash_write_unlock(dht->registry_type);
}
-KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
+KS_DECLARE(void) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
{
ks_assert(dht);
ks_assert(value);
ks_assert(callback);
- return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback);
+ ks_hash_write_lock(dht->registry_query);
+ ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback);
+ ks_hash_write_unlock(dht->registry_query);
}
-KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
+KS_DECLARE(void) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
{
ks_assert(dht);
ks_assert(value);
ks_assert(callback);
- return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback);
+ ks_hash_write_lock(dht->registry_error);
+ ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback);
+ ks_hash_write_unlock(dht->registry_error);
}
KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint)
{
- ks_dht_endpoint_t *ep = NULL;
ks_socket_t sock = KS_SOCK_INVALID;
+ ks_dht_endpoint_t *ep = NULL;
int32_t epindex = 0;
ks_status_t ret = KS_STATUS_SUCCESS;
*/
if (endpoint) *endpoint = NULL;
- ep = ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_READLOCKED);
- ks_hash_read_unlock(dht->endpoints_hash);
- if (ep) {
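+ // Hold the endpoints write lock from the duplicate-bind check through insertion and any
+ // rollback so concurrent binds and lookups cannot race; it is released just before returning.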
+ ks_hash_write_lock(dht->endpoints_hash);
+
+ if (ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_UNLOCKED)) {
ks_log(KS_LOG_DEBUG, "Attempted to bind to %s more than once.\n", addr->host);
- return KS_STATUS_FAIL;
+ ret = KS_STATUS_FAIL;
+ goto done;
}
/**
* Attempt to open a UDP datagram socket for the given address family.
*/
- if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) return KS_STATUS_FAIL;
+ if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) {
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
/**
* Set some common socket options for non-blocking IO and forced binding when already in use
ks_assert(ep);
/**
- * Resize the endpoints array to take another endpoint pointer.
+ * Add the new endpoint into the endpoints hash for quick lookups.
*/
- epindex = dht->endpoints_size++;
- dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
- (void *)dht->endpoints,
- sizeof(ks_dht_endpoint_t *) * dht->endpoints_size);
- ks_assert(dht->endpoints);
- dht->endpoints[epindex] = ep;
+ ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep);
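+ // The hash entry now owns the endpoint; the failure path below relies on ks_hash_remove()
+ // invoking the registered destructor rather than destroying the endpoint directly.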
/**
- * Add the new endpoint into the endpoints hash for quick lookups.
- * @todo insert returns 0 when OOM, ks_pool_alloc will abort so insert can only succeed
+ * Resize the endpoints array to take another endpoint pointer.
*/
- if ((ret = ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) != KS_STATUS_SUCCESS) goto done;
-
+ epindex = dht->endpoints_length++;
+ if (dht->endpoints_length > dht->endpoints_size) {
+ dht->endpoints_size = dht->endpoints_length;
+ dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
+ (void *)dht->endpoints,
+ sizeof(ks_dht_endpoint_t *) * dht->endpoints_size);
+ ks_assert(dht->endpoints);
+ /**
+ * Resize the endpoints_poll array to keep it in parallel with the endpoints array.
+ */
+ dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool,
+ (void *)dht->endpoints_poll,
+ sizeof(struct pollfd) * dht->endpoints_size);
+ ks_assert(dht->endpoints_poll);
+ }
/**
- * Resize the endpoints_poll array to keep in parallel with endpoints array, populate new entry with the right data.
+ * Populate the new endpoint entry and its parallel pollfd entry.
*/
- dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool,
- (void *)dht->endpoints_poll,
- sizeof(struct pollfd) * dht->endpoints_size);
- ks_assert(dht->endpoints_poll);
+ dht->endpoints[epindex] = ep;
dht->endpoints_poll[epindex].fd = ep->sock;
dht->endpoints_poll[epindex].events = POLLIN | POLLERR;
+
/**
* If the route table for the family doesn't exist yet, initialize a new route table and create a local node for the endpoint.
*/
*/
if (ep) {
ks_hash_remove(dht->endpoints_hash, ep->addr.host);
- ks_dht_endpoint_destroy(&ep);
+ dht->endpoints_length--;
}
else if (sock != KS_SOCK_INVALID) ks_socket_close(&sock);
if (endpoint) *endpoint = NULL;
}
+ ks_hash_write_unlock(dht->endpoints_hash);
return ret;
}
ks_sockaddr_t raddr;
ks_assert(dht);
- ks_assert(timeout > 0);
+ ks_assert(timeout >= 0 && timeout <= 1000);
+ // this should be called with a timeout of at most 1000ms, preferably around 100ms
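+ // don't block in poll while there is already queued outgoing data waiting to be sent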
if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0;
- // @todo confirm how poll/wsapoll react to zero size and NULL array
- if (ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout) > 0) {
- for (int32_t i = 0; i < dht->endpoints_size; ++i) {
+ if (ks_poll(dht->endpoints_poll, dht->endpoints_length, timeout) > 0) {
+ for (int32_t i = 0; i < dht->endpoints_length; ++i) {
if (!(dht->endpoints_poll[i].revents & POLLIN)) continue;
raddr = (const ks_sockaddr_t){ 0 };
}
}
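+ // Route table maintenance and search processing now run on every pulse, ahead of job and
+ // send processing, presumably so this pulse's queries see the freshest node state.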
+ if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
+ if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
+
+ ks_dht_pulse_searches(dht);
+
+ // @todo pulse_storageitems for keepalive and expiration
+ // hold keepalive counter on items to determine what to reannounce vs expire
+
ks_dht_pulse_jobs(dht);
ks_dht_pulse_send(dht);
- ks_dht_pulse_expirations(dht);
+ ks_dht_pulse_transactions(dht);
- if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
- if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
+ ks_dht_pulse_tokens(dht);
}
-KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
+KS_DECLARE(void) ks_dht_pulse_searches(ks_dht_t *dht)
{
- ks_hash_iterator_t *it = NULL;
ks_dht_search_t *searches_first = NULL;
ks_dht_search_t *searches_last = NULL;
- ks_time_t now = ks_time_now();
-
+
ks_assert(dht);
- if (dht->pulse_expirations > now) return;
- dht->pulse_expirations = now + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC);
-
- ks_hash_write_lock(dht->transactions_hash);
- for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- const void *key = NULL;
- ks_dht_transaction_t *value = NULL;
- ks_bool_t remove = KS_FALSE;
-
- ks_hash_this(it, &key, NULL, (void **)&value);
- if (value->finished) remove = KS_TRUE;
- else if (value->expiration <= now) {
- // if the transaction expires, so does the attached job, but the job may try again with a new transaction
- value->job->state = KS_DHT_JOB_STATE_EXPIRING;
- ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
- remove = KS_TRUE;
- }
- if (remove) {
- ks_hash_remove(dht->transactions_hash, (void *)key);
- ks_dht_transaction_destroy(&value);
- }
- }
- ks_hash_write_unlock(dht->transactions_hash);
-
ks_mutex_lock(dht->searches_mutex);
for (ks_dht_search_t *search = dht->searches_first, *searchn = NULL, *searchp = NULL; search; search = searchn) {
ks_bool_t done = KS_FALSE;
if (search->callback) search->callback(dht, search);
ks_dht_search_destroy(&search);
}
+}
- if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
- dht->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
- dht->token_secret_previous = dht->token_secret_current;
- dht->token_secret_current = rand();
+KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
+{
+ ks_dht_job_t *first = NULL;
+ ks_dht_job_t *last = NULL;
+
+ ks_assert(dht);
+
+ ks_mutex_lock(dht->jobs_mutex);
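+ // Walk the job list under the mutex: QUERYING jobs issue their query and move to RESPONDING
+ // (or EXPIRING on failure), EXPIRING jobs retry while attempts remain, and finished jobs are
+ // unlinked and collected so their finish callbacks can run outside the lock below.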
+ for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
+ ks_bool_t done = KS_FALSE;
+ jobn = job->next;
+
+ if (job->state == KS_DHT_JOB_STATE_QUERYING) {
+ job->state = KS_DHT_JOB_STATE_RESPONDING;
+ if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
+ }
+ if (job->state == KS_DHT_JOB_STATE_EXPIRING) {
+ job->attempts--;
+ if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING;
+ else done = KS_TRUE;
+ }
+ if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE;
+
+ if (done) {
+ if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
+ else if (!jobp) dht->jobs_first = jobn;
+ else if (!jobn) {
+ dht->jobs_last = jobp;
+ dht->jobs_last->next = NULL;
+ }
+ else jobp->next = jobn;
+
+ job->next = NULL;
+ if (last) last = last->next = job;
+ else first = last = job;
+ } else jobp = job;
}
+ ks_mutex_unlock(dht->jobs_mutex);
- // @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?)
+ for (ks_dht_job_t *job = first, *jobn = NULL; job; job = jobn) {
+ jobn = job->next;
+ // finish callbacks cannot run inside the main loop above: they may add new jobs and invalidate the list pointers
+ if (job->finish_callback) job->finish_callback(dht, job);
+ ks_dht_job_destroy(&job);
+ }
}
KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
}
}
+KS_DECLARE(void) ks_dht_pulse_transactions(ks_dht_t *dht)
+{
+ ks_hash_iterator_t *it = NULL;
+ ks_time_t now = ks_time_now();
+
+ ks_assert(dht);
+
+ if (dht->transactions_pulse > now) return;
+ dht->transactions_pulse = now + ((ks_time_t)KS_DHT_TRANSACTIONS_PULSE * KS_USEC_PER_SEC);
+
+ ks_hash_write_lock(dht->transactions_hash);
+ for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ const void *key = NULL;
+ ks_dht_transaction_t *value = NULL;
+ ks_bool_t remove = KS_FALSE;
+
+ ks_hash_this(it, &key, NULL, (void **)&value);
+ if (value->finished) remove = KS_TRUE;
+ else if (value->expiration <= now) {
+ // if the transaction expires, so does the attached job, but the job may try again with a new transaction
+ value->job->state = KS_DHT_JOB_STATE_EXPIRING;
+ ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
+ remove = KS_TRUE;
+ }
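+ // removal invokes the transaction destructor registered on the hash, so no explicit
+ // ks_dht_transaction_destroy() is needed here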
+ if (remove) ks_hash_remove(dht->transactions_hash, (void *)key);
+ }
+ ks_hash_write_unlock(dht->transactions_hash);
+}
+
+KS_DECLARE(void) ks_dht_pulse_tokens(ks_dht_t *dht)
+{
+ ks_time_t now = ks_time_now();
+
+ ks_assert(dht);
+
+ if (dht->tokens_pulse > now) return;
+ dht->tokens_pulse = now + ((ks_time_t)KS_DHT_TOKENS_PULSE * KS_USEC_PER_SEC);
+
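+ // Rotate the opaque write token secret once it expires; the previous secret remains valid
+ // so tokens issued just before rotation are still accepted (see the secret initialization
+ // in ks_dht_create).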
+ if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
+ dht->token_secret_expiration = now + ((ks_time_t)KS_DHT_TOKEN_EXPIRATION * KS_USEC_PER_SEC);
+ dht->token_secret_previous = dht->token_secret_current;
+ dht->token_secret_current = rand();
+ }
+}
+
KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len)
{
char *t = buffer;
ks_assert(buffer_size);
ks_assert(address->family == AF_INET || address->family == AF_INET6);
- // @todo change parameters to dereferenced pointer and forward buffer pointer directly
-
addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) {
ks_assert(address);
ks_assert(address->family == AF_INET ||address->family == AF_INET6);
- // @todo change parameters to dereferenced pointer and forward buffer pointer directly
-
addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_NO_MEM;
ks_assert(buffer_size);
ks_assert(address->family == AF_INET || address->family == AF_INET6);
- // @todo change parameters to dereferenced pointer and forward buffer pointer directly
-
if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) {
ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n");
return KS_STATUS_NO_MEM;
ks_assert(address);
ks_assert(address->family == AF_INET ||address->family == AF_INET6);
- // @todo change parameters to dereferenced pointer and forward buffer pointer directly
-
if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_NO_MEM;
memcpy(nodeid->id, buffer + *buffer_length, KS_DHT_NODEID_SIZE);
if ((ret = ks_dht_autoroute_check(dht, &job->raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
- // @todo atomic increment
- ks_mutex_lock(dht->tid_mutex);
+ ks_mutex_lock(dht->transactionid_mutex);
transactionid = dht->transactionid_next++;
- ks_mutex_unlock(dht->tid_mutex);
+ ks_mutex_unlock(dht->transactionid_mutex);
if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
*message = msg;
- if ((ret = ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) != KS_STATUS_SUCCESS) goto done;
+ ks_hash_write_lock(dht->transactions_hash);
+ ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans);
+ ks_hash_write_unlock(dht->transactions_hash);
if (transaction) *transaction = trans;
*message = NULL;
- if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret;
+ if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) {
+ ks_dht_error(dht,
+ ep,
+ raddr,
+ transactionid,
+ transactionid_length,
+ 202,
+ "Internal message create error");
+ goto done;
+ }
- //if ((ret = ks_dht_message_response(msg, transactionid, transactionid_length, args)) != KS_STATUS_SUCCESS) goto done;
ben_dict_set(msg->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
ben_dict_set(msg->data, ben_blob("y", 1), ben_blob("r", 1));
if (ks_dht_message_parse(message, datagram->buffer, datagram->buffer_length) != KS_STATUS_SUCCESS) goto done;
- callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(datagram->dht->registry_type, message->type, KS_READLOCKED);
+ ks_hash_read_lock(datagram->dht->registry_type);
+ callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(datagram->dht->registry_type, message->type, KS_UNLOCKED);
ks_hash_read_unlock(datagram->dht->registry_type);
if (!callback) ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message->type);
q = ben_dict_get_by_str(message->data, "q");
if (!q) {
ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n");
- return KS_STATUS_FAIL;
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 203,
+ "Message query missing required key 'q'");
+ ret = KS_STATUS_FAIL;
+ goto done;
}
qv = ben_str_val(q);
qv_len = ben_str_len(q);
if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) {
ks_log(KS_LOG_DEBUG, "Message query 'q' value has an unexpectedly large size of %d\n", qv_len);
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 203,
+ "Message query 'q' value is too large");
ret = KS_STATUS_FAIL;
goto done;
}
a = ben_dict_get_by_str(message->data, "a");
if (!a) {
ks_log(KS_LOG_DEBUG, "Message query missing required key 'a'\n");
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 203,
+ "Message query missing required key 'a'");
ret = KS_STATUS_FAIL;
goto done;
}
message->args = a;
- if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) {
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 203,
+ "Message query args missing required key 'id'");
+ goto done;
+ }
message->args_id = *id;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
message->raddr.host,
message->raddr.port,
KS_DHTRT_CREATE_PING,
- &node)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
- callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_READLOCKED);
+ &node)) != KS_STATUS_SUCCESS) {
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 202,
+ "Internal route table create node error");
+ goto done;
+ }
+ if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) {
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 202,
+ "Internal route table release node error");
+ goto done;
+ }
+
+ ks_hash_read_lock(dht->registry_query);
+ callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED);
ks_hash_read_unlock(dht->registry_query);
- if (!callback) ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
+ if (!callback) {
+ ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 204,
+ "Message query method is not registered");
+ }
else ret = callback(dht, message);
done:
transactionid = ntohl(*tid);
ks_log(KS_LOG_DEBUG, "Message response transaction id %d\n", transactionid);
- transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
+
+ ks_hash_read_lock(dht->transactions_hash);
+ transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_UNLOCKED);
ks_hash_read_unlock(dht->transactions_hash);
if (!transaction) ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
ks_dht_hex(job->search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
results_index);
- // @todo add lock on node
+
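+ // A result slot appears to hold a reference on its node: release any node being displaced
+ // from the slot, and take a shared lock (below) on the node about to be queried so it is
+ // not released by the route table mid-search.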
+ if (job->search->results[results_index]) ks_dhtrt_release_node(job->search->results[results_index]);
job->search->results[results_index] = node;
job->search->distances[results_index] = distance;
ks_hash_insert(job->search->searched, node->nodeid.id, (void *)KS_TRUE);
ks_hash_insert(job->search->searching, node->nodeid.id, (void *)KS_TRUE);
+
+ ks_dhtrt_sharelock_node(node);
if ((ret = ks_dht_findnode(dht, job->search, &node->addr, ks_dht_search_findnode_callback, &job->search->target)) != KS_STATUS_SUCCESS) goto done;
}
ks_hash_insert(s->searched, n->nodeid.id, (void *)KS_TRUE);
ks_hash_insert(s->searching, n->nodeid.id, (void *)KS_TRUE);
+
+ ks_dhtrt_sharelock_node(n);
- if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_release_querynodes(&query);
+ goto done;
+ }
}
- //ks_dhtrt_release_querynodes(&query);
+ ks_dhtrt_release_querynodes(&query);
ks_mutex_unlock(s->mutex);
locked_search = KS_FALSE;
tid = (uint32_t *)message->transactionid;
transactionid = ntohl(*tid);
- transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
+ ks_hash_read_lock(dht->transactions_hash);
+ transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_UNLOCKED);
ks_hash_read_unlock(dht->transactions_hash);
if (!transaction) {
transaction->finished = KS_TRUE;
- callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED);
+ ks_hash_read_lock(dht->registry_error);
+ callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED);
ks_hash_read_unlock(dht->registry_error);
if (callback) ret = callback(dht, message);
ks_mutex_unlock(dht->jobs_mutex);
}
-KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
-{
- ks_dht_job_t *first = NULL;
- ks_dht_job_t *last = NULL;
-
- ks_assert(dht);
-
- ks_mutex_lock(dht->jobs_mutex);
- for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
- ks_bool_t done = KS_FALSE;
- jobn = job->next;
-
- if (job->state == KS_DHT_JOB_STATE_QUERYING) {
- job->state = KS_DHT_JOB_STATE_RESPONDING;
- if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
- }
- if (job->state == KS_DHT_JOB_STATE_EXPIRING) {
- job->attempts--;
- if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING;
- else done = KS_TRUE;
- }
- if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE;
-
- if (done) {
- if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
- else if (!jobp) dht->jobs_first = jobn;
- else if (!jobn) {
- dht->jobs_last = jobp;
- dht->jobs_last->next = NULL;
- }
- else jobp->next = jobn;
-
- job->next = NULL;
- if (last) last = last->next = job;
- else first = last = job;
- } else jobp = job;
- }
- ks_mutex_unlock(dht->jobs_mutex);
-
- for (ks_dht_job_t *job = first, *jobn = NULL; job; job = jobn) {
- jobn = job->next;
- // this cannot occur inside of the main loop, may add new jobs invalidating list pointers
- if (job->finish_callback) job->finish_callback(dht, job);
- ks_dht_job_destroy(&job);
- }
-}
KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback)
{
ks_assert(message);
ks_assert(message->args);
- if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) {
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 203,
+ "Message query findnode args missing required key 'target'");
+ goto done;
+ }
want = ben_dict_get_by_str(message->args, "want");
if (want) {
- // @todo use ben_list_for_each
size_t want_len = ben_list_len(want);
for (size_t i = 0; i < want_len; ++i) {
struct bencode *iv = ben_list_get(want, i);
query.nodeid = *target;
query.type = KS_DHT_REMOTE;
- query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE
+ query.max = 8; // @todo should use KS_DHTRT_BUCKET_SIZE
if (want4) {
query.family = AF_INET;
ks_dhtrt_findclosest_nodes(dht->rt_ipv4, &query);
&qn->addr,
buffer4,
&buffer4_length,
- sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
+ sizeof(buffer4))) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_release_querynodes(&query);
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 202,
+ "Internal compact v4 nodeinfo error");
+ goto done;
+ }
ks_log(KS_LOG_DEBUG,
"Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
&qn->addr,
buffer6,
&buffer6_length,
- sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
+ sizeof(buffer6))) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_release_querynodes(&query);
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 202,
+ "Internal compact v6 nodeinfo error");
+ goto done;
+ }
ks_log(KS_LOG_DEBUG,
"Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
{
ks_dht_message_t *message = NULL;
struct bencode *a = NULL;
+ ks_dht_storageitem_t *item = NULL;
ks_assert(dht);
ks_assert(job);
&message,
&a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
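+ // BEP44: if we already hold a mutable copy of the target item, include its current seq so
+ // peers only return a value newer than the one we have.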
+ ks_hash_read_lock(dht->storageitems_hash);
+ item = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED);
+ ks_hash_read_unlock(dht->storageitems_hash);
+
+ if (item && item->mutable && item->seq > 0) ben_dict_set(a, ben_blob("seq", 3), ben_int(item->seq));
ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
//ks_log(KS_LOG_DEBUG, "Sending message query get\n");
ks_assert(message);
ks_assert(message->args);
- if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) {
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 203,
+ "Message query get args missing required key 'target'");
+ goto done;
+ }
seq = ben_dict_get_by_str(message->args, "seq");
if (seq) sequence = ben_int_val(seq);
&qn->addr,
buffer4,
&buffer4_length,
- sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
+ sizeof(buffer4))) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_release_querynodes(&query);
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 202,
+ "Internal compact v4 nodeinfo error");
+ goto done;
+ }
ks_log(KS_LOG_DEBUG,
"Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
&qn->addr,
buffer6,
&buffer6_length,
- sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
+ sizeof(buffer6))) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_release_querynodes(&query);
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 202,
+ "Internal compact v6 nodeinfo error");
+ goto done;
+ }
ks_log(KS_LOG_DEBUG,
"Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
sequence,
sig)) != KS_STATUS_SUCCESS) goto done;
}
- if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
- item = ks_hash_search(dht->storageitems_hash, item->id.id, KS_UNLOCKED);
+ if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item);
} else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
if (item) job->response_storageitem = item;
else if (olditem) job->response_storageitem = olditem;
done:
- if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
if (ret != KS_STATUS_SUCCESS) {
- if (item) ks_dht_storageitem_destroy(&item);
+ if (item) ks_hash_remove(dht->storageitems_hash, item->id.id);
}
+ if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
return ret;
}
-// @todo add a public function to add storageitem_t's to the store before calling this for authoring new data, reuse function in the "get" handlers
// @todo add reference counting system to storageitem_t to know what to keep alive with reannouncements versus allowing to expire
KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_assert(message->args);
- if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) {
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 203,
+ "Message query put args missing required key 'token'");
+ goto done;
+ }
- if ((ret = ks_dht_utility_extract_storageitem_pkey(message->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dht_utility_extract_storageitem_signature(message->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_utility_extract_storageitem_pkey(message->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) {
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 203,
+ "Message query put 'k' is malformed");
+ goto done;
+ }
+ if ((ret = ks_dht_utility_extract_storageitem_signature(message->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) {
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 203,
+ "Message query put 'sig' is malformed");
+ goto done;
+ }
salt = ben_dict_get_by_str(message->args, "salt");
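+ // Reject over-long salts with BEP44 error 207 ("salt too big"); KS_DHT_STORAGEITEM_SALT_MAX_SIZE
+ // is presumably the spec's 64 byte limit.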
+ if (salt && ben_str_len(salt) > KS_DHT_STORAGEITEM_SALT_MAX_SIZE) {
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 207,
+ "Message query put 'salt' is too large");
+ ret = KS_STATUS_ARG_INVALID;
+ goto done;
+ }
seq = ben_dict_get_by_str(message->args, "seq");
if (seq) sequence = ben_int_val(seq);
if (seq && (!k || !sig)) {
ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data\n");
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 203,
+ "Message query put for mutable data must include both 'k' and 'sig'");
ret = KS_STATUS_ARG_INVALID;
goto done;
}
v = ben_dict_get_by_str(message->args, "v");
if (!v) {
ks_log(KS_LOG_DEBUG, "Must provide v\n");
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 203,
+ "Message query put args missing required key 'v'");
ret = KS_STATUS_ARG_INVALID;
goto done;
}
if (!seq) {
// immutable
- if ((ret = ks_dht_storageitem_target_immutable_internal(v, &target)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_storageitem_target_immutable_internal(v, &target)) != KS_STATUS_SUCCESS) {
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 202,
+ "Internal storage item target immutable error");
+ goto done;
+ }
} else {
// mutable
- if ((ret = ks_dht_storageitem_target_mutable_internal(k, salt, &target)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_storageitem_target_mutable_internal(k, salt, &target)) != KS_STATUS_SUCCESS) {
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 202,
+ "Internal storage item target mutable error");
+ goto done;
+ }
}
+
+ ks_hash_write_lock(dht->storageitems_hash);
+ storageitems_locked = KS_TRUE;
+
olditem = ks_hash_search(dht->storageitems_hash, target.id, KS_UNLOCKED);
if (!ks_dht_token_verify(dht, &message->raddr, &target, token)) {
ks_log(KS_LOG_DEBUG, "Invalid token\n");
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 203,
+ "Message query put token is invalid");
ret = KS_STATUS_FAIL;
goto done;
}
//ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
- ks_hash_write_lock(dht->storageitems_hash);
- storageitems_locked = KS_TRUE;
if (!seq) {
// immutable
dht->pool,
&target,
v,
- KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+ KS_TRUE)) != KS_STATUS_SUCCESS) {
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 202,
+ "Internal storage item create immutable error");
+ goto done;
+ }
} else {
// mutable
if (!ks_dht_storageitem_signature_verify(sig, k, salt, seq, v)) {
ks_log(KS_LOG_DEBUG, "Mutable data signature failed to verify\n");
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 206,
+ "Message query put signature is invalid");
ret = KS_STATUS_FAIL;
goto done;
}
if (olditem) {
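+ // BEP44 mutable update rules against the stored item: reject when a supplied 'cas' does not
+ // match the stored seq (301), when the new seq is lower (302), or when the seq is equal but
+ // the value differs (201); a strictly newer seq updates the stored item in place.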
if (cas && olditem->seq != cas_seq) {
- // @todo send 301 error instead of the response
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 301,
+ "Message query put cas mismatch");
goto done;
}
if (olditem->seq > sequence) {
- // @todo send 302 error instead of the response
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 302,
+ "Message query put sequence is less than current");
goto done;
}
if (olditem->seq == sequence) {
if (ben_cmp(olditem->v, v) != 0) {
- // @todo send 201? error instead of the response
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 201,
+ "Message query put sequence is equal to current but values are different");
goto done;
}
} else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
salt,
KS_TRUE,
sequence,
- sig)) != KS_STATUS_SUCCESS) goto done;
+ sig)) != KS_STATUS_SUCCESS) {
+ ks_dht_error(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ 202,
+ "Internal storage item create mutable error");
+ goto done;
+ }
}
- if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
+ if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item);
if ((ret = ks_dht_response_setup(dht,
message->endpoint,
ks_q_push(dht->send_q, (void *)response);
done:
- if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
if (ret != KS_STATUS_SUCCESS) {
- if (item) ks_dht_storageitem_destroy(&item);
+ if (item) ks_hash_remove(dht->storageitems_hash, item->id.id);
}
+ if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
return ret;
}