]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Most of search functionality is finished, needs testing when route table...
authorShane Bryldt <astaelan@gmail.com>
Fri, 16 Dec 2016 01:58:21 +0000 (01:58 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:36 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_datagram.c
libs/libks/src/dht/ks_dht_endpoint.c
libs/libks/src/dht/ks_dht_message.c
libs/libks/src/dht/ks_dht_search.c
libs/libks/src/dht/ks_dht_storageitem.c
libs/libks/src/dht/ks_dht_transaction.c

index 9128275ac3ed90ecb8b64965c9e99afadaf4f4c1..a236f2ff8a222c9d78a9ebe2354c67553e7a0d98 100644 (file)
@@ -15,16 +15,16 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        /**
         * Create a new internally managed pool if one wasn't provided, and returns KS_STATUS_NO_MEM if pool was not created.
         */
-       if (pool_alloc && (ret = ks_pool_open(&pool)) != KS_STATUS_SUCCESS) goto done;
+       if (pool_alloc) {
+               ks_pool_open(&pool);
+               ks_assert(pool);
+       }
 
        /**
         * Allocate the dht instance from the pool, and returns KS_STATUS_NO_MEM if the dht was not created.
         */
        *dht = d = ks_pool_alloc(pool, sizeof(ks_dht_t));
-       if (!d) {
-               ret = KS_STATUS_NO_MEM;
-               goto done;
-       }
+       ks_assert(d);
 
        /**
         * Keep track of the pool used for future allocations and cleanup.
@@ -39,12 +39,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        d->tpool = tpool;
        if (!tpool) {
                d->tpool_alloc = KS_TRUE;
-               if ((ret = ks_thread_pool_create(&d->tpool,
-                                                                                KS_DHT_TPOOL_MIN,
-                                                                                KS_DHT_TPOOL_MAX,
-                                                                                KS_DHT_TPOOL_STACK,
-                                                                                KS_PRI_NORMAL,
-                                                                                KS_DHT_TPOOL_IDLE)) != KS_STATUS_SUCCESS) goto done;
+               ks_thread_pool_create(&d->tpool, KS_DHT_TPOOL_MIN, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE);
+               ks_assert(d->tpool);
        }
 
        /**
@@ -56,10 +52,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        /**
         * Create the message type registry.
         */
-       if ((ret = ks_hash_create(&d->registry_type,
-                                                         KS_HASH_MODE_DEFAULT,
-                                                         KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
-                                                         d->pool)) != KS_STATUS_SUCCESS) goto done;
+       ks_hash_create(&d->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+       ks_assert(d->registry_type);
 
        /**
         * Register the message type callbacks for query (q), response (r), and error (e)
@@ -71,10 +65,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        /**
         * Create the message query registry.
         */
-       if ((ret = ks_hash_create(&d->registry_query,
-                                                         KS_HASH_MODE_DEFAULT,
-                                                         KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
-                                                         d->pool)) != KS_STATUS_SUCCESS) goto done;
+       ks_hash_create(&d->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+       ks_assert(d->registry_query);
 
        /**
         * Register the message query callbacks for ping, find_node, etc.
@@ -87,10 +79,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        /**
         * Create the message error registry.
         */
-       if ((ret = ks_hash_create(&d->registry_error,
-                                                         KS_HASH_MODE_DEFAULT,
-                                                         KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
-                                                         d->pool)) != KS_STATUS_SUCCESS) goto done;
+       ks_hash_create(&d->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+       ks_assert(d->registry_error);
        // @todo register 301 error for internal get/put CAS hash mismatch retry handler
 
        /**
@@ -113,20 +103,19 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
         * This also provides the basis for autorouting to find unbound interfaces and bind them at runtime.
         * This hash uses the host ip string concatenated with a colon and the port, ie: "123.123.123.123:123" or ipv6 equivilent
         */
-       if ((ret = ks_hash_create(&d->endpoints_hash,
-                                                         KS_HASH_MODE_DEFAULT,
-                                                         KS_HASH_FLAG_RWLOCK,
-                                                         d->pool)) != KS_STATUS_SUCCESS) goto done;
+       ks_hash_create(&d->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, d->pool);
+       ks_assert(d->endpoints_hash);
 
        /**
         * Default expirations to not be checked for one pulse.
         */
-       d->pulse_expirations = ks_time_now_sec() + KS_DHT_PULSE_EXPIRATIONS;
+       d->pulse_expirations = ks_time_now() + (KS_DHT_PULSE_EXPIRATIONS * 1000);
 
        /**
         * Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full.
         */
-       if ((ret = ks_q_create(&d->send_q, d->pool, 0)) != KS_STATUS_SUCCESS) goto done;
+       ks_q_create(&d->send_q, d->pool, 0);
+       ks_assert(d->send_q);
        
        /**
         * If a message is popped from the queue for sending but the system buffers are too full, this is used to temporarily store the message.
@@ -141,7 +130,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        /**
         * Initialize the transaction id mutex, should use atomic increment instead
         */
-       if ((ret = ks_mutex_create(&d->tid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool)) != KS_STATUS_SUCCESS) goto done;
+       ks_mutex_create(&d->tid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
+       ks_assert(d->tid_mutex);
 
        /**
         * Initialize the first transaction id randomly, this doesn't really matter.
@@ -152,10 +142,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
         * Create the hash to track pending transactions on queries that are pending responses.
         * It should be impossible to receive a duplicate transaction id in the hash before it expires, but if it does an error is preferred.
         */
-       if ((ret = ks_hash_create(&d->transactions_hash,
-                                                         KS_HASH_MODE_INT,
-                                                         KS_HASH_FLAG_RWLOCK,
-                                                         d->pool)) != KS_STATUS_SUCCESS) goto done;
+       ks_hash_create(&d->transactions_hash, KS_HASH_MODE_INT, KS_HASH_FLAG_RWLOCK, d->pool);
+       ks_assert(d->transactions_hash);
 
        /**
         * The internal route tables will be latent allocated when binding.
@@ -166,10 +154,9 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        /**
         * Create the hash to store searches.
         */
-       if ((ret = ks_hash_create(&d->search_hash,
-                                                         KS_HASH_MODE_ARBITRARY,
-                                                         KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
-                                                         d->pool)) != KS_STATUS_SUCCESS) goto done;
+       ks_hash_create(&d->search_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+       ks_assert(d->search_hash);
+
        /**
         * The search hash uses arbitrary key size, which requires the key size be provided.
         */
@@ -179,21 +166,20 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
         * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
         */
        d->token_secret_current = d->token_secret_previous = rand();
-       d->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION;
+       d->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
 
        /**
         * Create the hash to store arbitrary data for BEP44.
         */
-       if ((ret = ks_hash_create(&d->storage_hash,
-                                                         KS_HASH_MODE_ARBITRARY,
-                                                         KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
-                                                         d->pool)) != KS_STATUS_SUCCESS) goto done;
+       ks_hash_create(&d->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+       ks_assert(d->storage_hash);
+
        /**
         * The storage hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's.
         */
        ks_hash_set_keysize(d->storage_hash, KS_DHT_NODEID_SIZE);
 
- done:
      // done:
        if (ret != KS_STATUS_SUCCESS) {
                if (d) ks_dht_destroy(&d);
                else if (pool_alloc && pool) ks_pool_close(&pool);
@@ -348,7 +334,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
         */
        pool = d->pool;
        pool_alloc = d->pool_alloc;
-       
+
        /**
         * Free the dht instance from the pool, after this the dht instance memory is invalid.
         */
@@ -442,7 +428,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k
        ks_assert(value);
        ks_assert(callback);
 
-       return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
+       return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback);
 }
 
 KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
@@ -451,7 +437,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value,
        ks_assert(value);
        ks_assert(callback);
 
-       return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
+       return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback);
 }
 
 KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
@@ -460,7 +446,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value,
        ks_assert(value);
        ks_assert(callback);
 
-       return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
+       return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback);
 }
 
 
@@ -482,7 +468,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
        if (endpoint) *endpoint = NULL;
 
        ep = ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_READLOCKED);
-       if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret;
+       ks_hash_read_unlock(dht->endpoints_hash);
        if (ep) {
                ks_log(KS_LOG_DEBUG, "Attempted to bind to %s more than once.\n", addr->host);
                return KS_STATUS_FAIL;
@@ -514,7 +500,8 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
        /**
         * Allocate the endpoint to track the local socket.
         */
-       if ((ret = ks_dht_endpoint_create(&ep, dht->pool, nodeid, addr, sock)) != KS_STATUS_SUCCESS) goto done;
+       ks_dht_endpoint_create(&ep, dht->pool, nodeid, addr, sock);
+       ks_assert(ep);
 
        /**
         * Resize the endpoints array to take another endpoint pointer.
@@ -523,15 +510,14 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
        dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
                                                                                                                   (void *)dht->endpoints,
                                                                                                                   sizeof(ks_dht_endpoint_t *) * dht->endpoints_size);
+       ks_assert(dht->endpoints);
        dht->endpoints[epindex] = ep;
 
        /**
         * Add the new endpoint into the endpoints hash for quick lookups.
+        * @todo insert returns 0 when OOM, ks_pool_alloc will abort so insert can only succeed
         */
-       if (!ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) {
-               ret = KS_STATUS_FAIL;
-               goto done;
-       }
+       if ((ret = ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) != KS_STATUS_SUCCESS) goto done;
 
        /**
         * Resize the endpoints_poll array to keep in parallel with endpoints array, populate new entry with the right data.
@@ -539,6 +525,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
        dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool,
                                                                                                                  (void *)dht->endpoints_poll,
                                                                                                                  sizeof(struct pollfd) * dht->endpoints_size);
+       ks_assert(dht->endpoints_poll);
        dht->endpoints_poll[epindex].fd = ep->sock;
        dht->endpoints_poll[epindex].events = POLLIN | POLLERR;
 
@@ -553,9 +540,6 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
                                                                                ep->addr.host,
                                                                                ep->addr.port,
                                                                                &ep->node)) != KS_STATUS_SUCCESS) goto done;
-               /**
-                * Do not release the ep->node, keep it alive until cleanup
-                */
        } else {
                if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool)) != KS_STATUS_SUCCESS) goto done;
                if ((ret = ks_dhtrt_create_node(dht->rt_ipv6,
@@ -564,18 +548,16 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
                                                                                ep->addr.host,
                                                                                ep->addr.port,
                                                                                &ep->node)) != KS_STATUS_SUCCESS) goto done;
-               /**
-                * Do not release the ep->node, keep it alive until cleanup
-                */
        }
+       /**
+        * Do not release the ep->node, keep it alive until cleanup
+        */
 
        /**
         * If the endpoint output is being captured, assign it and return successfully.
         */
        if (endpoint) *endpoint = ep;
 
-       ret = KS_STATUS_SUCCESS;
-
  done:
        if (ret != KS_STATUS_SUCCESS) {
                /**
@@ -583,7 +565,10 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
                 * This will be done in ks_dht_endpoint_destroy only if the socket was assigned during a successful ks_dht_endpoint_create.
                 * Then return whatever failure condition resulted in landed here.
                 */
-               if (ep) ks_dht_endpoint_destroy(&ep);
+               if (ep) {
+                       ks_hash_remove(dht->endpoints_hash, ep->addr.host);
+                       ks_dht_endpoint_destroy(&ep);
+               }
                else if (sock != KS_SOCK_INVALID) ks_socket_close(&sock);
 
                if (endpoint) *endpoint = NULL;
@@ -594,31 +579,32 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
 KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
 {
        ks_dht_datagram_t *datagram = NULL;
-       int32_t result;
        ks_sockaddr_t raddr;
 
        ks_assert(dht);
-       ks_assert (timeout > 0);
+       ks_assert(timeout > 0);
 
        if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0;
 
-       result = ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout);
-       if (result > 0) {
+       // @todo confirm how poll/wsapoll react to zero size and NULL array
+       if (ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout) > 0) {
                for (int32_t i = 0; i < dht->endpoints_size; ++i) {
                        if (!(dht->endpoints_poll[i].revents & POLLIN)) continue;
-                       
+
                        raddr = (const ks_sockaddr_t){ 0 };
                        dht->recv_buffer_length = sizeof(dht->recv_buffer);
                        raddr.family = dht->endpoints[i]->addr.family;
                        if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) != KS_STATUS_SUCCESS) continue;
-                       
+
                        if (dht->recv_buffer_length == sizeof(dht->recv_buffer)) {
                                ks_log(KS_LOG_DEBUG, "Dropped oversize datagram from %s %d\n", raddr.host, raddr.port);
                                continue;
                        }
-                       
-                       if (ks_dht_datagram_create(&datagram, dht->pool, dht, dht->endpoints[i], &raddr) == KS_STATUS_SUCCESS &&
-                               ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) ks_dht_datagram_destroy(&datagram);
+
+                       ks_dht_datagram_create(&datagram, dht->pool, dht, dht->endpoints[i], &raddr);
+                       ks_assert(datagram);
+
+                       if (ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) ks_dht_datagram_destroy(&datagram);
                }
        }
 
@@ -633,13 +619,12 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
 KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
 {
        ks_hash_iterator_t *it = NULL;
-       ks_time_t now = ks_time_now_sec();
+       ks_time_t now = ks_time_now();
 
        ks_assert(dht);
 
-       if (dht->pulse_expirations <= now) {
-               dht->pulse_expirations = now + KS_DHT_PULSE_EXPIRATIONS;
-       }
+       if (dht->pulse_expirations > now) return;
+       dht->pulse_expirations = now + (KS_DHT_PULSE_EXPIRATIONS * 1000);
 
        ks_hash_write_lock(dht->transactions_hash);
        for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
@@ -660,8 +645,47 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
        }
        ks_hash_write_unlock(dht->transactions_hash);
 
+       ks_hash_write_lock(dht->search_hash);
+       for (it = ks_hash_first(dht->search_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+               const void *search_key = NULL;
+               ks_dht_search_t *search_value = NULL;
+
+               ks_hash_this(it, &search_key, NULL, (void **)&search_value);
+
+               ks_hash_write_lock(search_value->pending);
+               for (ks_hash_iterator_t *i = ks_hash_first(search_value->pending, KS_UNLOCKED); i; i = ks_hash_next(&i)) {
+                       const void *pending_key = NULL;
+                       ks_dht_search_pending_t *pending_value = NULL;
+                       ks_bool_t pending_remove = KS_FALSE;
+
+                       ks_hash_this(i, &pending_key, NULL, (void **)&pending_value);
+
+                       if (pending_value->finished) pending_remove = KS_TRUE;
+                       else if (pending_value->expiration <= now) {
+                               char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+                               char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+                               ks_log(KS_LOG_DEBUG,
+                                          "Search for %s pending find_node to %s has expired without response\n",
+                                          ks_dht_hexid(&search_value->target, id_buf),
+                                          ks_dht_hexid(&pending_value->nodeid, id2_buf));
+                               pending_remove = KS_TRUE;
+                       }
+                       if (pending_remove) {
+                               ks_hash_remove(search_value->pending, (void *)pending_key);
+                               ks_dht_search_pending_destroy(&pending_value);
+                       }
+               }
+               ks_hash_write_unlock(search_value->pending);
+               if (ks_hash_count(search_value->pending) == 0) {
+                       for (int32_t index = 0; index < search_value->callbacks_size; ++index) search_value->callbacks[index](dht, search_value);
+                       ks_hash_remove(dht->search_hash, (void *)search_key);
+                       ks_dht_search_destroy(&search_value);
+               }
+       }
+       ks_hash_write_unlock(dht->search_hash);
+
        if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
-               dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION;
+               dht->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
                dht->token_secret_previous = dht->token_secret_current;
                dht->token_secret_current = rand();
        }
@@ -987,10 +1011,7 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
 
        *message = msg;
 
-       if (!ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) {
-               ret = KS_STATUS_FAIL;
-               goto done;
-       }
+       if ((ret = ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) != KS_STATUS_SUCCESS) goto done;
 
        if (transaction) *transaction = trans;
 
@@ -1176,11 +1197,13 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
                                                                          ks_dht_search_callback_t callback,
                                                                          ks_dht_search_t **search)
 {
+       ks_bool_t locked_search = KS_FALSE;
+       ks_bool_t locked_pending = KS_FALSE;
        ks_dht_search_t *s = NULL;
-       ks_status_t ret = KS_STATUS_SUCCESS;
        ks_bool_t inserted = KS_FALSE;
        ks_bool_t allocated = KS_FALSE;
     ks_dhtrt_querynodes_t query;
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(family == AF_INET || family == AF_INET6);
@@ -1188,9 +1211,12 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
 
        if (search) *search = NULL;
 
+       // @todo start write lock on search_hash and hold until after inserting
        // check hash for target to see if search already exists
-       s = ks_hash_search(dht->search_hash, target->id, KS_READLOCKED);
-       ks_hash_read_unlock(dht->search_hash); // @todo hold lock until finished adding new entry?
+       ks_hash_write_lock(dht->search_hash);
+       locked_search = KS_TRUE;
+
+       s = ks_hash_search(dht->search_hash, target->id, KS_UNLOCKED);
 
        // if search does not exist, create new search and store in hash by target
        if (!s) {
@@ -1204,6 +1230,17 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
        // if the search is old then bail out and return successfully
        if (!allocated) goto done;
 
+       if ((ret = ks_hash_insert(dht->search_hash, s->target.id, s)) == KS_STATUS_SUCCESS) goto done;
+       inserted = KS_TRUE;
+
+       // lock pending before unlocking the search hash to prevent this search from being used before we finish setting it up
+       ks_hash_write_lock(s->pending);
+       locked_pending = KS_TRUE;
+
+       // release search hash lock now, but pending is still locked
+       ks_hash_write_unlock(dht->search_hash);
+       locked_search = KS_FALSE;
+
        // find closest good nodes to target locally and store as the closest results
     query.nodeid = *target;
        query.type = KS_DHT_REMOTE;
@@ -1219,27 +1256,27 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
                ks_dht_utility_nodeid_xor(&s->distances[i], &n->nodeid, &s->target);
                // add to pending with expiration
                if ((ret = ks_dht_search_pending_create(&pending, s->pool, &n->nodeid)) != KS_STATUS_SUCCESS) goto done;
-               if (!ks_hash_insert(s->pending, n->nodeid.id, pending)) {
+               if ((ret = ks_hash_insert(s->pending, n->nodeid.id, pending)) != KS_STATUS_SUCCESS) {
                        ks_dht_search_pending_destroy(&pending);
-                       ret = KS_STATUS_FAIL;
                        goto done;
                }
                if ((ret = ks_dht_send_findnode(dht, NULL, &n->addr, target)) != KS_STATUS_SUCCESS) goto done;
+               // increment here in case we end up bailing out; execute with what it has or destroy the search?
+               s->results_length++;
        }
-       s->results_length = query.count;
        // @todo release query nodes
-
-       // @todo if entry has been added since we checked above this may fail, try adding callback instead of failing? or retain lock from earlier
-       if (!ks_hash_insert(dht->search_hash, s->target.id, s)) {
-               ret = KS_STATUS_FAIL;
-               goto done;
-       }
-       inserted = KS_TRUE;
+       ks_hash_write_unlock(s->pending);
+       locked_pending = KS_FALSE;
 
        if (search) *search = s;
 
  done:
-       if (ret != KS_STATUS_SUCCESS && !inserted && s) ks_dht_search_destroy(&s);
+       if (locked_search) ks_hash_write_unlock(dht->search_hash);
+       if (locked_pending) ks_hash_write_unlock(s->pending);
+       if (ret != KS_STATUS_SUCCESS) {
+               if (!inserted && s) ks_dht_search_destroy(&s);
+               *search = NULL;
+       }
        return ret;
 }
 
@@ -1254,18 +1291,18 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
 {
        ks_dht_message_t *error = NULL;
        struct bencode *e = NULL;
-       ks_status_t ret = KS_STATUS_FAIL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(raddr);
        ks_assert(transactionid);
        ks_assert(errorstr);
 
-       if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
 
-       if (ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
 
-       if (ks_dht_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dht_message_error(error, transactionid, transactionid_length, &e)) != KS_STATUS_SUCCESS) goto done;
 
        ben_list_append(e, ben_int(errorcode));
        ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
@@ -1273,8 +1310,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
        ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
        ks_q_push(dht->send_q, (void *)error);
 
-       ret = KS_STATUS_SUCCESS;
-
  done:
        if (ret != KS_STATUS_SUCCESS && error) ks_dht_message_destroy(&error);
        return ret;
@@ -1292,7 +1327,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
        ks_dht_transaction_t *transaction;
        uint32_t *tid;
        uint32_t transactionid;
-       ks_status_t ret = KS_STATUS_FAIL;
+       ks_dht_message_callback_t callback;
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(message);
@@ -1308,7 +1344,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
        es_len = ben_str_len(es);
        if (es_len >= KS_DHT_MESSAGE_ERROR_MAX_SIZE) {
                ks_log(KS_LOG_DEBUG, "Message error value has an unexpectedly large size of %d\n", es_len);
-               return KS_STATUS_FAIL;
+               ret = KS_STATUS_FAIL;
+               goto done;
        }
        errorcode = ben_int_val(ec);
        et = ben_str_val(es);
@@ -1327,27 +1364,30 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
 
        if (!transaction) {
                ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid);
-       } else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
+               ret = KS_STATUS_FAIL;
+               goto done;
+       }
+
+       if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
                ks_log(KS_LOG_DEBUG,
                           "Message error rejected due to spoofing from %s %d, expected %s %d\n",
                           message->raddr.host,
                           message->raddr.port,
                           transaction->raddr.host,
                           transaction->raddr.port);
-       } else {
-               ks_dht_message_callback_t callback;
-               transaction->finished = KS_TRUE;
+               ret = KS_STATUS_FAIL;
+               goto done;
+       }
 
-               callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED);
-               ks_hash_read_unlock(dht->registry_error);
+       transaction->finished = KS_TRUE;
 
-               if (callback) ret = callback(dht, message);
-               else {
-                       ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error);
-                       ret = KS_STATUS_SUCCESS;
-               }
-       }
+       callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED);
+       ks_hash_read_unlock(dht->registry_error);
+
+       if (callback) ret = callback(dht, message);
+       else ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error);
 
+ done:
        return ret;
 }
 
@@ -1356,25 +1396,27 @@ KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, k
 {
        ks_dht_message_t *message = NULL;
        struct bencode *a = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(raddr);
 
-       if (ks_dht_setup_query(dht,
-                                                  ep,
-                                                  raddr,
-                                                  "ping",
-                                                  ks_dht_process_response_ping,
-                                                  NULL,
-                                                  &message,
-                                                  &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_setup_query(dht,
+                                                                 ep,
+                                                                 raddr,
+                                                                 "ping",
+                                                                 ks_dht_process_response_ping,
+                                                                 NULL,
+                                                                 &message,
+                                                                 &a)) != KS_STATUS_SUCCESS) goto done;
 
        ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
 
        ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
        ks_q_push(dht->send_q, (void *)message);
 
-       return KS_STATUS_SUCCESS;
+ done:
+       return ret;
 }
 
 KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message)
@@ -1385,37 +1427,37 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
        ks_dhtrt_routetable_t *routetable = NULL;
        ks_dht_node_t *node = NULL;
        char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(message);
        ks_assert(message->args);
 
-       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
 
        routetable = message->endpoint->node->table;
 
        ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-       if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
 
        ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
 
-       if (ks_dht_setup_response(dht,
-                                                         message->endpoint,
-                                                         &message->raddr,
-                                                         message->transactionid,
-                                                         message->transactionid_length,
-                                                         &response,
-                                                         &r) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
+       if ((ret = ks_dht_setup_response(dht,
+                                                                        message->endpoint,
+                                                                        &message->raddr,
+                                                                        message->transactionid,
+                                                                        message->transactionid_length,
+                                                                        &response,
+                                                                        &r)) != KS_STATUS_SUCCESS) goto done;
 
        ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
 
        ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
        ks_q_push(dht->send_q, (void *)response);
 
-       return KS_STATUS_SUCCESS;
+ done:
+       return ret;
 }
 
 KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message)
@@ -1424,24 +1466,26 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_messa
        ks_dhtrt_routetable_t *routetable = NULL;
        ks_dht_node_t *node = NULL;
        char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(message);
 
-       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
 
        routetable = message->endpoint->node->table;
 
        ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-       if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
        
        ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
-       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
 
        ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
 
-       return KS_STATUS_SUCCESS;
+ done:
+       return ret;
 }
 
 
@@ -1450,19 +1494,20 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e
        ks_dht_transaction_t *transaction = NULL;
        ks_dht_message_t *message = NULL;
        struct bencode *a = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(raddr);
        ks_assert(targetid);
 
-       if (ks_dht_setup_query(dht,
-                                                  ep,
-                                                  raddr,
-                                                  "find_node",
-                                                  ks_dht_process_response_findnode,
-                                                  &transaction,
-                                                  &message,
-                                                  &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_setup_query(dht,
+                                                                 ep,
+                                                                 raddr,
+                                                                 "find_node",
+                                                                 ks_dht_process_response_findnode,
+                                                                 &transaction,
+                                                                 &message,
+                                                                 &a)) != KS_STATUS_SUCCESS) goto done;
 
        memcpy(transaction->target.id, targetid->id, KS_DHT_NODEID_SIZE);
 
@@ -1473,7 +1518,8 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e
        ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
        ks_q_push(dht->send_q, (void *)message);
 
-       return KS_STATUS_SUCCESS;
+ done:
+       return ret;
 }
 
 KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message)
@@ -1493,14 +1539,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        ks_dht_node_t *node = NULL;
        ks_dhtrt_querynodes_t query;
        char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(message);
        ks_assert(message->args);
 
-       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
 
-       if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
 
        want = ben_dict_get_by_str(message->args, "want");
        if (want) {
@@ -1521,8 +1568,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        routetable = message->endpoint->node->table;
 
        ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-       if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
 
        ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
 
@@ -1537,11 +1584,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
                for (int32_t i = 0; i < query.count; ++i) {
                        ks_dht_node_t *qn = query.nodes[i];
 
-                       if (ks_dht_utility_compact_nodeinfo(&qn->nodeid,
-                                                                                               &qn->addr,
-                                                                                               buffer4,
-                                                                                               &buffer4_length,
-                                                                                               sizeof(buffer4)) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+                       if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid,
+                                                                                                          &qn->addr,
+                                                                                                          buffer4,
+                                                                                                          &buffer4_length,
+                                                                                                          sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
 
                        ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
                }
@@ -1553,23 +1600,23 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
                for (int32_t i = 0; i < query.count; ++i) {
                        ks_dht_node_t *qn = query.nodes[i];
 
-                       if (ks_dht_utility_compact_nodeinfo(&qn->nodeid,
-                                                                                               &qn->addr,
-                                                                                               buffer6,
-                                                                                               &buffer6_length,
-                                                                                               sizeof(buffer6)) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+                       if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid,
+                                                                                                          &qn->addr,
+                                                                                                          buffer6,
+                                                                                                          &buffer6_length,
+                                                                                                          sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
 
                        ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
                }
        }
 
-       if (ks_dht_setup_response(dht,
-                                                         message->endpoint,
-                                                         &message->raddr,
-                                                         message->transactionid,
-                                                         message->transactionid_length,
-                                                         &response,
-                                                         &r) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_setup_response(dht,
+                                                                        message->endpoint,
+                                                                        &message->raddr,
+                                                                        message->transactionid,
+                                                                        message->transactionid_length,
+                                                                        &response,
+                                                                        &r)) != KS_STATUS_SUCCESS) goto done;
 
        ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
        if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
@@ -1578,7 +1625,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
        ks_q_push(dht->send_q, (void *)response);
 
-       return KS_STATUS_SUCCESS;
+ done:
+       return ret;
 }
 
 KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message)
@@ -1595,12 +1643,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
        ks_dht_node_t *node = NULL;
        char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_dht_search_t *search = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(message);
        ks_assert(message->transaction);
 
-       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
 
        n = ben_dict_get_by_str(message->args, "nodes");
        if (n) {
@@ -1616,13 +1665,14 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
        routetable = message->endpoint->node->table;
 
        ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-       if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
 
        ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
-       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
 
-       search = ks_hash_search(dht->search_hash, message->transaction->target.id, KS_READLOCKED);
+       ks_hash_read_lock(dht->search_hash);
+       search = ks_hash_search(dht->search_hash, message->transaction->target.id, KS_UNLOCKED);
        ks_hash_read_unlock(dht->search_hash);
        if (search) {
                ks_dht_search_pending_t *pending = ks_hash_search(search->pending, id->id, KS_READLOCKED);
@@ -1635,7 +1685,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
                ks_sockaddr_t addr;
 
                addr.family = AF_INET;
-               if (ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+               if ((ret = ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done;
 
                ks_log(KS_LOG_DEBUG,
                           "Expanded ipv4 nodeinfo for %s (%s %d)\n",
@@ -1681,12 +1731,12 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
                                search->results[results_index] = nid;
                                search->distances[results_index] = distance;
 
-                               if (ks_dht_search_pending_create(&pending, search->pool, &nid) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-                               if (!ks_hash_insert(search->pending, nid.id, pending)) {
+                               if ((ret = ks_dht_search_pending_create(&pending, search->pool, &nid)) != KS_STATUS_SUCCESS) goto done;
+                               if ((ret = ks_hash_insert(search->pending, nid.id, pending)) != KS_STATUS_SUCCESS) {
                                        ks_dht_search_pending_destroy(&pending);
-                                       return KS_STATUS_FAIL;
+                                       goto done;
                                }
-                               if (ks_dht_send_findnode(dht, NULL, &addr, &search->target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+                               if ((ret = ks_dht_send_findnode(dht, NULL, &addr, &search->target)) != KS_STATUS_SUCCESS) goto done;
                        }
                }
        }
@@ -1696,7 +1746,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
                ks_sockaddr_t addr;
 
                addr.family = AF_INET6;
-               if (ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+               if ((ret = ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done;
 
                ks_log(KS_LOG_DEBUG,
                           "Expanded ipv6 nodeinfo for %s (%s %d)\n",
@@ -1712,7 +1762,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
 
        ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
 
-       return KS_STATUS_SUCCESS;
+ done:
+       return ret;
 }
 
 
@@ -1758,14 +1809,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        ks_dhtrt_routetable_t *routetable = NULL;
        ks_dht_node_t *node = NULL;
        char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(message);
        ks_assert(message->args);
 
-       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
 
-       if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
 
        seq = ben_dict_get_by_str(message->args, "seq");
        if (seq) sequence = ben_int_val(seq);
@@ -1773,8 +1825,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        routetable = message->endpoint->node->table;
 
        ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-       if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
 
        ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
 
@@ -1790,15 +1842,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
 
        // @todo compact ipv4 and ipv6 nodes into separate buffers
 
-       if (ks_dht_setup_response(dht,
-                                                         message->endpoint,
-                                                         &message->raddr,
-                                                         message->transactionid,
-                                                         message->transactionid_length,
-                                                         &response,
-                                                         &r) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
+       if ((ret = ks_dht_setup_response(dht,
+                                                                        message->endpoint,
+                                                                        &message->raddr,
+                                                                        message->transactionid,
+                                                                        message->transactionid_length,
+                                                                        &response,
+                                                                        &r)) != KS_STATUS_SUCCESS) goto done;
 
        ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
        ben_dict_set(r, ben_blob("token", 5), ben_blob(token.token, KS_DHT_TOKEN_SIZE));
@@ -1817,7 +1867,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        ks_log(KS_LOG_DEBUG, "Sending message response get\n");
        ks_q_push(dht->send_q, (void *)response);
 
-       return KS_STATUS_SUCCESS;
+ done:
+       return ret;
 }
 
 KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message)
@@ -1827,14 +1878,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag
        ks_dhtrt_routetable_t *routetable = NULL;
        ks_dht_node_t *node = NULL;
        char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(message);
 
        // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
-       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
 
-       if (ks_dht_utility_extract_token(message->args, "token", &token) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
 
        // @todo add extract function for mutable ks_dht_storageitem_key_t
        // @todo add extract function for mutable ks_dht_storageitem_signature_t
@@ -1842,16 +1894,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag
        routetable = message->endpoint->node->table;
 
        ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-       if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
 
        ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
-       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
        // @todo add/touch bucket entries for other nodes/nodes6 returned
 
        ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
 
-       return KS_STATUS_SUCCESS;
+ done:
+       return ret;
 }
 
 
@@ -1865,37 +1918,37 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
        ks_dhtrt_routetable_t *routetable = NULL;
        ks_dht_node_t *node = NULL;
        char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(message);
        ks_assert(message->args);
 
-       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
 
        routetable = message->endpoint->node->table;
 
        ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-       if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
 
        ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
 
-       if (ks_dht_setup_response(dht,
-                                                         message->endpoint,
-                                                         &message->raddr,
-                                                         message->transactionid,
-                                                         message->transactionid_length,
-                                                         &response,
-                                                         &r) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
+       if ((ret = ks_dht_setup_response(dht,
+                                                                        message->endpoint,
+                                                                        &message->raddr,
+                                                                        message->transactionid,
+                                                                        message->transactionid_length,
+                                                                        &response,
+                                                                        &r)) != KS_STATUS_SUCCESS) goto done;
 
        //ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
 
        ks_log(KS_LOG_DEBUG, "Sending message response put\n");
        ks_q_push(dht->send_q, (void *)response);
 
-       return KS_STATUS_SUCCESS;
+ done:
+       return ret;
 }
 
 KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message)
@@ -1904,24 +1957,26 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_messag
        ks_dhtrt_routetable_t *routetable = NULL;
        ks_dht_node_t *node = NULL;
        char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(message);
 
-       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
 
        routetable = message->endpoint->node->table;
 
        ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-       if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
 
        ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
-       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+       if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
 
        ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
 
-       return KS_STATUS_SUCCESS;
+ done:
+       return ret;
 }
 
 /* For Emacs:
index 0b4007b5c1ba13fd2f84c8814899448a1a71bc4c..59311de80112344f5367ee9cd8d003ddb6bdc08e 100644 (file)
@@ -27,7 +27,7 @@ KS_BEGIN_EXTERN_C
 #define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
 #define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256
 
-#define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30
+#define KS_DHT_TRANSACTION_EXPIRATION 30
 #define KS_DHT_SEARCH_EXPIRATION 10
 #define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE
 
index 8b6140f2b2b49287b58a20a425b2a0a7a413e8e5..83927ddb9bc4ad3f766eddacf03a3b95d2b32e4d 100644 (file)
@@ -17,14 +17,11 @@ KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram,
        ks_assert(endpoint);
        ks_assert(raddr);
        ks_assert(raddr->family == AF_INET || raddr->family == AF_INET6);
-       
+
        *datagram = dg = ks_pool_alloc(pool, sizeof(ks_dht_datagram_t));
-       if (!dg) {
-               ret = KS_STATUS_NO_MEM;
-               goto done;
-       }
-       dg->pool = pool;
+       ks_assert(dg);
 
+       dg->pool = pool;
        dg->dht = dht;
        dg->endpoint = endpoint;
        dg->raddr = *raddr;
@@ -32,7 +29,7 @@ KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram,
        memcpy(dg->buffer, dht->recv_buffer, dht->recv_buffer_length);
        dg->buffer_length = dht->recv_buffer_length;
 
- done:
      // done:
        if (ret != KS_STATUS_SUCCESS) {
                if (dg) ks_dht_datagram_destroy(&dg);
                *datagram = NULL;
index 61184bfcbf7d5643b17013c8009bb90a0d910152..ad9a44ec5eb80a63909298e33ec445bdc4b06425 100644 (file)
@@ -18,19 +18,17 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint,
        ks_assert(pool);
        ks_assert(addr);
        ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
-       
+
        *endpoint = ep = ks_pool_alloc(pool, sizeof(ks_dht_endpoint_t));
-       if (!ep) {
-               ret = KS_STATUS_NO_MEM;
-               goto done;
-       }
+       ks_assert(ep);
+
        ep->pool = pool;
     if (!nodeid) randombytes_buf(ep->nodeid.id, KS_DHT_NODEID_SIZE);
        else memcpy(ep->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE);
        ep->addr = *addr;
        ep->sock = sock;
 
- done:
      // done:
        if (ret != KS_STATUS_SUCCESS) {
                if (ep) ks_dht_endpoint_destroy(&ep);
                *endpoint = NULL;
@@ -50,9 +48,6 @@ KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint)
 
        ep = *endpoint;
 
-       if (ep->node) {
-               // @todo release the node?
-       }
        if (ep->sock != KS_SOCK_INVALID) ks_socket_close(&ep->sock);
        ks_pool_free(ep->pool, ep);
 
index 63ea519fc0efe2519d407ed9bd9f05cf997dbe83..1b2284decbc26641b5389934835ee06d44bb3867 100644 (file)
@@ -1,9 +1,6 @@
 #include "ks_dht.h"
 #include "ks_dht-int.h"
 
-/**
- *
- */
 KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
                                                                                          ks_pool_t *pool,
                                                                                          ks_dht_endpoint_t *endpoint,
@@ -17,17 +14,17 @@ KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
        ks_assert(pool);
 
        *message = m = ks_pool_alloc(pool, sizeof(ks_dht_message_t));
-       if (!m) {
-               ret = KS_STATUS_NO_MEM;
-               goto done;
-       }
-       m->pool = pool;
+       ks_assert(m);
 
+       m->pool = pool;
        m->endpoint = endpoint;
        m->raddr = *raddr;
-       if (alloc_data) m->data = ben_dict();
+       if (alloc_data) {
+               m->data = ben_dict();
+               ks_assert(m->data);
+       }
 
- done:
      // done:
        if (ret != KS_STATUS_SUCCESS) {
                if (m) ks_dht_message_destroy(&m);
                *message = NULL;
@@ -35,9 +32,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
        return ret;
 }
 
-/**
- *
- */
 KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message)
 {
        ks_dht_message_t *m;
@@ -57,9 +51,6 @@ KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message)
 }
 
 
-/**
- *
- */
 KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length)
 {
        struct bencode *t;
@@ -121,9 +112,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
        return KS_STATUS_SUCCESS;
 }
 
-/**
- *
- */
 KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
                                                                                         uint32_t transactionid,
                                                                                         const char *query,
@@ -143,6 +131,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
 
        // @note r joins message->data and will be freed with it
        a = ben_dict();
+       ks_assert(a);
        ben_dict_set(message->data, ben_blob("a", 1), a);
 
        if (args) *args = a;
@@ -150,9 +139,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
        return KS_STATUS_SUCCESS;
 }
 
-/**
- *
- */
 KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
                                                                                                uint8_t *transactionid,
                                                                                                ks_size_t transactionid_length,
@@ -168,6 +154,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
 
        // @note r joins message->data and will be freed with it
        r = ben_dict();
+       ks_assert(r);
        ben_dict_set(message->data, ben_blob("r", 1), r);
 
        if (args) *args = r;
@@ -193,6 +180,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message,
 
        // @note r joins message->data and will be freed with it
        e = ben_list();
+       ks_assert(e);
        ben_dict_set(message->data, ben_blob("e", 1), e);
 
        if (args) *args = e;
index ce314e6155f62cbfebce5a78b2050806ed0691b5..1974a3d0cf53ce74bafdf460b94f4dc18c3f74a7 100644 (file)
@@ -2,9 +2,6 @@
 #include "ks_dht-int.h"
 #include "sodium.h"
 
-/**
- *
- */
 KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target)
 {
        ks_dht_search_t *s;
@@ -15,32 +12,27 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t
        ks_assert(target);
 
        *search = s = ks_pool_alloc(pool, sizeof(ks_dht_search_t));
-       if (!s) {
-               ret = KS_STATUS_NO_MEM;
-               goto done;
-       }
+       ks_assert(s);
+
        s->pool = pool;
 
-       if ((ret = ks_mutex_create(&s->mutex, KS_MUTEX_FLAG_DEFAULT, s->pool)) != KS_STATUS_SUCCESS) goto done;
+       ks_mutex_create(&s->mutex, KS_MUTEX_FLAG_DEFAULT, s->pool);
+       ks_assert(s->mutex);
+
        memcpy(s->target.id, target->id, KS_DHT_NODEID_SIZE);
 
-       if ((ret = ks_hash_create(&s->pending,
-                                                         KS_HASH_MODE_ARBITRARY,
-                                                         KS_HASH_FLAG_RWLOCK,
-                                                         s->pool)) != KS_STATUS_SUCCESS) goto done;
+       ks_hash_create(&s->pending, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK, s->pool);
+       ks_assert(s->pending);
        ks_hash_set_keysize(s->pending, KS_DHT_NODEID_SIZE);
 
- done:
      // done:
        if (ret != KS_STATUS_SUCCESS) {
                if (s) ks_dht_search_destroy(&s);
                *search = NULL;
        }
-       return KS_STATUS_SUCCESS;
+       return ret;
 }
 
-/**
- *
- */
 KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search)
 {
        ks_dht_search_t *s;
@@ -83,7 +75,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_d
                search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool,
                                                                                                                                           (void *)search->callbacks,
                                                                                                                                           sizeof(ks_dht_search_callback_t) * search->callbacks_size);
-               if (!search->callbacks) return KS_STATUS_NO_MEM;
+               ks_assert(search->callbacks);
                search->callbacks[index] = callback;
                ks_mutex_unlock(search->mutex);
        }
@@ -97,19 +89,16 @@ KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **p
 
        ks_assert(pending);
        ks_assert(pool);
-       
+
        *pending = p = ks_pool_alloc(pool, sizeof(ks_dht_search_pending_t));
-       if (!p) {
-               ret = KS_STATUS_NO_MEM;
-               goto done;
-       }
-       p->pool = pool;
+       ks_assert(p);
 
+       p->pool = pool;
        p->nodeid = *nodeid;
-       p->expiration = ks_time_now_sec() + KS_DHT_SEARCH_EXPIRATION;
+       p->expiration = ks_time_now() + (KS_DHT_SEARCH_EXPIRATION * 1000);
        p->finished = KS_FALSE;
 
- done:
      // done:
        if (ret != KS_STATUS_SUCCESS) {
                if (p) ks_dht_search_pending_destroy(&p);
                *pending = NULL;
index 58c3018a6a2e2bf3fa7b499f4a03004838e305b4..d395e49540df439c53f911fd9b909c6fc1ef9552 100644 (file)
@@ -2,9 +2,6 @@
 #include "ks_dht-int.h"
 #include "sodium.h"
 
-/**
- *
- */
 KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v)
 {
        ks_dht_storageitem_t *si;
@@ -19,27 +16,21 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t
        ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
 
        *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t));
-       if (!si) {
-               ret = KS_STATUS_NO_MEM;
-               goto done;
-       }
-       si->pool = pool;
+       ks_assert(si);
 
+       si->pool = pool;
        si->mutable = KS_FALSE;
-       
        si->v = ben_clone(v);
-       if (!si->v) {
-               ret = KS_STATUS_NO_MEM;
-               goto done;
-       }
+       ks_assert(si->v);
        
        enc = ben_encode(&enc_len, si->v);
+       ks_assert(enc);
        SHA1_Init(&sha);
        SHA1_Update(&sha, enc, enc_len);
        SHA1_Final(si->id.id, &sha);
        free(enc);
 
- done:
      // done:
        if (ret != KS_STATUS_SUCCESS) {
                if (si) ks_dht_storageitem_destroy(&si);
                *item = NULL;
@@ -70,15 +61,12 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t *
        ks_assert(signature);
 
        *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t));
-       if (!si) {
-               ret = KS_STATUS_NO_MEM;
-               goto done;
-       }
-       si->pool = pool;
-
-       si->v = ben_clone(v);
+       ks_assert(si);
 
+       si->pool = pool;
        si->mutable = KS_TRUE;
+       si->v = ben_clone(v);
+       ks_assert(si->v);
 
        memcpy(si->pk.key, k->key, KS_DHT_STORAGEITEM_KEY_SIZE);
        if (salt && salt_length > 0) {
@@ -93,7 +81,7 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t *
        if (si->salt && si->salt_length > 0) SHA1_Update(&sha, si->salt, si->salt_length);
        SHA1_Final(si->id.id, &sha);
 
- done:
      // done:
        if (ret != KS_STATUS_SUCCESS) {
                if (si) ks_dht_storageitem_destroy(&si);
                *item = NULL;
index 18978bf68997084106ff4b7cd726b29ae8c7a977..0912fa7589cfea21f7535dab0b0745d36b2ded73 100644 (file)
@@ -15,18 +15,15 @@ KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transac
        ks_assert(raddr);
 
        *transaction = t = ks_pool_alloc(pool, sizeof(ks_dht_transaction_t));
-       if (!t) {
-               ret = KS_STATUS_NO_MEM;
-               goto done;
-       }
-       t->pool = pool;
+       ks_assert(t);
 
+       t->pool = pool;
        t->raddr = *raddr;
        t->transactionid = transactionid;
        t->callback = callback;
-       t->expiration = ks_time_now_sec() + KS_DHT_TRANSACTION_EXPIRATION_DELAY;
+       t->expiration = ks_time_now() + (KS_DHT_TRANSACTION_EXPIRATION * 1000);
 
- done:
      // done:
        if (ret != KS_STATUS_SUCCESS) {
                if (t) ks_dht_transaction_destroy(&t);
                *transaction = NULL;