]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Incorporated route table to test find_node before adding deep searching...
authorShane Bryldt <astaelan@gmail.com>
Mon, 12 Dec 2016 01:02:43 +0000 (01:02 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:35 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_endpoint.c
libs/libks/src/dht/ks_dht_message.c
libs/libks/src/dht/ks_dht_storageitem.c
libs/libks/test/testdht2.c

index 9c5b0ea691f48991e5932ebe4a666a1589e148cd..57a1fff88204edf9d47ca21caba4a1ba5829616b 100644 (file)
@@ -8,22 +8,30 @@ KS_BEGIN_EXTERN_C
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht_utility_compact_address(ks_sockaddr_t *address,
-                                                                                                          uint8_t *buffer,
+KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *address,
+                                                                                                                  uint8_t *buffer,
+                                                                                                                  ks_size_t *buffer_length,
+                                                                                                                  ks_size_t buffer_size);
+KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer,
+                                                                                                                 ks_size_t *buffer_length,
+                                                                                                                 ks_size_t buffer_size,
+                                                                                                                 ks_sockaddr_t *address);
+KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid,
+                                                                                                               const ks_sockaddr_t *address,
+                                                                                                               uint8_t *buffer,
+                                                                                                               ks_size_t *buffer_length,
+                                                                                                               ks_size_t buffer_size);
+KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
                                                                                                           ks_size_t *buffer_length,
-                                                                                                          ks_size_t buffer_size);
-KS_DECLARE(ks_status_t) ks_dht_utility_compact_node(ks_dht_nodeid_t *nodeid,
-                                                                                                       ks_sockaddr_t *address,
-                                                                                                       uint8_t *buffer,
-                                                                                                       ks_size_t *buffer_length,
-                                                                                                       ks_size_t buffer_size);
+                                                                                                          ks_size_t buffer_size,
+                                                                                                          ks_dht_nodeid_t *nodeid,
+                                                                                                          ks_sockaddr_t *address);
 
 /**
  *
  */
-KS_DECLARE(void) ks_dht_idle(ks_dht_t *dht);
-KS_DECLARE(void) ks_dht_idle_expirations(ks_dht_t *dht);
-KS_DECLARE(void) ks_dht_idle_send(ks_dht_t *dht);
+KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht);
+KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht);
 
 KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message);
 KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
@@ -44,14 +52,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
 KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *message);
 
 KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message);
-KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message);
-KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message);
-KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message);
-
 KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message);
+
+KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message);
 KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message);
+
+KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message);
 KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message);
 
+KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message);
+KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message);
+
 /**
  *
  */
index b4b25a0edd03258a87bbf7b1da4916af540844c0..f1372c65caf09b1b3a2f584e7206f8f9254de273 100644 (file)
@@ -73,7 +73,7 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht)
 
        dht->autoroute = KS_FALSE;
        dht->autoroute_port = 0;
-       
+
        ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
        ks_dht_register_type(dht, "q", ks_dht_process_query);
        ks_dht_register_type(dht, "r", ks_dht_process_response);
@@ -87,15 +87,17 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht)
 
        ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
        // @todo register 301 error for internal get/put CAS hash mismatch retry handler
-       
+
     dht->bind_ipv4 = KS_FALSE;
        dht->bind_ipv6 = KS_FALSE;
-       
+
        dht->endpoints = NULL;
        dht->endpoints_size = 0;
        ks_hash_create(&dht->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, dht->pool);
        dht->endpoints_poll = NULL;
 
+       dht->pulse_expirations = ks_time_now_sec() + KS_DHT_PULSE_EXPIRATIONS;
+
        ks_q_create(&dht->send_q, dht->pool, 0);
        dht->send_q_unsent = NULL;
        dht->recv_buffer_length = 0;
@@ -111,7 +113,7 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht)
 
        ks_hash_create(&dht->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
        ks_hash_set_keysize(dht->storage_hash, KS_DHT_NODEID_SIZE);
-       
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -120,30 +122,32 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht)
  */
 KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht)
 {
+       ks_hash_iterator_t *it;
        ks_assert(dht);
 
-       // @todo free storage_hash entries
        if (dht->storage_hash) {
+               for (it = ks_hash_first(dht->storage_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+                       const void *key;
+                       ks_dht_storageitem_t *val;
+                       ks_hash_this(it, &key, NULL, (void **)&val);
+                       ks_dht_storageitem_deinit(val);
+                       ks_dht_storageitem_free(&val);
+               }
                ks_hash_destroy(&dht->storage_hash);
-               dht->storage_hash = NULL;
        }
+
        dht->token_secret_current = 0;
        dht->token_secret_previous = 0;
        dht->token_secret_expiration = 0;
-       if (dht->rt_ipv4) {
-               ks_dhtrt_deinitroute(&dht->rt_ipv4);
-               dht->rt_ipv4 = NULL;
-       }
-       if (dht->rt_ipv6) {
-               ks_dhtrt_deinitroute(&dht->rt_ipv6);
-               dht->rt_ipv6 = NULL;
-       }
+
+       if (dht->rt_ipv4) ks_dhtrt_deinitroute(&dht->rt_ipv4);
+       if (dht->rt_ipv6) ks_dhtrt_deinitroute(&dht->rt_ipv6);
+
        dht->transactionid_next = 0;
-       if (dht->transactions_hash) {
-               ks_hash_destroy(&dht->transactions_hash);
-               dht->transactions_hash = NULL;
-       }
+       if (dht->transactions_hash) ks_hash_destroy(&dht->transactions_hash);
+
        dht->recv_buffer_length = 0;
+
        if (dht->send_q) {
                ks_dht_message_t *msg;
                while (ks_q_pop_timeout(dht->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) {
@@ -151,12 +155,14 @@ KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht)
                        ks_dht_message_free(&msg);
                }
                ks_q_destroy(&dht->send_q);
-               dht->send_q = NULL;
        }
        if (dht->send_q_unsent) {
                ks_dht_message_deinit(dht->send_q_unsent);
                ks_dht_message_free(&dht->send_q_unsent);
        }
+
+       dht->pulse_expirations = 0;
+
        for (int32_t i = 0; i < dht->endpoints_size; ++i) {
                ks_dht_endpoint_t *ep = dht->endpoints[i];
                ks_dht_endpoint_deinit(ep);
@@ -167,33 +173,23 @@ KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht)
                ks_pool_free(dht->pool, dht->endpoints);
                dht->endpoints = NULL;
        }
+
        if (dht->endpoints_poll) {
                ks_pool_free(dht->pool, dht->endpoints_poll);
                dht->endpoints_poll = NULL;
        }
-       if (dht->endpoints_hash) {
-               ks_hash_destroy(&dht->endpoints_hash);
-               dht->endpoints_hash = NULL;
-       }
+       if (dht->endpoints_hash) ks_hash_destroy(&dht->endpoints_hash);
+
        dht->bind_ipv4 = KS_FALSE;
        dht->bind_ipv6 = KS_FALSE;
 
-       if (dht->registry_type) {
-               ks_hash_destroy(&dht->registry_type);
-               dht->registry_type = NULL;
-       }
-       if (dht->registry_query) {
-               ks_hash_destroy(&dht->registry_query);
-               dht->registry_query = NULL;
-       }
-       if (dht->registry_error) {
-               ks_hash_destroy(&dht->registry_error);
-               dht->registry_error = NULL;
-       }
+       if (dht->registry_type) ks_hash_destroy(&dht->registry_type);
+       if (dht->registry_query) ks_hash_destroy(&dht->registry_query);
+       if (dht->registry_error) ks_hash_destroy(&dht->registry_error);
 
        dht->autoroute = KS_FALSE;
        dht->autoroute_port = 0;
-       
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -204,15 +200,12 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_
 {
        ks_assert(dht);
 
-       if (!autoroute) {
-               port = 0;
-       } else if (port <= 0) {
-               port = KS_DHT_DEFAULT_PORT;
-       }
-       
+       if (!autoroute) port = 0;
+    else if (port <= 0) port = KS_DHT_DEFAULT_PORT;
+
        dht->autoroute = autoroute;
        dht->autoroute_port = port;
-       
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -230,23 +223,23 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *rad
        ks_assert(endpoint);
 
        *endpoint = NULL;
-       
+
     ks_ip_route(ip, sizeof(ip), raddr->host);
 
-       // @todo readlock hash
-       if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) {
+       ep = ks_hash_search(dht->endpoints_hash, ip, KS_READLOCKED);
+       ks_hash_read_unlock(dht->endpoints_hash);
+
+       if (!ep && dht->autoroute) {
                ks_sockaddr_t addr;
                ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family);
-               if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) {
-                       return KS_STATUS_FAIL;
-               }
+               if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
        }
 
        if (!ep) {
                ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host);
                return KS_STATUS_FAIL;
        }
-       
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -258,7 +251,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k
        ks_assert(dht);
        ks_assert(value);
        ks_assert(callback);
-       // @todo writelock registry
+
        return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
 }
 
@@ -270,7 +263,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value,
        ks_assert(dht);
        ks_assert(value);
        ks_assert(callback);
-       // @todo writelock registry
+
        return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
 }
 
@@ -282,7 +275,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value,
        ks_assert(dht);
        ks_assert(value);
        ks_assert(callback);
-       // @todo writelock registry
+
        return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
 }
 
@@ -294,34 +287,30 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
        ks_dht_endpoint_t *ep;
        ks_socket_t sock;
        int32_t epindex;
-       
+
        ks_assert(dht);
        ks_assert(addr);
        ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
        ks_assert(addr->port);
 
-       if (endpoint) {
-               *endpoint = NULL;
-       }
+       if (endpoint) *endpoint = NULL;
 
        dht->bind_ipv4 |= addr->family == AF_INET;
        dht->bind_ipv6 |= addr->family == AF_INET6;
 
-       if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) {
-               return KS_STATUS_FAIL;
-       }
+       if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) return KS_STATUS_FAIL;
 
        // @todo shouldn't ks_addr_bind take a const addr *?
        if (ks_addr_bind(sock, (ks_sockaddr_t *)addr) != KS_STATUS_SUCCESS) {
                ks_socket_close(&sock);
                return KS_STATUS_FAIL;
        }
-       
+
        if (ks_dht_endpoint_alloc(&ep, dht->pool) != KS_STATUS_SUCCESS) {
                ks_socket_close(&sock);
                return KS_STATUS_FAIL;
        }
-       
+
        if (ks_dht_endpoint_init(ep, nodeid, addr, sock) != KS_STATUS_SUCCESS) {
                ks_dht_endpoint_free(&ep);
                ks_socket_close(&sock);
@@ -331,35 +320,30 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
        ks_socket_option(ep->sock, SO_REUSEADDR, KS_TRUE);
        ks_socket_option(ep->sock, KS_SO_NONBLOCK, KS_TRUE);
 
-       
+
        epindex = dht->endpoints_size++;
        dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
                                                                                                                   (void *)dht->endpoints,
                                                                                                                   sizeof(ks_dht_endpoint_t *) * dht->endpoints_size);
        dht->endpoints[epindex] = ep;
        ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep);
-       
+
        dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool,
                                                                                                                  (void *)dht->endpoints_poll,
                                                                                                                  sizeof(struct pollfd) * dht->endpoints_size);
        dht->endpoints_poll[epindex].fd = ep->sock;
        dht->endpoints_poll[epindex].events = POLLIN | POLLERR;
 
-       // @todo initialize or add local nodeid to appropriate route table
        if (ep->addr.family == AF_INET) {
-               if (!dht->rt_ipv4) {
-                       ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool);
-               }
+               if (!dht->rt_ipv4) ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool);
+               ks_dhtrt_create_node(dht->rt_ipv4, ep->nodeid, ks_dht_local_t, ep->addr.host, ep->addr.port, &ep->node);
        } else {
-               if (!dht->rt_ipv6) {
-                       ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool);
-               }
-       }
-       
-       if (endpoint) {
-               *endpoint = ep;
+               if (!dht->rt_ipv6) ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool);
+               ks_dhtrt_create_node(dht->rt_ipv6, ep->nodeid, ks_dht_local_t, ep->addr.host, ep->addr.port, &ep->node);
        }
 
+       if (endpoint) *endpoint = ep;
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -369,7 +353,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
 KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
 {
        int32_t result;
-               
+
        ks_assert(dht);
        ks_assert (timeout >= 0);
 
@@ -378,14 +362,14 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
        if (timeout == 0) {
                // @todo deal with default timeout, should return quickly but not hog the CPU polling
        }
-       
+
        result = ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout);
        if (result > 0) {
                for (int32_t i = 0; i < dht->endpoints_size; ++i) {
                        if (dht->endpoints_poll[i].revents & POLLIN) {
                                ks_sockaddr_t raddr = KS_SA_INIT;
                                dht->recv_buffer_length = KS_DHT_RECV_BUFFER_SIZE;
-                       
+
                                raddr.family = dht->endpoints[i]->addr.family;
                                if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) {
                                        // @todo copy data to a ks_dht_frame then create job to call ks_dht_process from threadpool
@@ -395,69 +379,180 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
                }
        }
 
-       ks_dht_idle(dht);
+       ks_dht_pulse_expirations(dht);
+
+       ks_dht_pulse_send(dht);
+
+       if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
+       if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
 }
 
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht_utility_compact_address(ks_sockaddr_t *address,
-                                                                                                          uint8_t *buffer,
-                                                                                                          ks_size_t *buffer_length,
-                                                                                                          ks_size_t buffer_size)
+KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
+{
+       ks_hash_iterator_t *it = NULL;
+       ks_time_t now = ks_time_now_sec();
+
+       ks_assert(dht);
+
+       if (dht->pulse_expirations <= now) {
+               dht->pulse_expirations = now + KS_DHT_PULSE_EXPIRATIONS;
+       }
+
+       ks_hash_write_lock(dht->transactions_hash);
+       for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+               const void *key = NULL;
+               ks_dht_transaction_t *value = NULL;
+               ks_bool_t remove = KS_FALSE;
+
+               ks_hash_this(it, &key, NULL, (void **)&value);
+               if (value->finished) remove = KS_TRUE;
+               else if (value->expiration <= now) {
+                       ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
+                       remove = KS_TRUE;
+               }
+               if (remove) {
+                       ks_hash_remove(dht->transactions_hash, (char *)key);
+                       ks_pool_free(value->pool, value);
+               }
+       }
+       ks_hash_write_unlock(dht->transactions_hash);
+
+       if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
+               dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION;
+               dht->token_secret_previous = dht->token_secret_current;
+               dht->token_secret_current = rand();
+       }
+}
+
+/**
+ *
+ */
+KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
+{
+       ks_dht_message_t *message;
+       ks_bool_t bail = KS_FALSE;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(dht);
+
+       while (!bail) {
+               message = NULL;
+               if (dht->send_q_unsent) {
+                       message = dht->send_q_unsent;
+                       dht->send_q_unsent = NULL;
+               }
+               if (!message) bail = ks_q_pop_timeout(dht->send_q, (void **)&message, 1) != KS_STATUS_SUCCESS || !message;
+               if (!bail) {
+                       bail = (ret = ks_dht_send(dht, message)) != KS_STATUS_SUCCESS;
+                       if (ret == KS_STATUS_BREAK) dht->send_q_unsent = message;
+                       else if (ret == KS_STATUS_SUCCESS) {
+                               ks_dht_message_deinit(message);
+                               ks_dht_message_free(&message);
+                       }
+               }
+       }
+}
+
+/**
+ *
+ */
+static char *ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer)
+{
+       char *t = buffer;
+
+       ks_assert(id);
+       ks_assert(buffer);
+
+       memset(buffer, 0, KS_DHT_NODEID_SIZE * 2 + 1);
+
+       for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i, t += 2) sprintf(t, "%02X", id->id[i]);
+
+       return buffer;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *address,
+                                                                                                                  uint8_t *buffer,
+                                                                                                                  ks_size_t *buffer_length,
+                                                                                                                  ks_size_t buffer_size)
 {
-       ks_size_t required = sizeof(uint16_t);
+       ks_size_t addr_len;
+       const void *paddr = NULL;
        uint16_t port = 0;
-       
+
        ks_assert(address);
        ks_assert(buffer);
        ks_assert(buffer_length);
        ks_assert(buffer_size);
        ks_assert(address->family == AF_INET || address->family == AF_INET6);
 
-       if (address->family == AF_INET) {
-               required += sizeof(uint32_t);
-       } else {
-               required += 8 * sizeof(uint16_t);
-       }
-
-       if (*buffer_length + required > buffer_size) {
+       addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
+       
+       if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) {
                ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n");
                return KS_STATUS_FAIL;
        }
 
        if (address->family == AF_INET) {
-               uint32_t *paddr = (uint32_t *)&address->v.v4.sin_addr;
-               uint32_t addr = htonl(*paddr);
-               port = htons(address->v.v4.sin_port);
-
-               memcpy(buffer + (*buffer_length), (void *)&addr, sizeof(uint32_t));
-               *buffer_length += sizeof(uint32_t);
+               paddr = &address->v.v4.sin_addr; // already network byte order
+               port = address->v.v4.sin_port; // already network byte order
        } else {
-               uint16_t *paddr = (uint16_t *)&address->v.v6.sin6_addr;
-               port = htons(address->v.v6.sin6_port);
-
-               for (int32_t i = 0; i < 8; ++i) {
-                       uint16_t addr = htons(paddr[i]);
-                       memcpy(buffer + (*buffer_length), (void *)&addr, sizeof(uint16_t));
-                       *buffer_length += sizeof(uint16_t);
-               }
+               paddr = &address->v.v6.sin6_addr; // already network byte order
+               port = address->v.v6.sin6_port; // already network byte order
        }
+       memcpy(buffer + (*buffer_length), paddr, sizeof(uint32_t));
+       *buffer_length += addr_len;
 
-       memcpy(buffer + (*buffer_length), (void *)&port, sizeof(uint16_t));
+       memcpy(buffer + (*buffer_length), (const void *)&port, sizeof(uint16_t));
        *buffer_length += sizeof(uint16_t);
-       
+
+       return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer,
+                                                                                                                 ks_size_t *buffer_length,
+                                                                                                                 ks_size_t buffer_size,
+                                                                                                                 ks_sockaddr_t *address)
+{
+       ks_size_t addr_len;
+       const void *paddr = NULL;
+       uint16_t port = 0;
+
+       ks_assert(buffer);
+       ks_assert(buffer_length);
+       ks_assert(address);
+       ks_assert(address->family == AF_INET ||address->family == AF_INET6);
+
+       addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
+       if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_FAIL;
+
+       paddr = buffer + *buffer_length;
+       *buffer_length += addr_len;
+       port = *((uint16_t *)(buffer + *buffer_length));
+       *buffer_length += sizeof(uint16_t);
+
+       // @todo ks_addr_set_raw second parameter should be const?
+       ks_addr_set_raw(address, (void *)paddr, port, address->family);
+
        return KS_STATUS_SUCCESS;
 }
 
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht_utility_compact_node(ks_dht_nodeid_t *nodeid,
-                                                                                                       ks_sockaddr_t *address,
-                                                                                                       uint8_t *buffer,
-                                                                                                       ks_size_t *buffer_length,
-                                                                                                       ks_size_t buffer_size)
+KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid,
+                                                                                                               const ks_sockaddr_t *address,
+                                                                                                               uint8_t *buffer,
+                                                                                                               ks_size_t *buffer_length,
+                                                                                                               ks_size_t buffer_size)
 {
        ks_assert(address);
        ks_assert(buffer);
@@ -473,7 +568,30 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_node(ks_dht_nodeid_t *nodeid,
        memcpy(buffer + (*buffer_length), (void *)nodeid, KS_DHT_NODEID_SIZE);
        *buffer_length += KS_DHT_NODEID_SIZE;
 
-       return ks_dht_utility_compact_address(address, buffer, buffer_length, buffer_size);
+       return ks_dht_utility_compact_addressinfo(address, buffer, buffer_length, buffer_size);
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
+                                                                                                          ks_size_t *buffer_length,
+                                                                                                          ks_size_t buffer_size,
+                                                                                                          ks_dht_nodeid_t *nodeid,
+                                                                                                          ks_sockaddr_t *address)
+{
+       ks_assert(buffer);
+       ks_assert(buffer_length);
+       ks_assert(nodeid);
+       ks_assert(address);
+       ks_assert(address->family == AF_INET ||address->family == AF_INET6);
+
+       if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_FAIL;
+
+       memcpy(nodeid->id, buffer, KS_DHT_NODEID_SIZE);
+       *buffer_length += KS_DHT_NODEID_SIZE;
+
+       return ks_dht_utility_expand_addressinfo(buffer, buffer_length, buffer_size, address);
 }
 
 /**
@@ -490,13 +608,13 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_nodeid(struct bencode *args, cons
        ks_assert(nodeid);
 
        *nodeid = NULL;
-       
+
     id = ben_dict_get_by_str(args, key);
     if (!id) {
                ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
                return KS_STATUS_FAIL;
        }
-       
+
     idv = ben_str_val(id);
        idv_len = ben_str_len(id);
     if (idv_len != KS_DHT_NODEID_SIZE) {
@@ -523,7 +641,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const
        ks_assert(token);
 
        *token = NULL;
-       
+
     tok = ben_dict_get_by_str(args, key);
     if (!tok) {
                ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
@@ -558,7 +676,7 @@ KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *ra
 
        secret = htonl(secret);
        port = htons(raddr->port);
-       
+
        SHA1_Init(&sha);
        SHA1_Update(&sha, &secret, sizeof(uint32_t));
        SHA1_Update(&sha, raddr->host, strlen(raddr->host));
@@ -578,9 +696,7 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, k
 
        ks_dht_token_generate(dht->token_secret_current, raddr, target, &tok);
 
-       if (!memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE)) {
-               return KS_TRUE;
-       }
+       if (!memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE)) return KS_TRUE;
 
        ks_dht_token_generate(dht->token_secret_previous, raddr, target, &tok);
 
@@ -590,212 +706,65 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, k
 /**
  *
  */
-KS_DECLARE(void) ks_dht_idle(ks_dht_t *dht)
+KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
 {
-       ks_assert(dht);
-
-       ks_dht_idle_expirations(dht);
-
-       ks_dht_idle_send(dht);
-}
+       // @todo calculate max IPV6 payload size?
+       char buf[1000];
+       ks_size_t buf_len;
 
-/**
- *
- */
-KS_DECLARE(void) ks_dht_idle_expirations(ks_dht_t *dht)
-{
-       ks_hash_iterator_t *it = NULL;
-       ks_time_t now = ks_time_now_sec();
-       
        ks_assert(dht);
+       ks_assert(message);
+       ks_assert(message->endpoint);
+       ks_assert(message->data);
 
-       // @todo add delay between checking expirations, every 10 seconds?
+       // @todo blacklist check
 
-       ks_hash_write_lock(dht->transactions_hash);
-       for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-               const void *key = NULL;
-               ks_dht_transaction_t *value = NULL;
-               ks_bool_t remove = KS_FALSE;
+       buf_len = ben_encode2(buf, sizeof(buf), message->data);
 
-               ks_hash_this(it, &key, NULL, (void **)&value);
-               if (value->finished) {
-                       remove = KS_TRUE;
-               } else if (value->expiration <= now) {
-                       ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
-                       remove = KS_TRUE;
-               }
-               if (remove) {
-                       ks_hash_remove(dht->transactions_hash, (char *)key);
-                       ks_pool_free(value->pool, value);
-               }
-       }
-       ks_hash_write_unlock(dht->transactions_hash);
+       ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
+       ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
 
-       if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
-               dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION;
-               dht->token_secret_previous = dht->token_secret_current;
-               dht->token_secret_current = rand();
-       }
+       return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr);
 }
 
 /**
  *
  */
-KS_DECLARE(void) ks_dht_idle_send(ks_dht_t *dht)
+KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
+                                                                                  ks_dht_endpoint_t *ep,
+                                                                                  ks_sockaddr_t *raddr,
+                                                                                  const char *query,
+                                                                                  ks_dht_message_callback_t callback,
+                                                                                  ks_dht_message_t **message,
+                                                                                  struct bencode **args)
 {
-       ks_dht_message_t *message;
-       ks_bool_t bail = KS_FALSE;
-       ks_status_t ret = KS_STATUS_SUCCESS;
+       uint32_t transactionid;
+       ks_dht_transaction_t *trans = NULL;
+       ks_dht_message_t *msg = NULL;
+       ks_status_t ret = KS_STATUS_FAIL;
 
        ks_assert(dht);
-
-       while (!bail) {
-               message = NULL;
-               if (dht->send_q_unsent) {
-                       message = dht->send_q_unsent;
-                       dht->send_q_unsent = NULL;
-               }
-               if (!message) {
-                       bail = ks_q_pop_timeout(dht->send_q, (void **)&message, 1) != KS_STATUS_SUCCESS || !message;
-               }
-               if (!bail) {
-                       bail = (ret = ks_dht_send(dht, message)) != KS_STATUS_SUCCESS;
-                       if (ret == KS_STATUS_BREAK) {
-                               dht->send_q_unsent = message;
-                       } else if (ret == KS_STATUS_SUCCESS) {
-                               ks_dht_message_deinit(message);
-                               ks_dht_message_free(&message);
-                       }
-               }
-       }
-}
-
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
-{
-       // @todo calculate max IPV6 payload size?
-       char buf[1000];
-       ks_size_t buf_len;
-       
-       ks_assert(dht);
-       ks_assert(message);
-       ks_assert(message->endpoint);
-       ks_assert(message->data);
-
-       // @todo blacklist check
-
-       buf_len = ben_encode2(buf, sizeof(buf), message->data);
-
-       ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
-       ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
-
-       return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr);
-}
-
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
-                                                                                 ks_dht_endpoint_t *ep,
-                                                                                 ks_sockaddr_t *raddr,
-                                                                                 uint8_t *transactionid,
-                                                                                 ks_size_t transactionid_length,
-                                                                                 long long errorcode,
-                                                                                 const char *errorstr)
-{
-       ks_dht_message_t *error = NULL;
-       struct bencode *e = NULL;
-       ks_status_t ret = KS_STATUS_FAIL;
-
-       ks_assert(dht);
-       ks_assert(raddr);
-       ks_assert(transactionid);
-       ks_assert(errorstr);
-
-       if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
-
-       if (ks_dht_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
-
-       if (ks_dht_message_init(error, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
-               goto done;
-       }
-
-       if (ks_dht_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) {
-               goto done;
-       }
-
-       ben_list_append(e, ben_int(errorcode));
-       ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
-
-       ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
-       ks_q_push(dht->send_q, (void *)error);
-
-       ret = KS_STATUS_SUCCESS;
-
- done:
-       if (ret != KS_STATUS_SUCCESS && error) {
-               ks_dht_message_deinit(error);
-               ks_dht_message_free(&error);
-       }
-       return ret;
-}
-
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
-                                                                                  ks_dht_endpoint_t *ep,
-                                                                                  ks_sockaddr_t *raddr,
-                                                                                  const char *query,
-                                                                                  ks_dht_message_callback_t callback,
-                                                                                  ks_dht_message_t **message,
-                                                                                  struct bencode **args)
-{
-       uint32_t transactionid;
-       ks_dht_transaction_t *trans = NULL;
-       ks_dht_message_t *msg = NULL;
-       ks_status_t ret = KS_STATUS_FAIL;
-
-       ks_assert(dht);
-       ks_assert(raddr);
-       ks_assert(query);
-       ks_assert(callback);
-       ks_assert(message);
+       ks_assert(raddr);
+       ks_assert(query);
+       ks_assert(callback);
+       ks_assert(message);
 
        *message = NULL;
 
-       if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
+       if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
     // @todo atomic increment or mutex
        transactionid = dht->transactionid_next++;
 
-       if (ks_dht_transaction_alloc(&trans, dht->pool) != KS_STATUS_SUCCESS) {
-               goto done;
-       }
+       if (ks_dht_transaction_alloc(&trans, dht->pool) != KS_STATUS_SUCCESS) goto done;
 
-       if (ks_dht_transaction_init(trans, raddr, transactionid, callback) != KS_STATUS_SUCCESS) {
-               goto done;
-       }
+       if (ks_dht_transaction_init(trans, raddr, transactionid, callback) != KS_STATUS_SUCCESS) goto done;
 
-       if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) {
-               goto done;
-       }
+       if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) goto done;
 
-       if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
-           goto done;
-       }
+       if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done;
 
-       if (ks_dht_message_query(msg, transactionid, query, args) != KS_STATUS_SUCCESS) {
-               goto done;
-       }
+       if (ks_dht_message_query(msg, transactionid, query, args) != KS_STATUS_SUCCESS) goto done;
 
        *message = msg;
 
@@ -836,25 +805,17 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
        ks_assert(raddr);
        ks_assert(transactionid);
        ks_assert(message);
-       
+
        *message = NULL;
 
-       if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
+       if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
-       if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) {
-               goto done;
-       }
+       if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) goto done;
 
-       if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
-               goto done;
-       }
+       if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done;
+
+       if (ks_dht_message_response(msg, transactionid, transactionid_length, args) != KS_STATUS_SUCCESS) goto done;
 
-       if (ks_dht_message_response(msg, transactionid, transactionid_length, args) != KS_STATUS_SUCCESS) {
-               goto done;
-       }
-       
        *message = msg;
 
        ret = KS_STATUS_SUCCESS;
@@ -868,80 +829,6 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
        return ret;
 }
 
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr)
-{
-       ks_dht_message_t *message = NULL;
-       struct bencode *a = NULL;
-
-       ks_assert(dht);
-       ks_assert(raddr);
-
-       if (ks_dht_setup_query(dht, ep, raddr, "ping", ks_dht_process_response_ping, &message, &a) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
-
-       ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
-
-       ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
-       ks_q_push(dht->send_q, (void *)message);
-
-       return KS_STATUS_SUCCESS;
-}
-
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
-{
-       ks_dht_message_t *message = NULL;
-       struct bencode *a = NULL;
-       
-       ks_assert(dht);
-       ks_assert(raddr);
-       ks_assert(targetid);
-
-       if (ks_dht_setup_query(dht, ep, raddr, "find_node", ks_dht_process_response_findnode, &message, &a) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
-       
-       ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
-       ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
-
-       ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
-       ks_q_push(dht->send_q, (void *)message);
-
-       return KS_STATUS_SUCCESS;
-}
-
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
-{
-       ks_dht_message_t *message = NULL;
-       struct bencode *a = NULL;
-       
-       ks_assert(dht);
-       ks_assert(raddr);
-       ks_assert(targetid);
-
-       if (ks_dht_setup_query(dht, ep, raddr, "get", ks_dht_process_response_get, &message, &a) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
-
-       ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
-       // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
-       ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
-
-       ks_log(KS_LOG_DEBUG, "Sending message query get\n");
-       ks_q_push(dht->send_q, (void *)message);
-
-       return KS_STATUS_SUCCESS;
-}
-
 /**
  *
  */
@@ -961,29 +848,22 @@ KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_
        }
 
        // @todo blacklist check for bad actor nodes
-       
-       if (ks_dht_message_prealloc(&message, dht->pool) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
 
-       if (ks_dht_message_init(&message, ep, raddr, KS_FALSE) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
+       if (ks_dht_message_prealloc(&message, dht->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
-       if (ks_dht_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) {
-               goto done;
-       }
+       if (ks_dht_message_init(&message, ep, raddr, KS_FALSE) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
-       // @todo readlocking registry for calling from threadpool
-       if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) {
-               ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type);
-       } else {
-               ret = callback(dht, &message);
-       }
+       if (ks_dht_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) goto done;
+
+       callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_READLOCKED);
+       ks_hash_read_unlock(dht->registry_type);
+
+       if (!callback) ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type);
+       else ret = callback(dht, &message);
 
  done:
        ks_dht_message_deinit(&message);
-       
+
        return ret;
 }
 
@@ -1009,7 +889,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
                ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n");
                return KS_STATUS_FAIL;
        }
-       
+
     qv = ben_str_val(q);
        qv_len = ben_str_len(q);
     if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) {
@@ -1030,12 +910,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
 
        message->args = a;
 
-       // @todo readlocking registry for calling from threadpool
-       if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) {
-               ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
-       } else {
-               ret = callback(dht, message);
-       }
+       callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_READLOCKED);
+       ks_hash_read_unlock(dht->registry_query);
+
+       if (!callback) ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
+       else ret = callback(dht, message);
 
        return ret;
 }
@@ -1069,10 +948,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
 
        transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
        ks_hash_read_unlock(dht->transactions_hash);
-       
-       if (!transaction) {
-               ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
-       } else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
+
+       if (!transaction) ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
+       else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
                ks_log(KS_LOG_DEBUG,
                           "Message response rejected due to spoofing from %s %d, expected %s %d\n",
                           message->raddr.host,
@@ -1087,6 +965,93 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
        return ret;
 }
 
+
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_nodeid_t *id) //, ks_dht_search_callback_t callback)
+{
+       ks_assert(dht);
+       ks_assert(id);
+
+       // @todo check hash for id to see if search already exists
+
+       // @todo if search does not exist, create new search and store in hash by id
+
+       // @todo queue callback into search, if multiple tasks are searching the same id they can all be notified of results
+
+       // @todo if search existed already and is already running then bail out and let it run
+
+       // @todo find closest nodes to id locally, store as closest results, and queue in search pending a find_node call for closer nodes
+
+       // @todo pop a pending find_node call from search queue and call ks_dht_send_find_node, track last popped for timeout
+
+
+       // @todo upon receiving response to find_node, check for an existing search by the id
+
+       // @todo keep track of the closest K(8) nodes found to the id
+
+       // @todo if there is closer node(s) in response, update furthest search result(s) and queue find_node calls for closer nodes
+
+       // @todo if search queue is empty, call callbacks
+
+       // @todo otherwise pop a pending find_node call from search queue and call ks_dht_send_find_node, track last popped for timeout
+
+
+       // @todo during pulse iterate searches and check for last popped timeout where find_node received no reply
+
+       // @todo pop a pending find_node call, or call callbacks if empty
+
+       return KS_STATUS_SUCCESS;
+}
+
+
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
+                                                                                 ks_dht_endpoint_t *ep,
+                                                                                 ks_sockaddr_t *raddr,
+                                                                                 uint8_t *transactionid,
+                                                                                 ks_size_t transactionid_length,
+                                                                                 long long errorcode,
+                                                                                 const char *errorstr)
+{
+       ks_dht_message_t *error = NULL;
+       struct bencode *e = NULL;
+       ks_status_t ret = KS_STATUS_FAIL;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(transactionid);
+       ks_assert(errorstr);
+
+       if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       if (ks_dht_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       if (ks_dht_message_init(error, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done;
+
+       if (ks_dht_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) goto done;
+
+       ben_list_append(e, ben_int(errorcode));
+       ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
+
+       ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
+       ks_q_push(dht->send_q, (void *)error);
+
+       ret = KS_STATUS_SUCCESS;
+
+ done:
+       if (ret != KS_STATUS_SUCCESS && error) {
+               ks_dht_message_deinit(error);
+               ks_dht_message_free(&error);
+       }
+       return ret;
+}
+
 /**
  *
  */
@@ -1122,7 +1087,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
        }
        errorcode = ben_int_val(ec);
        et = ben_str_val(es);
-       
+
        memcpy(error, et, es_len);
        error[es_len] = '\0';
        // @todo end of ks_dht_message_parse_error
@@ -1134,7 +1099,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
 
        transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
        ks_hash_read_unlock(dht->transactions_hash);
-       
+
        if (!transaction) {
                ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid);
        } else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
@@ -1148,10 +1113,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
                ks_dht_message_callback_t callback;
                transaction->finished = KS_TRUE;
 
-               // @todo readlock on registry
-               if ((callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) {
-                       ret = callback(dht, message);
-               } else {
+               callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED);
+               ks_hash_read_unlock(dht->registry_error);
+
+               if (callback) ret = callback(dht, message);
+               else {
                        ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error);
                        ret = KS_STATUS_SUCCESS;
                }
@@ -1160,6 +1126,28 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
        return ret;
 }
 
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr)
+{
+       ks_dht_message_t *message = NULL;
+       struct bencode *a = NULL;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+
+       if (ks_dht_setup_query(dht, ep, raddr, "ping", ks_dht_process_response_ping, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+
+       ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
+       ks_q_push(dht->send_q, (void *)message);
+
+       return KS_STATUS_SUCCESS;
+}
+
 /**
  *
  */
@@ -1168,16 +1156,21 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
        ks_dht_nodeid_t *id;
        ks_dht_message_t *response = NULL;
        struct bencode *r = NULL;
+       ks_dhtrt_routetable_t *routetable = NULL;
+       ks_dht_node_t *node = NULL;
 
        ks_assert(dht);
        ks_assert(message);
        ks_assert(message->args);
 
-       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
+       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
-       // @todo add/touch bucket entry for remote node
+       routetable = message->endpoint->node->table;
+
+       // @todo touch here, or only create if not exists?
+       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+               ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+       }
 
        ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
 
@@ -1199,6 +1192,55 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
        return KS_STATUS_SUCCESS;
 }
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message)
+{
+       ks_dht_nodeid_t *id;
+       ks_dhtrt_routetable_t *routetable = NULL;
+       ks_dht_node_t *node = NULL;
+
+       ks_assert(dht);
+       ks_assert(message);
+
+       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       routetable = message->endpoint->node->table;
+
+       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+               ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+       }
+
+       ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
+
+       return KS_STATUS_SUCCESS;
+}
+
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
+{
+       ks_dht_message_t *message = NULL;
+       struct bencode *a = NULL;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(targetid);
+
+       if (ks_dht_setup_query(dht, ep, raddr, "find_node", ks_dht_process_response_findnode, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+       ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
+
+       ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
+       ks_q_push(dht->send_q, (void *)message);
+
+       return KS_STATUS_SUCCESS;
+}
+
 /**
  *
  */
@@ -1215,30 +1257,26 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        uint8_t buffer6[1000];
        ks_size_t buffer4_length = 0;
        ks_size_t buffer6_length = 0;
+       ks_dhtrt_routetable_t *routetable = NULL;
+       ks_dht_node_t *node = NULL;
+       ks_dhtrt_querynodes_t query;
+       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
 
        ks_assert(dht);
        ks_assert(message);
        ks_assert(message->args);
 
-       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
+       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
-       if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
+       if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
        want = ben_dict_get_by_str(message->args, "want");
        if (want) {
                size_t want_len = ben_list_len(want);
                for (size_t i = 0; i < want_len; ++i) {
                        struct bencode *iv = ben_list_get(want, i);
-                       if (!ben_cmp_with_str(iv, "n4")) {
-                               want4 = KS_TRUE;
-                       }
-                       if (!ben_cmp_with_str(iv, "n6")) {
-                               want6 = KS_TRUE;
-                       }
+                       if (!ben_cmp_with_str(iv, "n4")) want4 = KS_TRUE;
+                       if (!ben_cmp_with_str(iv, "n6")) want6 = KS_TRUE;
                }
        }
 
@@ -1247,27 +1285,55 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
                want6 = message->raddr.family == AF_INET6;
        }
 
-       // @todo add/touch bucket entry for remote node
-       
+       routetable = message->endpoint->node->table;
+
+       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+               ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+       }
+
        ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
 
 
+       query.nodeid = *target;
+       query.type = ks_dht_remote_t;
+       query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE
        if (want4) {
-               // @todo get closest nodes to target from ipv4 route table
-               // @todo compact nodes into buffer4
+               query.family = AF_INET;
+               ks_dhtrt_findclosest_nodes(routetable, &query);
+
+               for (int32_t i = 0; i < query.count; ++i) {
+                       if (ks_dht_utility_compact_nodeinfo(&query.nodes[i]->nodeid,
+                                                                                               &query.nodes[i]->addr,
+                                                                                               buffer4,
+                                                                                               &buffer4_length,
+                                                                                               sizeof(buffer4)) != KS_STATUS_SUCCESS) {
+                               return KS_STATUS_FAIL;
+                       }
+                       ks_log(KS_LOG_DEBUG,
+                                  "Compacted ipv4 nodeinfo for %s (%s %d)\n",
+                                  ks_dht_hexid(&query.nodes[i]->nodeid, id_buf),
+                                  query.nodes[i]->addr.host,
+                                  query.nodes[i]->addr.port);
+               }
        }
        if (want6) {
-               // @todo get closest nodes to target from ipv6 route table
-               // @todo compact nodes into buffer6
-       }
-
-       // @todo remove this, testing only
-       if (ks_dht_utility_compact_node(id,
-                                                                       &message->raddr,
-                                                                       message->raddr.family == AF_INET ? buffer4 : buffer6,
-                                                                       message->raddr.family == AF_INET ? &buffer4_length : &buffer6_length,
-                                                                       message->raddr.family == AF_INET ? sizeof(buffer4) : sizeof(buffer6)) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
+               query.family = AF_INET6;
+               ks_dhtrt_findclosest_nodes(routetable, &query);
+
+               for (int32_t i = 0; i < query.count; ++i) {
+                       if (ks_dht_utility_compact_nodeinfo(&query.nodes[i]->nodeid,
+                                                                                               &query.nodes[i]->addr,
+                                                                                               buffer6,
+                                                                                               &buffer6_length,
+                                                                                               sizeof(buffer6)) != KS_STATUS_SUCCESS) {
+                               return KS_STATUS_FAIL;
+                       }
+                       ks_log(KS_LOG_DEBUG,
+                                  "Compacted ipv6 nodeinfo for %s (%s %d)\n",
+                                  ks_dht_hexid(&query.nodes[i]->nodeid, id_buf),
+                                  query.nodes[i]->addr.host,
+                                  query.nodes[i]->addr.port);
+               }
        }
 
        if (ks_dht_setup_response(dht,
@@ -1281,12 +1347,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        }
 
        ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
-       if (want4) {
-               ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
-       }
-       if (want6) {
-               ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
-       }
+       if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
+       if (want6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
 
        ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
        ks_q_push(dht->send_q, (void *)response);
@@ -1294,6 +1356,112 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        return KS_STATUS_SUCCESS;
 }
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message)
+{
+       ks_dht_nodeid_t *id;
+       struct bencode *n;
+       const uint8_t *nodes = NULL;
+       const uint8_t *nodes6 = NULL;
+       size_t nodes_size = 0;
+       size_t nodes6_size = 0;
+       size_t nodes_len = 0;
+       size_t nodes6_len = 0;
+       ks_dhtrt_routetable_t *routetable = NULL;
+       ks_dht_node_t *node = NULL;
+       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+
+       ks_assert(dht);
+       ks_assert(message);
+
+       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       n = ben_dict_get_by_str(message->args, "nodes");
+       if (n) {
+               nodes = (const uint8_t *)ben_str_val(n);
+               nodes_size = ben_str_len(n);
+       }
+       n = ben_dict_get_by_str(message->args, "nodes6");
+       if (n) {
+               nodes6 = (const uint8_t *)ben_str_val(n);
+               nodes6_size = ben_str_len(n);
+       }
+
+       routetable = message->endpoint->node->table;
+
+       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+               ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+       }
+
+       while (nodes_len < nodes_size) {
+               ks_dht_nodeid_t nid;
+               ks_sockaddr_t addr;
+
+               addr.family = AF_INET;
+               if (ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+               ks_log(KS_LOG_DEBUG,
+                          "Expanded ipv4 nodeinfo for %s (%s %d)\n",
+                          ks_dht_hexid(&nid, id_buf),
+                          addr.host,
+                          addr.port);
+
+               if (ks_dhtrt_touch_node(dht->rt_ipv4, nid) != KS_STATUS_SUCCESS) {
+                       ks_dhtrt_create_node(dht->rt_ipv4, nid, ks_dht_remote_t, addr.host, addr.port, &node);
+               }
+       }
+
+       while (nodes6_len < nodes6_size) {
+               ks_dht_nodeid_t nid;
+               ks_sockaddr_t addr;
+
+               addr.family = AF_INET6;
+               if (ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+               ks_log(KS_LOG_DEBUG,
+                          "Expanded ipv6 nodeinfo for %s (%s %d)\n",
+                          ks_dht_hexid(&nid, id_buf),
+                          addr.host,
+                          addr.port);
+
+               if (ks_dhtrt_touch_node(dht->rt_ipv6, nid) != KS_STATUS_SUCCESS) {
+                       ks_dhtrt_create_node(dht->rt_ipv6, nid, ks_dht_remote_t, addr.host, addr.port, &node);
+               }
+       }
+       // @todo repeat above for ipv6 table
+
+       ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
+
+       return KS_STATUS_SUCCESS;
+}
+
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
+{
+       ks_dht_message_t *message = NULL;
+       struct bencode *a = NULL;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(targetid);
+
+       if (ks_dht_setup_query(dht, ep, raddr, "get", ks_dht_process_response_get, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+       // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
+       ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
+
+       ks_log(KS_LOG_DEBUG, "Sending message query get\n");
+       ks_q_push(dht->send_q, (void *)message);
+
+       return KS_STATUS_SUCCESS;
+}
+
 /**
  *
  */
@@ -1308,30 +1476,30 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        ks_dht_storageitem_t *item = NULL;
        ks_dht_message_t *response = NULL;
        struct bencode *r = NULL;
+       ks_dhtrt_routetable_t *routetable = NULL;
+       ks_dht_node_t *node = NULL;
 
        ks_assert(dht);
        ks_assert(message);
        ks_assert(message->args);
 
-       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
+       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
-       if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
-       
        seq = ben_dict_get_by_str(message->args, "seq");
-       if (seq) {
-               sequence = ben_int_val(seq);
-       }
+       if (seq) sequence = ben_int_val(seq);
 
-       // @todo add/touch bucket entry for remote node
+       routetable = message->endpoint->node->table;
+
+       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+               ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+       }
 
        ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
 
        ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
-       
+
        item = ks_hash_search(dht->storage_hash, (void *)target, KS_READLOCKED);
        ks_hash_read_unlock(dht->storage_hash);
 
@@ -1341,7 +1509,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        // @todo find closest ipv4 and ipv6 nodes to target
 
        // @todo compact ipv4 and ipv6 nodes into separate buffers
-       
+
        if (ks_dht_setup_response(dht,
                                                          message->endpoint,
                                                          &message->raddr,
@@ -1362,9 +1530,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
                        }
                        ben_dict_set(r, ben_blob("seq", 3), ben_int(item->seq));
                }
-               if (!sequence_snuffed) {
-                       ben_dict_set(r, ben_blob("v", 1), ben_clone(item->v));
-               }
+               if (!sequence_snuffed) ben_dict_set(r, ben_blob("v", 1), ben_clone(item->v));
        }
        // @todo nodes, nodes6
 
@@ -1374,19 +1540,62 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        return KS_STATUS_SUCCESS;
 }
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message)
+{
+       ks_dht_nodeid_t *id;
+       ks_dht_token_t *token;
+       ks_dhtrt_routetable_t *routetable = NULL;
+       ks_dht_node_t *node = NULL;
+
+       ks_assert(dht);
+       ks_assert(message);
+
+       // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
+       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       if (ks_dht_utility_extract_token(message->args, "token", &token) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       // @todo add extract function for mutable ks_dht_storageitem_key_t
+       // @todo add extract function for mutable ks_dht_storageitem_signature_t
+
+       routetable = message->endpoint->node->table;
+
+       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+               ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+       }
+       // @todo add/touch bucket entries for other nodes/nodes6 returned
+
+       ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
+
+       return KS_STATUS_SUCCESS;
+}
+
+
 /**
  *
  */
 KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message)
 {
+       ks_dht_nodeid_t *id;
        ks_dht_message_t *response = NULL;
        struct bencode *r = NULL;
+       ks_dhtrt_routetable_t *routetable = NULL;
+       ks_dht_node_t *node = NULL;
 
        ks_assert(dht);
        ks_assert(message);
        ks_assert(message->args);
 
-       // @todo add/touch bucket entry for remote node
+       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       routetable = message->endpoint->node->table;
+
+       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+               ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+       }
 
        ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
 
@@ -1408,63 +1617,27 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
        return KS_STATUS_SUCCESS;
 }
 
-
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message)
+KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message)
 {
-       ks_assert(dht);
-       ks_assert(message);
-
-       // @todo add/touch bucket entry for remote node
-       
-       ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
-
-       return KS_STATUS_SUCCESS;
-}
+       ks_dht_nodeid_t *id;
+       ks_dhtrt_routetable_t *routetable = NULL;
+       ks_dht_node_t *node = NULL;
 
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message)
-{
        ks_assert(dht);
        ks_assert(message);
 
-       // @todo add/touch bucket entry for remote node and other nodes returned
-
-       ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
-
-       return KS_STATUS_SUCCESS;
-}
+       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message)
-{
-       ks_dht_nodeid_t *id;
-       ks_dht_token_t *token;
-       
-       ks_assert(dht);
-       ks_assert(message);
+       routetable = message->endpoint->node->table;
 
-       // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
-       if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
+       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+               ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
        }
-       
-       if (ks_dht_utility_extract_token(message->args, "token", &token) != KS_STATUS_SUCCESS) {
-               return KS_STATUS_FAIL;
-       }
-
-       // @todo add extract function for mutable ks_dht_storageitem_key_t
-       // @todo add extract function for mutable ks_dht_storageitem_signature_t
-       
-       // @todo add/touch bucket entry for remote node and other nodes returned
 
-       ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
+       ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
 
        return KS_STATUS_SUCCESS;
 }
index b2df07ed3a71c5453cf94be6a72bc6bc7d74d303..bb4580a47145b70427e68a1dbfdb0efbf03c4420 100644 (file)
@@ -10,6 +10,7 @@ KS_BEGIN_EXTERN_C
 
 #define KS_DHT_DEFAULT_PORT 5309
 #define KS_DHT_RECV_BUFFER_SIZE 0xFFFF
+#define KS_DHT_PULSE_EXPIRATIONS 10
 
 #define KS_DHT_NODEID_SIZE 20
 
@@ -19,6 +20,7 @@ KS_BEGIN_EXTERN_C
 #define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256
 
 #define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30
+#define KS_DHT_SEARCH_EXPIRATION 10
 
 #define KS_DHT_STORAGEITEM_KEY_SIZE crypto_sign_PUBLICKEYBYTES
 #define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64
@@ -106,6 +108,7 @@ struct ks_dht_endpoint_s {
        ks_dht_nodeid_t nodeid;
        ks_sockaddr_t addr;
        ks_socket_t sock;
+       ks_dht_node_t *node;
 };
 
 struct ks_dht_transaction_s {
@@ -151,6 +154,8 @@ struct ks_dht_s {
        ks_hash_t *endpoints_hash;
        struct pollfd *endpoints_poll;
 
+       ks_time_t pulse_expirations;
+
        ks_q_t *send_q;
        ks_dht_message_t *send_q_unsent;
        uint8_t recv_buffer[KS_DHT_RECV_BUFFER_SIZE];
index e31a29a6a0b256a5b8acb9e6cd2c2b24f6aaaea6..cf07c8ccb6350ca3ee014e65ee1321a98f2c6415 100644 (file)
@@ -62,11 +62,8 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_init(ks_dht_endpoint_t *endpoint, const
        ks_assert(addr);
        ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
        
-    if (!nodeid) {
-               randombytes_buf(endpoint->nodeid.id, KS_DHT_NODEID_SIZE);
-       } else {
-               memcpy(endpoint->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE);
-       }
+    if (!nodeid) randombytes_buf(endpoint->nodeid.id, KS_DHT_NODEID_SIZE);
+       else memcpy(endpoint->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE);
 
        endpoint->addr = *addr;
        endpoint->sock = sock;
@@ -81,10 +78,9 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_deinit(ks_dht_endpoint_t *endpoint)
 {
        ks_assert(endpoint);
 
-       if (endpoint->sock != KS_SOCK_INVALID) {
-               ks_socket_close(&endpoint->sock);
-               endpoint->sock = KS_SOCK_INVALID;
-       }
+       endpoint->node = NULL;
+       if (endpoint->sock != KS_SOCK_INVALID) ks_socket_close(&endpoint->sock);
+       endpoint->addr = (const ks_sockaddr_t){ 0 };
 
        return KS_STATUS_SUCCESS;
 }
index 122983ca7d2887d66c11113f5bd2f6329f266463..8c71a9f6e020fedc36f573177f5a4ac8b0e7b12d 100644 (file)
@@ -59,13 +59,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_init(ks_dht_message_t *message, ks_dht_en
 
        message->endpoint = ep;
        message->raddr = *raddr;
-       message->data = NULL;
-       message->args = NULL;
-       message->transactionid_length = 0;
-       message->type[0] = '\0';
-       if (alloc_data) {
-               message->data = ben_dict();
-       }
+       if (alloc_data) message->data = ben_dict();
 
        return KS_STATUS_SUCCESS;
 }
@@ -173,7 +167,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
        ks_assert(query);
 
        tid = htonl(transactionid);
-       
+
     ben_dict_set(message->data, ben_blob("t", 1), ben_blob((uint8_t *)&tid, sizeof(uint32_t)));
        ben_dict_set(message->data, ben_blob("y", 1), ben_blob("q", 1));
        ben_dict_set(message->data, ben_blob("q", 1), ben_blob(query, strlen(query)));
@@ -182,9 +176,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
        a = ben_dict();
        ben_dict_set(message->data, ben_blob("a", 1), a);
 
-       if (args) {
-               *args = a;
-       }
+       if (args) *args = a;
 
        return KS_STATUS_SUCCESS;
 }
@@ -198,20 +190,18 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
                                                                                                struct bencode **args)
 {
        struct bencode *r;
-       
+
        ks_assert(message);
        ks_assert(transactionid);
-       
+
     ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
        ben_dict_set(message->data, ben_blob("y", 1), ben_blob("r", 1));
-       
+
        // @note r joins message->data and will be freed with it
        r = ben_dict();
        ben_dict_set(message->data, ben_blob("r", 1), r);
 
-       if (args) {
-               *args = r;
-       }
+       if (args) *args = r;
 
        return KS_STATUS_SUCCESS;
 }
@@ -225,20 +215,18 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message,
                                                                                         struct bencode **args)
 {
        struct bencode *e;
-       
+
        ks_assert(message);
        ks_assert(transactionid);
-       
+
     ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
        ben_dict_set(message->data, ben_blob("y", 1), ben_blob("e", 1));
-       
+
        // @note r joins message->data and will be freed with it
        e = ben_list();
        ben_dict_set(message->data, ben_blob("e", 1), e);
 
-       if (args) {
-               *args = e;
-       }
+       if (args) *args = e;
 
        return KS_STATUS_SUCCESS;
 }
index e7855284630583cf10f391081137e6e399ef2129..d76247bcf74e9ec02fa10bc67dadde43a4284cd1 100644 (file)
@@ -185,9 +185,7 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_mutable(ks_dht_storageitem_t *item,
 
        SHA1_Init(&sha);
        SHA1_Update(&sha, item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE);
-       if (item->salt && item->salt_length > 0) {
-               SHA1_Update(&sha, item->salt, item->salt_length);
-       }
+       if (item->salt && item->salt_length > 0) SHA1_Update(&sha, item->salt, item->salt_length);
        SHA1_Final(item->id.id, &sha);
 
        return KS_STATUS_SUCCESS;
index 618f034488d125028fa67448166b4ad4eaba08f7..eb7ec3a6076aed518651bb1a1db6ff82bcaf4db0 100644 (file)
@@ -15,18 +15,22 @@ ks_status_t dht_z_callback(ks_dht_t *dht, ks_dht_message_t *message)
 }
 
 int main() {
-  ks_size_t buflen;
+       //ks_size_t buflen;
   ks_status_t err;
   int mask = 0;
   ks_dht_t *dht1 = NULL;
   ks_dht_t dht2;
+  ks_dht_t *dht3 = NULL;
   ks_dht_endpoint_t *ep1;
   ks_dht_endpoint_t *ep2;
+  ks_dht_endpoint_t *ep3;
   ks_bool_t have_v4, have_v6;
   char v4[48] = {0}, v6[48] = {0};
   ks_sockaddr_t addr;
-  ks_sockaddr_t raddr;
-  
+  ks_sockaddr_t raddr1;
+  //ks_sockaddr_t raddr2;
+  //ks_sockaddr_t raddr3;
+
   err = ks_init();
   ok(!err);
 
@@ -61,6 +65,13 @@ int main() {
   err = ks_dht_init(&dht2);
   ok(err == KS_STATUS_SUCCESS);
 
+  err = ks_dht_alloc(&dht3, NULL);
+  ok(err == KS_STATUS_SUCCESS);
+  
+  err = ks_dht_init(dht3);
+  ok(err == KS_STATUS_SUCCESS);
+
+  
   ks_dht_register_type(dht1, "z", dht_z_callback);
   
   if (have_v4) {
@@ -70,13 +81,23 @@ int main() {
     err = ks_dht_bind(dht1, NULL, &addr, &ep1);
     ok(err == KS_STATUS_SUCCESS);
 
+       raddr1 = addr;
+       
        err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 1, AF_INET);
        ok(err == KS_STATUS_SUCCESS);
        
        err = ks_dht_bind(&dht2, NULL, &addr, &ep2);
        ok(err == KS_STATUS_SUCCESS);
 
-       raddr = addr;
+       //raddr2 = addr;
+
+       err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 2, AF_INET);
+       ok(err == KS_STATUS_SUCCESS);
+       
+       err = ks_dht_bind(dht3, NULL, &addr, &ep3);
+       ok(err == KS_STATUS_SUCCESS);
+
+       //raddr3 = addr;
   }
 
   if (have_v6) {
@@ -91,20 +112,26 @@ int main() {
 
        err = ks_dht_bind(&dht2, NULL, &addr, NULL);
        ok(err == KS_STATUS_SUCCESS);
+
+       err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT + 2, AF_INET6);
+       ok(err == KS_STATUS_SUCCESS);
+
+       err = ks_dht_bind(dht3, NULL, &addr, NULL);
+       ok(err == KS_STATUS_SUCCESS);
   }
 
-  diag("Custom type tests\n");
+  //diag("Custom type tests\n");
   
-  buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER);
-  memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen);
-  dht1->recv_buffer_length = buflen;
+  //buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER);
+  //memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen);
+  //dht1->recv_buffer_length = buflen;
 
-  err = ks_dht_process(dht1, ep1, &raddr);
-  ok(err == KS_STATUS_SUCCESS);
+  //err = ks_dht_process(dht1, ep1, &raddr);
+  //ok(err == KS_STATUS_SUCCESS);
 
-  ks_dht_pulse(dht1, 100);
+  //ks_dht_pulse(dht1, 100);
 
-  ks_dht_pulse(&dht2, 100);
+  //ks_dht_pulse(&dht2, 100);
 
   
   //buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER);
@@ -115,20 +142,39 @@ int main() {
   //ok(err == KS_STATUS_SUCCESS);
 
   
-  diag("Ping tests\n");
+  diag("Ping test\n");
   
-  ks_dht_send_ping(dht1, ep1, &raddr);
+  ks_dht_send_ping(&dht2, ep2, &raddr1); // Queue ping from dht2 to dht1
 
-  ks_dht_pulse(dht1, 100);
+  ks_dht_pulse(&dht2, 100); // Send queued ping from dht2 to dht1
   
-  ks_dht_pulse(&dht2, 100);
+  ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response
+
+  ks_dht_pulse(&dht2, 100); // Receive and process ping response from dht1
+
+  // Test blind find_node from dht3 to dht1 to find dht2 nodeid
+
+  diag("Find_Node test\n");
 
-  ks_dht_pulse(dht1, 100);
+  ks_dht_send_findnode(dht3, ep3, &raddr1, &ep2->nodeid); // Queue findnode from dht3 to dht1
 
+  ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
+
+  ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
+
+  ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
+
+  ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL);
   
   diag("Cleanup\n");
   /* Cleanup and shutdown */
 
+  err = ks_dht_deinit(dht3);
+  ok(err == KS_STATUS_SUCCESS);
+
+  err = ks_dht_free(&dht3);
+  ok(err == KS_STATUS_SUCCESS);
+
   err = ks_dht_deinit(&dht2);
   ok(err == KS_STATUS_SUCCESS);