]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Some cleanup and bug fixes in DHT, switched to using hash destructors, and...
authorShane Bryldt <astaelan@gmail.com>
Wed, 28 Dec 2016 15:18:38 +0000 (15:18 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:38 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_search.c

index fc71cddacf03400268de5e88359e3f5e8a71c62e..1c33b914788a41aa2421c0f7099dedee5641aa81 100644 (file)
@@ -22,11 +22,18 @@ KS_BEGIN_EXTERN_C
 KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint);
 
 /**
- * Called internally to expire various data.
- * Handles purging of expired and finished transactions, rotating token secrets, etc.
+ * Called internally to expire search data.
+ * Handles completing and purging of finished searches.
  * @param dht pointer to the dht instance
  */
-KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht);
+KS_DECLARE(void) ks_dht_pulse_searches(ks_dht_t *dht);
+
+/**
+ * Called internally to process job state machine.
+ * Handles completing and purging of finished jobs.
+ * @param dht pointer to the dht instance
+ */
+KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht);
 
 /**
  * Called internally to send queued messages.
@@ -35,6 +42,20 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht);
  */
 KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht);
 
+/**
+ * Called internally to expire transactions.
+ * Handles purging of expired and finished transactions.
+ * @param dht pointer to the dht instance
+ */
+KS_DECLARE(void) ks_dht_pulse_transactions(ks_dht_t *dht);
+
+/**
+ * Called internally to expire and cycle tokens.
+ * Handles cycling new secret entropy for token generation.
+ * @param dht pointer to the dht instance
+ */
+KS_DECLARE(void) ks_dht_pulse_tokens(ks_dht_t *dht);
+
 /**
  * Converts a ks_dht_nodeid_t into it's hex string representation.
  * @param id pointer to the nodeid
index 056584943e690ce53ae9529116fc1667828aa04b..8392ba4017078a6e08c533f5f61e3dba378cb588 100644 (file)
@@ -2,6 +2,12 @@
 #include "ks_dht-int.h"
 #include "sodium.h"
 
+void ks_dht_endpoint_destructor(void *ptr) { ks_dht_endpoint_destroy((ks_dht_endpoint_t **)&ptr); }
+
+void ks_dht_transaction_destructor(void *ptr) { ks_dht_transaction_destroy((ks_dht_transaction_t **)&ptr); }
+
+void ks_dht_storageitem_destructor(void *ptr) { ks_dht_storageitem_destroy((ks_dht_storageitem_t **)&ptr); }
+
 KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread_pool_t *tpool)
 {
        ks_bool_t pool_alloc = !pool;
@@ -52,7 +58,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        /**
         * Create the message type registry.
         */
-       ks_hash_create(&d->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+       ks_hash_create(&d->registry_type, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
        ks_assert(d->registry_type);
 
        /**
@@ -65,7 +71,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        /**
         * Create the message query registry.
         */
-       ks_hash_create(&d->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+       ks_hash_create(&d->registry_query, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
        ks_assert(d->registry_query);
 
        /**
@@ -79,7 +85,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        /**
         * Create the message error registry.
         */
-       ks_hash_create(&d->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+       ks_hash_create(&d->registry_error, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
        ks_assert(d->registry_error);
        // @todo register 301 error for internal get/put CAS hash mismatch retry handler
 
@@ -88,6 +94,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
         * The endpoints and endpoints_poll arrays are maintained in parallel to optimize polling.
         */
        d->endpoints = NULL;
+       d->endpoints_length = 0;
        d->endpoints_size = 0;
        d->endpoints_poll = NULL;
 
@@ -96,13 +103,20 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
         * This also provides the basis for autorouting to find unbound interfaces and bind them at runtime.
         * This hash uses the host ip string concatenated with a colon and the port, ie: "123.123.123.123:123" or ipv6 equivilent
         */
-       ks_hash_create(&d->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, d->pool);
+       ks_hash_create_ex(&d->endpoints_hash,
+                                         2,
+                                         NULL,
+                                         NULL,
+                                         KS_HASH_MODE_CASE_INSENSITIVE,
+                                         KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK,
+                                         ks_dht_endpoint_destructor,
+                                         d->pool);
        ks_assert(d->endpoints_hash);
 
        /**
-        * Default expirations to not be checked for one pulse.
+        * Default transactions expirations to not be checked for one pulse.
         */
-       d->pulse_expirations = ks_time_now() + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC);
+       d->transactions_pulse = ks_time_now() + ((ks_time_t)KS_DHT_TRANSACTIONS_PULSE * KS_USEC_PER_SEC);
 
        /**
         * Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full.
@@ -132,8 +146,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        /**
         * Initialize the transaction id mutex, should use atomic increment instead
         */
-       ks_mutex_create(&d->tid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
-       ks_assert(d->tid_mutex);
+       ks_mutex_create(&d->transactionid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
+       ks_assert(d->transactionid_mutex);
 
        /**
         * Initialize the first transaction id randomly, this doesn't really matter.
@@ -144,7 +158,14 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
         * Create the hash to track pending transactions on queries that are pending responses.
         * It should be impossible to receive a duplicate transaction id in the hash before it expires, but if it does an error is preferred.
         */
-       ks_hash_create(&d->transactions_hash, KS_HASH_MODE_INT, KS_HASH_FLAG_RWLOCK, d->pool);
+       ks_hash_create_ex(&d->transactions_hash,
+                                         16,
+                                         NULL,
+                                         NULL,
+                                         KS_HASH_MODE_INT,
+                                         KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK,
+                                         ks_dht_transaction_destructor,
+                                         d->pool);
        ks_assert(d->transactions_hash);
 
        /**
@@ -163,12 +184,19 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
         * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
         */
        d->token_secret_current = d->token_secret_previous = rand();
-       d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
+       d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKEN_EXPIRATION * KS_USEC_PER_SEC);
 
        /**
         * Create the hash to store arbitrary data for BEP44.
         */
-       ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+       ks_hash_create_ex(&d->storageitems_hash,
+                                         16,
+                                         NULL,
+                                         NULL,
+                                         KS_HASH_MODE_ARBITRARY,
+                                         KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK,
+                                         ks_dht_storageitem_destructor,
+                                         d->pool);
        ks_assert(d->storageitems_hash);
 
        /**
@@ -191,7 +219,6 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
        ks_dht_t *d = NULL;
        ks_pool_t *pool = NULL;
        ks_bool_t pool_alloc = KS_FALSE;
-       ks_hash_iterator_t *it = NULL;
        
        ks_assert(dht);
        ks_assert(*dht);
@@ -201,15 +228,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
        /**
         * Cleanup the storageitems hash and it's contents if it is allocated.
         */
-       if (d->storageitems_hash) {
-               for (it = ks_hash_first(d->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-                       const void *key = NULL;
-                       ks_dht_storageitem_t *val = NULL;
-                       ks_hash_this(it, &key, NULL, (void **)&val);
-                       ks_dht_storageitem_destroy(&val);
-               }
-               ks_hash_destroy(&d->storageitems_hash);
-       }
+       if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash);
 
        /**
         * Zero out the opaque write token variables.
@@ -229,7 +248,6 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
 
        /**
         * Cleanup the route tables if they are allocated.
-        * @todo check if endpoints need to be destroyed first to release the readlock on their node
         */
        if (d->rt_ipv4) ks_dhtrt_deinitroute(&d->rt_ipv4);
        if (d->rt_ipv6) ks_dhtrt_deinitroute(&d->rt_ipv6);
@@ -238,7 +256,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
         * Cleanup the transactions mutex and hash if they are allocated.
         */
        d->transactionid_next = 0;
-       if (d->tid_mutex) ks_mutex_destroy(&d->tid_mutex);
+       if (d->transactionid_mutex) ks_mutex_destroy(&d->transactionid_mutex);
        if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash);
 
        /**
@@ -272,15 +290,9 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
        /**
         * Probably don't need this
         */
-       d->pulse_expirations = 0;
+       d->transactions_pulse = 0;
 
-       /**
-        * Cleanup any endpoints that have been allocated.
-        */
-       for (int32_t i = 0; i < d->endpoints_size; ++i) {
-               ks_dht_endpoint_t *ep = d->endpoints[i];
-               ks_dht_endpoint_destroy(&ep);
-       }
+       d->endpoints_length = 0;
        d->endpoints_size = 0;
 
        /**
@@ -294,7 +306,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
        if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll);
 
        /**
-        * Cleanup the endpoints hash if it is allocated.
+        * Cleanup the endpoints hash if it is allocated, and any endpoints that have been allocated.
         */
        if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash);
 
@@ -383,8 +395,9 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_
        /**
         * Check if the endpoint has already been bound for the address we want to route through.
         */
-       ep = ks_hash_search(dht->endpoints_hash, ip, KS_READLOCKED);
-       if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret;
+       ks_hash_read_lock(dht->endpoints_hash);
+       ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED);
+       ks_hash_read_unlock(dht->endpoints_hash);
 
        /**
         * If the endpoint has not been bound, and autorouting is enabled then try to bind the new address.
@@ -411,38 +424,44 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
+KS_DECLARE(void) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
 {
        ks_assert(dht);
        ks_assert(value);
        ks_assert(callback);
 
-       return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback);
+       ks_hash_write_lock(dht->registry_type);
+       ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback);
+       ks_hash_write_unlock(dht->registry_type);
 }
 
-KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
+KS_DECLARE(void) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
 {
        ks_assert(dht);
        ks_assert(value);
        ks_assert(callback);
 
-       return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback);
+       ks_hash_write_lock(dht->registry_query);
+       ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback);
+       ks_hash_write_unlock(dht->registry_query);
 }
 
-KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
+KS_DECLARE(void) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
 {
        ks_assert(dht);
        ks_assert(value);
        ks_assert(callback);
 
-       return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback);
+       ks_hash_write_lock(dht->registry_error);
+       ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback);
+       ks_hash_write_unlock(dht->registry_error);
 }
 
 
 KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint)
 {
-       ks_dht_endpoint_t *ep = NULL;
        ks_socket_t sock = KS_SOCK_INVALID;
+       ks_dht_endpoint_t *ep = NULL;
        int32_t epindex = 0;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
@@ -456,17 +475,21 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
         */
        if (endpoint) *endpoint = NULL;
 
-       ep = ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_READLOCKED);
-       ks_hash_read_unlock(dht->endpoints_hash);
-       if (ep) {
+       ks_hash_write_lock(dht->endpoints_hash);
+
+       if (ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_UNLOCKED)) {
                ks_log(KS_LOG_DEBUG, "Attempted to bind to %s more than once.\n", addr->host);
-               return KS_STATUS_FAIL;
+               ret = KS_STATUS_FAIL;
+               goto done;
        }
 
        /**
         * Attempt to open a UDP datagram socket for the given address family.
         */
-       if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) return KS_STATUS_FAIL;
+       if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) {
+               ret = KS_STATUS_FAIL;
+               goto done;
+       }
 
        /**
         * Set some common socket options for non-blocking IO and forced binding when already in use
@@ -486,31 +509,36 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
        ks_assert(ep);
 
        /**
-        * Resize the endpoints array to take another endpoint pointer.
+        * Add the new endpoint into the endpoints hash for quick lookups.
         */
-       epindex = dht->endpoints_size++;
-       dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
-                                                                                                                  (void *)dht->endpoints,
-                                                                                                                  sizeof(ks_dht_endpoint_t *) * dht->endpoints_size);
-       ks_assert(dht->endpoints);
-       dht->endpoints[epindex] = ep;
+       ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep);
 
        /**
-        * Add the new endpoint into the endpoints hash for quick lookups.
-        * @todo insert returns 0 when OOM, ks_pool_alloc will abort so insert can only succeed
+        * Resize the endpoints array to take another endpoint pointer.
         */
-       if ((ret = ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) != KS_STATUS_SUCCESS) goto done;
-
+       epindex = dht->endpoints_length++;
+       if (dht->endpoints_length > dht->endpoints_size) {
+               dht->endpoints_size = dht->endpoints_length;
+               dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
+                                                                                                                         (void *)dht->endpoints,
+                                                                                                                         sizeof(ks_dht_endpoint_t *) * dht->endpoints_size);
+               ks_assert(dht->endpoints);
+               /**
+                * Resize the endpoints_poll array to keep in parallel with endpoints array.
+                */
+               dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool,
+                                                                                                                         (void *)dht->endpoints_poll,
+                                                                                                                         sizeof(struct pollfd) * dht->endpoints_size);
+               ks_assert(dht->endpoints_poll);
+       }
        /**
-        * Resize the endpoints_poll array to keep in parallel with endpoints array, populate new entry with the right data.
+        * Populate the new endpoint data
         */
-       dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool,
-                                                                                                                 (void *)dht->endpoints_poll,
-                                                                                                                 sizeof(struct pollfd) * dht->endpoints_size);
-       ks_assert(dht->endpoints_poll);
+       dht->endpoints[epindex] = ep;
        dht->endpoints_poll[epindex].fd = ep->sock;
        dht->endpoints_poll[epindex].events = POLLIN | POLLERR;
 
+
        /**
         * If the route table for the family doesn't exist yet, initialize a new route table and create a local node for the endpoint.
         */
@@ -551,12 +579,13 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
                 */
                if (ep) {
                        ks_hash_remove(dht->endpoints_hash, ep->addr.host);
-                       ks_dht_endpoint_destroy(&ep);
+                       dht->endpoints_length--;
                }
                else if (sock != KS_SOCK_INVALID) ks_socket_close(&sock);
 
                if (endpoint) *endpoint = NULL;
        }
+       ks_hash_write_unlock(dht->endpoints_hash);
        return ret;
 }
 
@@ -566,13 +595,13 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
        ks_sockaddr_t raddr;
 
        ks_assert(dht);
-       ks_assert(timeout > 0);
+       ks_assert(timeout >= 0 && timeout <= 1000);
+       // this should be called with a timeout of less than 1000ms, preferrably around 100ms
 
        if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0;
 
-       // @todo confirm how poll/wsapoll react to zero size and NULL array
-       if (ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout) > 0) {
-               for (int32_t i = 0; i < dht->endpoints_size; ++i) {
+       if (ks_poll(dht->endpoints_poll, dht->endpoints_length, timeout) > 0) {
+               for (int32_t i = 0; i < dht->endpoints_length; ++i) {
                        if (!(dht->endpoints_poll[i].revents & POLLIN)) continue;
 
                        raddr = (const ks_sockaddr_t){ 0 };
@@ -592,49 +621,30 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
                }
        }
 
+       if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
+       if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
+
+       ks_dht_pulse_searches(dht);
+
+       // @todo pulse_storageitems for keepalive and expiration
+       // hold keepalive counter on items to determine what to reannounce vs expire
+
        ks_dht_pulse_jobs(dht);
 
        ks_dht_pulse_send(dht);
 
-       ks_dht_pulse_expirations(dht);
+       ks_dht_pulse_transactions(dht);
 
-       if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
-       if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
+       ks_dht_pulse_tokens(dht);
 }
 
-KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
+KS_DECLARE(void) ks_dht_pulse_searches(ks_dht_t *dht)
 {
-       ks_hash_iterator_t *it = NULL;
        ks_dht_search_t *searches_first = NULL;
        ks_dht_search_t *searches_last = NULL;
-       ks_time_t now = ks_time_now();
-
+       
        ks_assert(dht);
 
-       if (dht->pulse_expirations > now) return;
-       dht->pulse_expirations = now + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC);
-
-       ks_hash_write_lock(dht->transactions_hash);
-       for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-               const void *key = NULL;
-               ks_dht_transaction_t *value = NULL;
-               ks_bool_t remove = KS_FALSE;
-
-               ks_hash_this(it, &key, NULL, (void **)&value);
-               if (value->finished) remove = KS_TRUE;
-               else if (value->expiration <= now) {
-                       // if the transaction expires, so does the attached job, but the job may try again with a new transaction
-                       value->job->state = KS_DHT_JOB_STATE_EXPIRING;
-                       ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
-                       remove = KS_TRUE;
-               }
-               if (remove) {
-                       ks_hash_remove(dht->transactions_hash, (void *)key);
-                       ks_dht_transaction_destroy(&value);
-               }
-       }
-       ks_hash_write_unlock(dht->transactions_hash);
-
        ks_mutex_lock(dht->searches_mutex);
        for (ks_dht_search_t *search = dht->searches_first, *searchn = NULL, *searchp = NULL; search; search = searchn) {
                ks_bool_t done = KS_FALSE;
@@ -665,14 +675,53 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
                if (search->callback) search->callback(dht, search);
                ks_dht_search_destroy(&search);
        }
+}
 
-       if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
-               dht->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
-               dht->token_secret_previous = dht->token_secret_current;
-               dht->token_secret_current = rand();
+KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
+{
+       ks_dht_job_t *first = NULL;
+       ks_dht_job_t *last = NULL;
+       
+       ks_assert(dht);
+
+       ks_mutex_lock(dht->jobs_mutex);
+       for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
+               ks_bool_t done = KS_FALSE;
+               jobn = job->next;
+
+               if (job->state == KS_DHT_JOB_STATE_QUERYING) {
+                       job->state = KS_DHT_JOB_STATE_RESPONDING;
+                       if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
+               }
+               if (job->state == KS_DHT_JOB_STATE_EXPIRING) {
+                       job->attempts--;
+                       if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING;
+                       else done = KS_TRUE;
+               }
+               if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE;
+
+               if (done) {
+                       if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
+                       else if (!jobp) dht->jobs_first = jobn;
+                       else if (!jobn) {
+                               dht->jobs_last = jobp;
+                           dht->jobs_last->next = NULL;
+                       }
+                       else jobp->next = jobn;
+
+                       job->next = NULL;
+                       if (last) last = last->next = job;
+                       else first = last = job;
+               } else jobp = job;
        }
+       ks_mutex_unlock(dht->jobs_mutex);
 
-       // @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?)
+       for (ks_dht_job_t *job = first, *jobn = NULL; job; job = jobn) {
+               jobn = job->next;
+               // this cannot occur inside of the main loop, may add new jobs invalidating list pointers
+               if (job->finish_callback) job->finish_callback(dht, job);
+               ks_dht_job_destroy(&job);
+       }
 }
 
 KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
@@ -698,6 +747,51 @@ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
        }
 }
 
+KS_DECLARE(void) ks_dht_pulse_transactions(ks_dht_t *dht)
+{
+       ks_hash_iterator_t *it = NULL;
+       ks_time_t now = ks_time_now();
+
+       ks_assert(dht);
+
+       if (dht->transactions_pulse > now) return;
+       dht->transactions_pulse = now + ((ks_time_t)KS_DHT_TRANSACTIONS_PULSE * KS_USEC_PER_SEC);
+
+       ks_hash_write_lock(dht->transactions_hash);
+       for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+               const void *key = NULL;
+               ks_dht_transaction_t *value = NULL;
+               ks_bool_t remove = KS_FALSE;
+
+               ks_hash_this(it, &key, NULL, (void **)&value);
+               if (value->finished) remove = KS_TRUE;
+               else if (value->expiration <= now) {
+                       // if the transaction expires, so does the attached job, but the job may try again with a new transaction
+                       value->job->state = KS_DHT_JOB_STATE_EXPIRING;
+                       ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
+                       remove = KS_TRUE;
+               }
+               if (remove) ks_hash_remove(dht->transactions_hash, (void *)key);
+       }
+       ks_hash_write_unlock(dht->transactions_hash);
+}
+
+KS_DECLARE(void) ks_dht_pulse_tokens(ks_dht_t *dht)
+{
+       ks_time_t now = ks_time_now();
+
+       ks_assert(dht);
+
+       if (dht->tokens_pulse > now) return;
+       dht->tokens_pulse = now + ((ks_time_t)KS_DHT_TOKENS_PULSE * KS_USEC_PER_SEC);
+
+       if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
+               dht->token_secret_expiration = now + ((ks_time_t)KS_DHT_TOKEN_EXPIRATION * KS_USEC_PER_SEC);
+               dht->token_secret_previous = dht->token_secret_current;
+               dht->token_secret_current = rand();
+       }
+}
+
 KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len)
 {
        char *t = buffer;
@@ -736,8 +830,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *
        ks_assert(buffer_size);
        ks_assert(address->family == AF_INET || address->family == AF_INET6);
 
-       // @todo change parameters to dereferenced pointer and forward buffer pointer directly
-
        addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
        
        if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) {
@@ -775,8 +867,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer,
        ks_assert(address);
        ks_assert(address->family == AF_INET ||address->family == AF_INET6);
 
-       // @todo change parameters to dereferenced pointer and forward buffer pointer directly
-
        addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
        if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_NO_MEM;
 
@@ -800,8 +890,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *n
        ks_assert(buffer_size);
        ks_assert(address->family == AF_INET || address->family == AF_INET6);
 
-       // @todo change parameters to dereferenced pointer and forward buffer pointer directly
-
        if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) {
                ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n");
                return KS_STATUS_NO_MEM;
@@ -825,8 +913,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
        ks_assert(address);
        ks_assert(address->family == AF_INET ||address->family == AF_INET6);
 
-       // @todo change parameters to dereferenced pointer and forward buffer pointer directly
-
        if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_NO_MEM;
 
        memcpy(nodeid->id, buffer + *buffer_length, KS_DHT_NODEID_SIZE);
@@ -1269,10 +1355,9 @@ KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht,
 
        if ((ret = ks_dht_autoroute_check(dht, &job->raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
 
-    // @todo atomic increment
-       ks_mutex_lock(dht->tid_mutex);
+       ks_mutex_lock(dht->transactionid_mutex);
        transactionid = dht->transactionid_next++;
-       ks_mutex_unlock(dht->tid_mutex);
+       ks_mutex_unlock(dht->transactionid_mutex);
 
        if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done;
        if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
@@ -1295,7 +1380,9 @@ KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht,
 
        *message = msg;
 
-       if ((ret = ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) != KS_STATUS_SUCCESS) goto done;
+       ks_hash_write_lock(dht->transactions_hash);
+       ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans);
+       ks_hash_write_unlock(dht->transactions_hash);
 
        if (transaction) *transaction = trans;
 
@@ -1327,11 +1414,19 @@ KS_DECLARE(ks_status_t) ks_dht_response_setup(ks_dht_t *dht,
 
        *message = NULL;
 
-       if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret;
+       if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
 
-       if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) {
+               ks_dht_error(dht,
+                                        ep,
+                                        raddr,
+                                        transactionid,
+                                        transactionid_length,
+                                        202,
+                                        "Internal message create error");
+               goto done;
+       }
 
-       //if ((ret = ks_dht_message_response(msg, transactionid, transactionid_length, args)) != KS_STATUS_SUCCESS) goto done;
     ben_dict_set(msg->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
        ben_dict_set(msg->data, ben_blob("y", 1), ben_blob("r", 1));
 
@@ -1381,7 +1476,8 @@ KS_DECLARE(void *) ks_dht_process(ks_thread_t *thread, void *data)
 
        if (ks_dht_message_parse(message, datagram->buffer, datagram->buffer_length) != KS_STATUS_SUCCESS) goto done;
 
-       callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(datagram->dht->registry_type, message->type, KS_READLOCKED);
+       ks_hash_read_lock(datagram->dht->registry_type);
+       callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(datagram->dht->registry_type, message->type, KS_UNLOCKED);
        ks_hash_read_unlock(datagram->dht->registry_type);
 
        if (!callback) ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message->type);
@@ -1413,13 +1509,28 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
     q = ben_dict_get_by_str(message->data, "q");
     if (!q) {
                ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n");
-               return KS_STATUS_FAIL;
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        203,
+                                        "Message query missing required key 'q'");
+               ret = KS_STATUS_FAIL;
+               goto done;
        }
 
     qv = ben_str_val(q);
        qv_len = ben_str_len(q);
     if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) {
                ks_log(KS_LOG_DEBUG, "Message query 'q' value has an unexpectedly large size of %d\n", qv_len);
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        203,
+                                        "Message query 'q' value is too large");
                ret = KS_STATUS_FAIL;
                goto done;
        }
@@ -1431,13 +1542,29 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
        a = ben_dict_get_by_str(message->data, "a");
        if (!a) {
                ks_log(KS_LOG_DEBUG, "Message query missing required key 'a'\n");
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        203,
+                                        "Message query missing required key 'a'");
                ret = KS_STATUS_FAIL;
                goto done;
        }
 
        message->args = a;
 
-    if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
+    if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) {
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        203,
+                                        "Message query args missing required key 'id'");
+               goto done;
+       }
        message->args_id = *id;
 
        ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
@@ -1447,13 +1574,41 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
                                                                        message->raddr.host,
                                                                        message->raddr.port,
                                                                        KS_DHTRT_CREATE_PING,
-                                                                       &node)) != KS_STATUS_SUCCESS) goto done;
-       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
-       callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_READLOCKED);
+                                                                       &node)) != KS_STATUS_SUCCESS) {
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        202,
+                                        "Internal route table create node error");
+               goto done;
+       }
+       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) {
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        202,
+                                        "Internal route table release node error");
+               goto done;
+       }
+       
+       ks_hash_read_lock(dht->registry_query);
+       callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED);
        ks_hash_read_unlock(dht->registry_query);
 
-       if (!callback) ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
+       if (!callback) {
+               ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        204,
+                                        "Message query method is not registered");
+       }
        else ret = callback(dht, message);
 
  done:
@@ -1504,7 +1659,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
        transactionid = ntohl(*tid);
 
        ks_log(KS_LOG_DEBUG, "Message response transaction id %d\n", transactionid);
-       transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
+       
+       ks_hash_read_lock(dht->transactions_hash);
+       transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_UNLOCKED);
        ks_hash_read_unlock(dht->transactions_hash);
 
        if (!transaction) ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
@@ -1582,12 +1739,15 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo
                                   ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
                                   ks_dht_hex(job->search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
                                   results_index);
-                       // @todo add lock on node
+
+                       if (job->search->results[results_index]) ks_dhtrt_release_node(job->search->results[results_index]);
                        job->search->results[results_index] = node;
                        job->search->distances[results_index] = distance;
 
                        ks_hash_insert(job->search->searched, node->nodeid.id, (void *)KS_TRUE);
                        ks_hash_insert(job->search->searching, node->nodeid.id, (void *)KS_TRUE);
+                       
+                       ks_dhtrt_sharelock_node(node);
 
                        if ((ret = ks_dht_findnode(dht, job->search, &node->addr, ks_dht_search_findnode_callback, &job->search->target)) != KS_STATUS_SUCCESS) goto done;
                }
@@ -1668,10 +1828,15 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht,
 
                ks_hash_insert(s->searched, n->nodeid.id, (void *)KS_TRUE);
                ks_hash_insert(s->searching, n->nodeid.id, (void *)KS_TRUE);
+
+               ks_dhtrt_sharelock_node(n);
                
-               if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) goto done;
+               if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) {
+                       ks_dhtrt_release_querynodes(&query);
+                       goto done;
+               }
        }
-       //ks_dhtrt_release_querynodes(&query);
+       ks_dhtrt_release_querynodes(&query);
        ks_mutex_unlock(s->mutex);
        locked_search = KS_FALSE;
 
@@ -1812,7 +1977,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
        tid = (uint32_t *)message->transactionid;
        transactionid = ntohl(*tid);
 
-       transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
+       ks_hash_read_lock(dht->transactions_hash);
+       transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_UNLOCKED);
        ks_hash_read_unlock(dht->transactions_hash);
 
        if (!transaction) {
@@ -1834,7 +2000,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
 
        transaction->finished = KS_TRUE;
 
-       callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED);
+       ks_hash_read_lock(dht->registry_error);
+       callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED);
        ks_hash_read_unlock(dht->registry_error);
 
        if (callback) ret = callback(dht, message);
@@ -1854,52 +2021,6 @@ KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job)
        ks_mutex_unlock(dht->jobs_mutex);
 }
 
-KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
-{
-       ks_dht_job_t *first = NULL;
-       ks_dht_job_t *last = NULL;
-       
-       ks_assert(dht);
-
-       ks_mutex_lock(dht->jobs_mutex);
-       for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
-               ks_bool_t done = KS_FALSE;
-               jobn = job->next;
-
-               if (job->state == KS_DHT_JOB_STATE_QUERYING) {
-                       job->state = KS_DHT_JOB_STATE_RESPONDING;
-                       if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
-               }
-               if (job->state == KS_DHT_JOB_STATE_EXPIRING) {
-                       job->attempts--;
-                       if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING;
-                       else done = KS_TRUE;
-               }
-               if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE;
-
-               if (done) {
-                       if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
-                       else if (!jobp) dht->jobs_first = jobn;
-                       else if (!jobn) {
-                               dht->jobs_last = jobp;
-                           dht->jobs_last->next = NULL;
-                       }
-                       else jobp->next = jobn;
-
-                       job->next = NULL;
-                       if (last) last = last->next = job;
-                       else first = last = job;
-               } else jobp = job;
-       }
-       ks_mutex_unlock(dht->jobs_mutex);
-
-       for (ks_dht_job_t *job = first, *jobn = NULL; job; job = jobn) {
-               jobn = job->next;
-               // this cannot occur inside of the main loop, may add new jobs invalidating list pointers
-               if (job->finish_callback) job->finish_callback(dht, job);
-               ks_dht_job_destroy(&job);
-       }
-}
 
 KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback)
 {
@@ -2064,11 +2185,19 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        ks_assert(message);
        ks_assert(message->args);
 
-       if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) {
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        203,
+                                        "Message query findnode args missing required key 'target'");
+               goto done;
+       }
 
        want = ben_dict_get_by_str(message->args, "want");
        if (want) {
-               // @todo use ben_list_for_each
                size_t want_len = ben_list_len(want);
                for (size_t i = 0; i < want_len; ++i) {
                        struct bencode *iv = ben_list_get(want, i);
@@ -2087,7 +2216,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
 
        query.nodeid = *target;
        query.type = KS_DHT_REMOTE;
-       query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE
+       query.max = 8; // @todo should be like KS_DHTRT_BUCKET_SIZE
        if (want4) {
                query.family = AF_INET;
                ks_dhtrt_findclosest_nodes(dht->rt_ipv4, &query);
@@ -2099,7 +2228,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
                                                                                                           &qn->addr,
                                                                                                           buffer4,
                                                                                                           &buffer4_length,
-                                                                                                          sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
+                                                                                                          sizeof(buffer4))) != KS_STATUS_SUCCESS) {
+                               ks_dhtrt_release_querynodes(&query);
+                               ks_dht_error(dht,
+                                                        message->endpoint,
+                                                        &message->raddr,
+                                                        message->transactionid,
+                                                        message->transactionid_length,
+                                                        202,
+                                                        "Internal compact v4 nodeinfo error");
+                               goto done;
+                       }
 
                        ks_log(KS_LOG_DEBUG,
                                   "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
@@ -2117,7 +2256,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
                                                                                                           &qn->addr,
                                                                                                           buffer6,
                                                                                                           &buffer6_length,
-                                                                                                          sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
+                                                                                                          sizeof(buffer6))) != KS_STATUS_SUCCESS) {
+                               ks_dhtrt_release_querynodes(&query);
+                               ks_dht_error(dht,
+                                                        message->endpoint,
+                                                        &message->raddr,
+                                                        message->transactionid,
+                                                        message->transactionid_length,
+                                                        202,
+                                                        "Internal compact v6 nodeinfo error");
+                               goto done;
+                       }
 
                        ks_log(KS_LOG_DEBUG,
                                   "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
@@ -2244,6 +2393,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
 {
        ks_dht_message_t *message = NULL;
        struct bencode *a = NULL;
+       ks_dht_storageitem_t *item = NULL;
 
        ks_assert(dht);
        ks_assert(job);
@@ -2256,7 +2406,11 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
                                                   &message,
                                                   &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
-       // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
+       ks_hash_read_lock(dht->storageitems_hash);
+       item = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED);
+       ks_hash_read_unlock(dht->storageitems_hash);
+
+       if (item && item->mutable && item->seq > 0) ben_dict_set(a, ben_blob("seq", 3), ben_int(item->seq));
        ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
 
        //ks_log(KS_LOG_DEBUG, "Sending message query get\n");
@@ -2287,7 +2441,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        ks_assert(message);
        ks_assert(message->args);
 
-       if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) {
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        203,
+                                        "Message query get args missing required key 'target'");
+               goto done;
+       }
 
        seq = ben_dict_get_by_str(message->args, "seq");
        if (seq) sequence = ben_int_val(seq);
@@ -2317,7 +2480,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
                                                                                                           &qn->addr,
                                                                                                           buffer4,
                                                                                                           &buffer4_length,
-                                                                                                          sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
+                                                                                                          sizeof(buffer4))) != KS_STATUS_SUCCESS) {
+                               ks_dhtrt_release_querynodes(&query);
+                               ks_dht_error(dht,
+                                                        message->endpoint,
+                                                        &message->raddr,
+                                                        message->transactionid,
+                                                        message->transactionid_length,
+                                                        202,
+                                                        "Internal compact v4 nodeinfo error");
+                               goto done;
+                       }
 
                        ks_log(KS_LOG_DEBUG,
                                   "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
@@ -2335,7 +2508,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
                                                                                                           &qn->addr,
                                                                                                           buffer6,
                                                                                                           &buffer6_length,
-                                                                                                          sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
+                                                                                                          sizeof(buffer6))) != KS_STATUS_SUCCESS) {
+                               ks_dhtrt_release_querynodes(&query);
+                               ks_dht_error(dht,
+                                                        message->endpoint,
+                                                        &message->raddr,
+                                                        message->transactionid,
+                                                        message->transactionid_length,
+                                                        202,
+                                                        "Internal compact v6 nodeinfo error");
+                               goto done;
+                       }
 
                        ks_log(KS_LOG_DEBUG,
                                   "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
@@ -2523,22 +2706,19 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
                                                                                                                                           sequence,
                                                                                                                                           sig)) != KS_STATUS_SUCCESS) goto done;
                }
-               if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
-               item = ks_hash_search(dht->storageitems_hash, item->id.id, KS_UNLOCKED);
+               if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item);
        } else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
 
        if (item) job->response_storageitem = item;
        else if (olditem) job->response_storageitem = olditem;
 
  done:
-       if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
        if (ret != KS_STATUS_SUCCESS) {
-               if (item) ks_dht_storageitem_destroy(&item);
        }
+       if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
        return ret;
 }
 
-// @todo add a public function to add storageitem_t's to the store before calling this for authoring new data, reuse function in the "get" handlers
 // @todo add reference counting system to storageitem_t to know what to keep alive with reannouncements versus allowing to expire
 KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
                                                                   const ks_sockaddr_t *raddr,
@@ -2620,12 +2800,49 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
        ks_assert(message->args);
 
 
-       if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) {
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        203,
+                                        "Message query put args missing required key 'token'");
+               goto done;
+       }
 
-       if ((ret = ks_dht_utility_extract_storageitem_pkey(message->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
-       if ((ret = ks_dht_utility_extract_storageitem_signature(message->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dht_utility_extract_storageitem_pkey(message->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) {
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        203,
+                                        "Message query put 'k' is malformed");
+               goto done;
+       }
+       if ((ret = ks_dht_utility_extract_storageitem_signature(message->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) {
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        203,
+                                        "Message query put 'sig' is malformed");
+               goto done;
+       }
 
        salt = ben_dict_get_by_str(message->args, "salt");
+       if (salt && ben_str_len(salt) > KS_DHT_STORAGEITEM_SALT_MAX_SIZE) {
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        207,
+                                        "Message query put 'salt' is too large");
+               goto done;
+       }
 
        seq = ben_dict_get_by_str(message->args, "seq");
        if (seq) sequence = ben_int_val(seq);
@@ -2635,6 +2852,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
 
        if (seq && (!k || !sig)) {
                ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data\n");
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        203,
+                                        "Message query put for mutable data must include both 'k' and 'sig'");
                ret = KS_STATUS_ARG_INVALID;
                goto done;
        }
@@ -2642,6 +2866,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
        v = ben_dict_get_by_str(message->args, "v");
        if (!v) {
                ks_log(KS_LOG_DEBUG, "Must provide v\n");
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        203,
+                                        "Message query put args missing required key 'v'");
                ret = KS_STATUS_ARG_INVALID;
                goto done;
        }
@@ -2649,23 +2880,50 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
 
        if (!seq) {
                // immutable
-               if ((ret = ks_dht_storageitem_target_immutable_internal(v, &target)) != KS_STATUS_SUCCESS) goto done;
+               if ((ret = ks_dht_storageitem_target_immutable_internal(v, &target)) != KS_STATUS_SUCCESS) {
+                       ks_dht_error(dht,
+                                                message->endpoint,
+                                                &message->raddr,
+                                                message->transactionid,
+                                                message->transactionid_length,
+                                                202,
+                                                "Internal storage item target immutable error");
+                       goto done;
+               }
        } else {
                // mutable
-               if ((ret = ks_dht_storageitem_target_mutable_internal(k, salt, &target)) != KS_STATUS_SUCCESS) goto done;
+               if ((ret = ks_dht_storageitem_target_mutable_internal(k, salt, &target)) != KS_STATUS_SUCCESS) {
+                       ks_dht_error(dht,
+                                                message->endpoint,
+                                                &message->raddr,
+                                                message->transactionid,
+                                                message->transactionid_length,
+                                                202,
+                                                "Internal storage item target mutable error");
+                       goto done;
+               }
        }
+
+       ks_hash_write_lock(dht->storageitems_hash);
+       storageitems_locked = KS_TRUE;
+       
        olditem = ks_hash_search(dht->storageitems_hash, target.id, KS_UNLOCKED);
 
        if (!ks_dht_token_verify(dht, &message->raddr, &target, token)) {
                ks_log(KS_LOG_DEBUG, "Invalid token\n");
+               ks_dht_error(dht,
+                                        message->endpoint,
+                                        &message->raddr,
+                                        message->transactionid,
+                                        message->transactionid_length,
+                                        203,
+                                        "Message query put token is invalid");
                ret = KS_STATUS_FAIL;
                goto done;
        }
 
        //ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
 
-       ks_hash_write_lock(dht->storageitems_hash);
-       storageitems_locked = KS_TRUE;
 
        if (!seq) {
                // immutable
@@ -2674,27 +2932,61 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
                                                                                                                                         dht->pool,
                                                                                                                                         &target,
                                                                                                                                         v,
-                                                                                                                                        KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+                                                                                                                                        KS_TRUE)) != KS_STATUS_SUCCESS) {
+                       ks_dht_error(dht,
+                                                message->endpoint,
+                                                &message->raddr,
+                                                message->transactionid,
+                                                message->transactionid_length,
+                                                202,
+                                                "Internal storage item create immutable error");
+                       goto done;
+               }
        } else {
                // mutable
                if (!ks_dht_storageitem_signature_verify(sig, k, salt, seq, v)) {
                        ks_log(KS_LOG_DEBUG, "Mutable data signature failed to verify\n");
+                       ks_dht_error(dht,
+                                                message->endpoint,
+                                                &message->raddr,
+                                                message->transactionid,
+                                                message->transactionid_length,
+                                                206,
+                                                "Message query put signature is invalid");
                        ret = KS_STATUS_FAIL;
                        goto done;
                }
                
                if (olditem) {
                        if (cas && olditem->seq != cas_seq) {
-                               // @todo send 301 error instead of the response
+                               ks_dht_error(dht,
+                                                        message->endpoint,
+                                                        &message->raddr,
+                                                        message->transactionid,
+                                                        message->transactionid_length,
+                                                        301,
+                                                        "Message query put cas mismatch");
                                goto done;
                        }
                        if (olditem->seq > sequence) {
-                               // @todo send 302 error instead of the response
+                               ks_dht_error(dht,
+                                                        message->endpoint,
+                                                        &message->raddr,
+                                                        message->transactionid,
+                                                        message->transactionid_length,
+                                                        302,
+                                                        "Message query put sequence is less than current");
                                goto done;
                        }
                        if (olditem->seq == sequence) {
                                if (ben_cmp(olditem->v, v) != 0) {
-                                       // @todo send 201? error instead of the response
+                                       ks_dht_error(dht,
+                                                                message->endpoint,
+                                                                &message->raddr,
+                                                                message->transactionid,
+                                                                message->transactionid_length,
+                                                                201,
+                                                                "Message query put sequence is equal to current but values are different");
                                        goto done;
                                }
                        } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
@@ -2709,9 +3001,18 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
                                                                                                                                   salt,
                                                                                                                                   KS_TRUE,
                                                                                                                                   sequence,
-                                                                                                                                  sig)) != KS_STATUS_SUCCESS) goto done;
+                                                                                                                                  sig)) != KS_STATUS_SUCCESS) {
+                       ks_dht_error(dht,
+                                                message->endpoint,
+                                                &message->raddr,
+                                                message->transactionid,
+                                                message->transactionid_length,
+                                                202,
+                                                "Internal storage item create mutable error");
+                       goto done;
+               }
        }
-       if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
+       if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item);
 
        if ((ret = ks_dht_response_setup(dht,
                                                                         message->endpoint,
@@ -2725,10 +3026,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
        ks_q_push(dht->send_q, (void *)response);
 
  done:
-       if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
        if (ret != KS_STATUS_SUCCESS) {
-               if (item) ks_dht_storageitem_destroy(&item);
+               if (item) ks_hash_remove(dht->storageitems_hash, item->id.id);
        }
+       if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
        return ret;
 }
 
index 7e8598b53f5b7aba8fe5ea86185db46f75af88ce..7f869f67eac0688f185485f6a85224a59839f43b 100644 (file)
@@ -18,7 +18,6 @@ KS_BEGIN_EXTERN_C
 #define KS_DHT_DATAGRAM_BUFFER_SIZE 1000
 
 //#define KS_DHT_RECV_BUFFER_SIZE 0xFFFF
-#define KS_DHT_PULSE_EXPIRATIONS 1
 
 #define KS_DHT_NODEID_SIZE 20
 
@@ -30,6 +29,8 @@ KS_BEGIN_EXTERN_C
 #define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256
 
 #define KS_DHT_TRANSACTION_EXPIRATION 10
+#define KS_DHT_TRANSACTIONS_PULSE 1
+
 #define KS_DHT_SEARCH_EXPIRATION 10
 #define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE
 
@@ -40,7 +41,8 @@ KS_BEGIN_EXTERN_C
 #define KS_DHT_STORAGEITEM_EXPIRATION 7200
 
 #define KS_DHT_TOKEN_SIZE SHA_DIGEST_LENGTH
-#define KS_DHT_TOKENSECRET_EXPIRATION 300
+#define KS_DHT_TOKEN_EXPIRATION 300
+#define KS_DHT_TOKENS_PULSE 1
 
 #define  KS_DHTRT_MAXQUERYSIZE 20
 
@@ -257,12 +259,11 @@ struct ks_dht_s {
        ks_hash_t *registry_error;
 
        ks_dht_endpoint_t **endpoints;
+       int32_t endpoints_length;
        int32_t endpoints_size;
        ks_hash_t *endpoints_hash;
        struct pollfd *endpoints_poll;
 
-       ks_time_t pulse_expirations;
-
        ks_q_t *send_q;
        ks_dht_message_t *send_q_unsent;
        uint8_t recv_buffer[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; // Add 1, if we receive it then overflow error
@@ -272,7 +273,8 @@ struct ks_dht_s {
        ks_dht_job_t *jobs_first;
        ks_dht_job_t *jobs_last;
 
-       ks_mutex_t *tid_mutex;
+       ks_time_t transactions_pulse;
+       ks_mutex_t *transactionid_mutex;
        volatile uint32_t transactionid_next;
        ks_hash_t *transactions_hash;
 
@@ -283,6 +285,7 @@ struct ks_dht_s {
        ks_dht_search_t *searches_first;
        ks_dht_search_t *searches_last;
 
+       ks_time_t tokens_pulse;
        volatile uint32_t token_secret_current;
        volatile uint32_t token_secret_previous;
        ks_time_t token_secret_expiration;
@@ -323,9 +326,8 @@ KS_DECLARE(void) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t
  * @param dht pointer to the dht instance
  * @param value string of the type text under the 'y' key of a message
  * @param callback the callback to be called when a message matches
- * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
  */
-KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
+KS_DECLARE(void) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
 
 /**
  * Register a callback for a specific message query.
@@ -333,9 +335,8 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k
  * @param dht pointer to the dht instance
  * @param value string of the type text under the 'q' key of a message
  * @param callback the callback to be called when a message matches
- * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
  */
-KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
+KS_DECLARE(void) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
 
 /**
  * Register a callback for a specific message error.
@@ -343,9 +344,8 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value,
  * @param dht pointer to the dht instance
  * @param value string of the errorcode under the first item of the 'e' key of a message
  * @param callback the callback to be called when a message matches
- * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
  */
-KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
+KS_DECLARE(void) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback);
 
 /**
  * Bind a local address and port for receiving UDP datagrams.
index 2417fd0c4b26174bc7a5a1b4dc8b014a1c313448..313856d09ad3dbae04e1641ea7361459f546eeb5 100644 (file)
@@ -23,11 +23,11 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t
 
        s->callback = callback;
 
-       ks_hash_create(&s->searched, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool);
+       ks_hash_create(&s->searched, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool);
        ks_assert(s->searched);
        ks_hash_set_keysize(s->searched, KS_DHT_NODEID_SIZE);
 
-       ks_hash_create(&s->searching, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool);
+       ks_hash_create(&s->searching, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool);
        ks_assert(s->searching);
        ks_hash_set_keysize(s->searching, KS_DHT_NODEID_SIZE);