]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: First tested pass on search functionality, not tested with deep searching...
authorShane Bryldt <astaelan@gmail.com>
Wed, 28 Dec 2016 00:52:10 +0000 (00:52 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:38 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_job.c
libs/libks/src/dht/ks_dht_search.c
libs/libks/test/testdht2.c

index 89e3643ee53bebbc818a6f8171a85dc2dbace9c4..fc71cddacf03400268de5e88359e3f5e8a71c62e 100644 (file)
@@ -258,10 +258,12 @@ KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job,
                                                                                  int32_t attempts);
 KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback);
 KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
+                                                                                  ks_dht_search_t *search,
                                                                                   ks_dht_job_callback_t query_callback,
                                                                                   ks_dht_job_callback_t finish_callback,
                                                                                   ks_dht_nodeid_t *target);
 KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
+                                                                         ks_dht_search_t *search,
                                                                          ks_dht_job_callback_t query_callback,
                                                                          ks_dht_job_callback_t finish_callback,
                                                                          ks_dht_nodeid_t *target,
@@ -316,14 +318,9 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target);
+KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target, ks_dht_search_callback_t callback);
 KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search);
 
-KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback);
-KS_DECLARE(void) ks_dht_search_expire(ks_dht_search_t *search, ks_hash_t *pending, int32_t *active);
-
-KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid);
-KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending);
 
 /**
  *
index 28c981594711c39369a856d06db977a913303a43..056584943e690ce53ae9529116fc1667828aa04b 100644 (file)
@@ -154,19 +154,11 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        d->rt_ipv6 = NULL;
 
        /**
-        * Create the hash to store searches.
+        * Create the mutex to handle searches list.
         */
-       ks_hash_create(&d->searches4_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
-       ks_assert(d->searches4_hash);
-       ks_hash_create(&d->searches6_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
-       ks_assert(d->searches6_hash);
+       ks_mutex_create(&d->searches_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
+       ks_assert(d->searches_mutex);
 
-       /**
-        * The searches hash uses arbitrary key size, which requires the key size be provided.
-        */
-       ks_hash_set_keysize(d->searches4_hash, KS_DHT_NODEID_SIZE);
-       ks_hash_set_keysize(d->searches6_hash, KS_DHT_NODEID_SIZE);
-       
        /**
         * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
         */
@@ -227,25 +219,12 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
        d->token_secret_expiration = 0;
 
        /**
-        * Cleanup the search hash and it's contents if it is allocated.
+        * Cleanup the search mutex and searches if they are allocated.
         */
-       if (d->searches6_hash) {
-               for (it = ks_hash_first(d->searches6_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-                       const void *key = NULL;
-                       ks_dht_search_t *val = NULL;
-                       ks_hash_this(it, &key, NULL, (void **)&val);
-                       ks_dht_search_destroy(&val);
-               }
-               ks_hash_destroy(&d->searches6_hash);
-       }
-       if (d->searches4_hash) {
-               for (it = ks_hash_first(d->searches4_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-                       const void *key = NULL;
-                       ks_dht_search_t *val = NULL;
-                       ks_hash_this(it, &key, NULL, (void **)&val);
-                       ks_dht_search_destroy(&val);
-               }
-               ks_hash_destroy(&d->searches4_hash);
+       if (d->searches_mutex) ks_mutex_destroy(&d->searches_mutex);
+       for (ks_dht_search_t *search = d->searches_first, *searchn = NULL; search; search = searchn) {
+               searchn = search->next;
+               ks_dht_search_destroy(&search);
        }
 
        /**
@@ -623,57 +602,11 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
        if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
 }
 
-KS_DECLARE(void) ks_dht_pulse_expirations_searches(ks_dht_t *dht, ks_hash_t *searches)
-{
-       ks_hash_iterator_t *it = NULL;
-       ks_time_t now = ks_time_now();
-
-       ks_assert(dht);
-       ks_assert(searches);
-
-       ks_hash_write_lock(searches);
-       for (it = ks_hash_first(searches, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-               const void *key = NULL;
-               ks_dht_search_t *value = NULL;
-               int32_t active = 0;
-
-               ks_hash_this(it, &key, NULL, (void **)&value);
-
-               ks_mutex_lock(value->mutex);
-               for (ks_hash_iterator_t *i = ks_hash_first(value->pending, KS_UNLOCKED); i; i = ks_hash_next(&i)) {
-                       const void *k = NULL;
-                       ks_dht_search_pending_t *v = NULL;
-
-                       ks_hash_this(i, &k, NULL, (void **)&v);
-
-                       if (v->finished) continue;
-
-                       if (v->expiration <= now) {
-                               char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
-                               char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
-                               ks_log(KS_LOG_DEBUG,
-                                          "Search for %s pending find_node to %s has expired without response\n",
-                                          ks_dht_hex(value->target.id, id_buf, KS_DHT_NODEID_SIZE),
-                                          ks_dht_hex(v->nodeid.id, id2_buf, KS_DHT_NODEID_SIZE));
-                               v->finished = KS_TRUE;
-                               continue;
-                       }
-                       active++;
-               }
-               ks_mutex_unlock(value->mutex);
-
-               if (active == 0) {
-                       for (int32_t index = 0; index < value->callbacks_size; ++index) value->callbacks[index](dht, value);
-                       ks_hash_remove(searches, (void *)key);
-                       ks_dht_search_destroy(&value);
-               }
-       }
-       ks_hash_write_unlock(searches);
-}
-
 KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
 {
        ks_hash_iterator_t *it = NULL;
+       ks_dht_search_t *searches_first = NULL;
+       ks_dht_search_t *searches_last = NULL;
        ks_time_t now = ks_time_now();
 
        ks_assert(dht);
@@ -702,16 +635,44 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
        }
        ks_hash_write_unlock(dht->transactions_hash);
 
-       if (dht->rt_ipv4) ks_dht_pulse_expirations_searches(dht, dht->searches4_hash);
-       if (dht->rt_ipv6) ks_dht_pulse_expirations_searches(dht, dht->searches6_hash);
+       ks_mutex_lock(dht->searches_mutex);
+       for (ks_dht_search_t *search = dht->searches_first, *searchn = NULL, *searchp = NULL; search; search = searchn) {
+               ks_bool_t done = KS_FALSE;
+               searchn = search->next;
 
-       // @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?)
+               ks_mutex_lock(search->mutex);
+               done = ks_hash_count(search->searching) == 0;
+               
+               if (done) {
+                       if (!searchp && !searchn) dht->searches_first = dht->searches_last = NULL;
+                       else if (!searchp) dht->searches_first = searchn;
+                       else if (!searchn) {
+                               dht->searches_last = searchp;
+                               dht->searches_last->next = NULL;
+                       }
+                       else searchp->next = searchn;
+
+                       search->next = NULL;
+                       if (searches_last) searches_last = searches_last->next = search;
+                       else searches_first = searches_last = search;
+               } else searchp = search;
+               ks_mutex_unlock(search->mutex);
+       }
+       ks_mutex_unlock(dht->searches_mutex);
+
+       for (ks_dht_search_t *search = searches_first, *searchn = NULL; search; search = searchn) {
+               searchn = search->next;
+               if (search->callback) search->callback(dht, search);
+               ks_dht_search_destroy(&search);
+       }
 
        if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
                dht->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
                dht->token_secret_previous = dht->token_secret_current;
                dht->token_secret_current = rand();
        }
+
+       // @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?)
 }
 
 KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
@@ -1270,7 +1231,12 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
                return KS_STATUS_FAIL;
        }
 
-       ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
+       ks_log(KS_LOG_DEBUG,
+                  "Sending message to %s %d on %s %d\n",
+                  message->raddr.host,
+                  message->raddr.port,
+                  message->endpoint->addr.host,
+                  message->endpoint->addr.port);
        ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
 
        return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr);
@@ -1398,7 +1364,12 @@ KS_DECLARE(void *) ks_dht_process(ks_thread_t *thread, void *data)
        ks_assert(thread);
        ks_assert(data);
 
-       ks_log(KS_LOG_DEBUG, "Received message from %s %d\n", datagram->raddr.host, datagram->raddr.port);
+       ks_log(KS_LOG_DEBUG,
+                  "Received message from %s %d on %s %d\n",
+                  datagram->raddr.host,
+                  datagram->raddr.port,
+                  datagram->endpoint->addr.host,
+                  datagram->endpoint->addr.port);
        if (datagram->raddr.family != AF_INET && datagram->raddr.family != AF_INET6) {
                ks_log(KS_LOG_DEBUG, "Message from unsupported address family\n");
                goto done;
@@ -1475,7 +1446,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
                                                                        KS_DHT_REMOTE,
                                                                        message->raddr.host,
                                                                        message->raddr.port,
-                                                                       KS_DHTRT_CREATE_DEFAULT,
+                                                                       KS_DHTRT_CREATE_PING,
                                                                        &node)) != KS_STATUS_SUCCESS) goto done;
        if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
 
@@ -1521,7 +1492,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
                                                                        KS_DHT_REMOTE,
                                                                        message->raddr.host,
                                                                        message->raddr.port,
-                                                                       KS_DHTRT_CREATE_DEFAULT,
+                                                                       KS_DHTRT_CREATE_TOUCH,
                                                                        &node)) != KS_STATUS_SUCCESS) goto done;
        if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
        
@@ -1546,6 +1517,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
                           transaction->job->raddr.port);
        } else {
                transaction->job->response = message;
+               transaction->job->response_id = message->args_id;
                message->transaction = transaction;
                if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING;
                else transaction->job->state = KS_DHT_JOB_STATE_COMPLETING;
@@ -1558,20 +1530,85 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
        return ret;
 }
 
+KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_job_t *job)
+{
+       ks_dht_node_t **nodes = NULL;
+       ks_size_t nodes_count = 0;
+       ks_dht_node_t *node = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(dht);
+       ks_assert(job);
+       ks_assert(job->search);
+
+       ks_mutex_lock(job->search->mutex);
+       ks_hash_remove(job->search->searching, job->response_id.id);
+
+       nodes = job->raddr.family == AF_INET ? job->response_nodes : job->response_nodes6;
+       nodes_count = job->raddr.family == AF_INET ? job->response_nodes_count : job->response_nodes6_count;
+       
+       for (int32_t i = 0; i < nodes_count; ++i) {
+               ks_dht_nodeid_t distance;
+               int32_t results_index = -1;
+               
+               node = nodes[i];
+
+               if (ks_hash_search(job->search->searched, node->nodeid.id, KS_UNLOCKED) != 0) continue;
+
+               ks_dht_utility_nodeid_xor(&distance, &node->nodeid, &job->search->target);
+               if (job->search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
+                       results_index = job->search->results_length;
+                       job->search->results_length++;
+               } else {
+                       for (int32_t index = 0; index < job->search->results_length; ++index) {
+                               // Check if new node is closer than this previous result
+                               if (memcmp(distance.id, job->search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
+                                       // If this is the first node that is further then keep it
+                                       // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result
+                                       if (results_index < 0) results_index = index;
+                                       else if (memcmp(job->search->distances[index].id, job->search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
+                               }
+                       }
+               }
+
+               if (results_index >= 0) {
+                       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+                       char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+                       char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+
+                       ks_log(KS_LOG_DEBUG,
+                                  "Set closer node id %s (%s) in search of target id %s at results index %d\n",
+                                  ks_dht_hex(node->nodeid.id, id_buf, KS_DHT_NODEID_SIZE),
+                                  ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
+                                  ks_dht_hex(job->search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
+                                  results_index);
+                       // @todo add lock on node
+                       job->search->results[results_index] = node;
+                       job->search->distances[results_index] = distance;
+
+                       ks_hash_insert(job->search->searched, node->nodeid.id, (void *)KS_TRUE);
+                       ks_hash_insert(job->search->searching, node->nodeid.id, (void *)KS_TRUE);
+
+                       if ((ret = ks_dht_findnode(dht, job->search, &node->addr, ks_dht_search_findnode_callback, &job->search->target)) != KS_STATUS_SUCCESS) goto done;
+               }
+       }
+       
+ done:
+       ks_mutex_unlock(job->search->mutex);
+
+       return ret;
+}
 
-KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
-                                                                         int32_t family,
-                                                                         ks_dht_nodeid_t *target,
-                                                                         ks_dht_search_callback_t callback,
-                                                                         ks_dht_search_t **search)
+KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht,
+                                                                                          int32_t family,
+                                                                                          ks_dht_nodeid_t *target,
+                                                                                          ks_dht_search_callback_t callback,
+                                                                                          ks_dht_search_t **search)
 {
        ks_bool_t locked_searches = KS_FALSE;
        ks_bool_t locked_search = KS_FALSE;
-       ks_hash_t *searches = NULL;
        ks_dhtrt_routetable_t *rt = NULL;
        ks_dht_search_t *s = NULL;
-       ks_bool_t inserted = KS_FALSE;
-       ks_bool_t allocated = KS_FALSE;
     ks_dhtrt_querynodes_t query;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
@@ -1586,46 +1623,28 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
                        ret = KS_STATUS_FAIL;
                        goto done;
                }
-               searches = dht->searches4_hash;
                rt = dht->rt_ipv4;
        } else {
                if (!dht->rt_ipv6) {
                        ret = KS_STATUS_FAIL;
                        goto done;
                }
-               searches = dht->searches6_hash;
                rt = dht->rt_ipv6;
        }
 
-       // check hash for target to see if search already exists
-       ks_hash_write_lock(searches);
+       ks_mutex_lock(dht->searches_mutex);
        locked_searches = KS_TRUE;
 
-       s = ks_hash_search(searches, target->id, KS_UNLOCKED);
-
-       // if search does not exist, create new search and store in hash by target
-       if (!s) {
-               if ((ret = ks_dht_search_create(&s, dht->pool, target)) != KS_STATUS_SUCCESS) goto done;
-               allocated = KS_TRUE;
-       } else inserted = KS_TRUE;
-
-       // add callback regardless of whether the search is new or old
-       if ((ret = ks_dht_search_callback_add(s, callback)) != KS_STATUS_SUCCESS) goto done;
-
-       // if the search is old then bail out and return successfully
-       if (!allocated) goto done;
+       if ((ret = ks_dht_search_create(&s, dht->pool, target, callback)) != KS_STATUS_SUCCESS) goto done;
 
-       // everything past this point until final cleanup is only for when a search of the target does not already exist
-
-       if ((ret = ks_hash_insert(searches, s->target.id, s)) == KS_STATUS_SUCCESS) goto done;
-       inserted = KS_TRUE;
-
-       // lock search before unlocking the searches_hash to prevent this search from being used before we finish setting it up
+    if (dht->searches_last) dht->searches_last = dht->searches_last->next = s;
+       else dht->searches_first = dht->searches_last = s;
+       
        ks_mutex_lock(s->mutex);
        locked_search = KS_TRUE;
 
-       // release searches_hash lock now, but search is still locked
-       ks_hash_write_unlock(searches);
+       // release searches lock now, but search is still locked
+       ks_mutex_unlock(dht->searches_mutex);
        locked_searches = KS_FALSE;
 
        // find closest good nodes to target locally and store as the closest results
@@ -1637,36 +1656,32 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
        ks_dhtrt_findclosest_nodes(rt, &query);
        for (int32_t i = 0; i < query.count; ++i) {
                ks_dht_node_t *n = query.nodes[i];
-               ks_dht_search_pending_t *pending = NULL;
+               ks_bool_t searched = KS_FALSE;
 
                // always take the initial local closest good nodes as results, they are already good nodes that are closest with no results yet
-               s->results[s->results_length] = n->nodeid;
+               s->results[s->results_length] = n;
                ks_dht_utility_nodeid_xor(&s->distances[s->results_length], &n->nodeid, &s->target);
                s->results_length++;
 
-               pending = ks_hash_search(s->pending, n->nodeid.id, KS_UNLOCKED);
-               if (pending) continue; // skip duplicates, this really shouldn't happen on a new search but we sanity check
+               searched = ks_hash_search(s->searched, n->nodeid.id, KS_UNLOCKED) != 0;
+               if (searched) continue; // skip duplicates, this really shouldn't happen on a new search but we sanity check
 
-               // add to pending with expiration, if any of this fails it's almost catastrophic so just bail out and fail the entire search attempt
-               // there are no probable causes for a failure but check them anyway
-               if ((ret = ks_dht_search_pending_create(&pending, s->pool, &n->nodeid)) != KS_STATUS_SUCCESS) goto done;
-               if ((ret = ks_hash_insert(s->pending, n->nodeid.id, pending)) != KS_STATUS_SUCCESS) {
-                       ks_dht_search_pending_destroy(&pending);
-                       goto done;
-               }
-               if ((ret = ks_dht_findnode(dht, &n->addr, NULL, target)) != KS_STATUS_SUCCESS) goto done;
+               ks_hash_insert(s->searched, n->nodeid.id, (void *)KS_TRUE);
+               ks_hash_insert(s->searching, n->nodeid.id, (void *)KS_TRUE);
+               
+               if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) goto done;
        }
-       ks_dhtrt_release_querynodes(&query);
+       //ks_dhtrt_release_querynodes(&query);
        ks_mutex_unlock(s->mutex);
        locked_search = KS_FALSE;
 
        if (search) *search = s;
 
  done:
-       if (locked_searches) ks_hash_write_unlock(searches);
+       if (locked_searches) ks_mutex_unlock(dht->searches_mutex);
        if (locked_search) ks_mutex_unlock(s->mutex);
        if (ret != KS_STATUS_SUCCESS) {
-               if (!inserted && s) ks_dht_search_destroy(&s);
+               //if (s) ks_dht_search_destroy(&s);
                *search = NULL;
        }
        return ret;
@@ -1833,7 +1848,6 @@ KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job)
 {
        ks_assert(dht);
        ks_assert(job);
-
        ks_mutex_lock(dht->jobs_mutex);
     if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = job;
        else dht->jobs_first = dht->jobs_last = job;
@@ -1851,6 +1865,7 @@ KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
        for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
                ks_bool_t done = KS_FALSE;
                jobn = job->next;
+
                if (job->state == KS_DHT_JOB_STATE_QUERYING) {
                        job->state = KS_DHT_JOB_STATE_RESPONDING;
                        if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
@@ -1865,7 +1880,10 @@ KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
                if (done) {
                        if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
                        else if (!jobp) dht->jobs_first = jobn;
-                       else if (!jobn) dht->jobs_last = jobp;
+                       else if (!jobn) {
+                               dht->jobs_last = jobp;
+                           dht->jobs_last->next = NULL;
+                       }
                        else jobp->next = jobn;
 
                        job->next = NULL;
@@ -1966,7 +1984,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t
 }
 
 
-KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target)
+KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht,
+                                                                               ks_dht_search_t *search,
+                                                                               const ks_sockaddr_t *raddr,
+                                                                               ks_dht_job_callback_t callback,
+                                                                               ks_dht_nodeid_t *target)
 {
        ks_dht_job_t *job = NULL;
        ks_status_t ret = KS_STATUS_SUCCESS;
@@ -1976,7 +1998,7 @@ KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *radd
        ks_assert(target);
 
        if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
-       ks_dht_job_build_findnode(job, ks_dht_query_findnode, callback, target);
+       ks_dht_job_build_findnode(job, search, ks_dht_query_findnode, callback, target);
        ks_dht_jobs_add(dht, job);
 
        // next step in ks_dht_pulse_jobs with QUERYING state
@@ -2132,8 +2154,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
        size_t nodes6_size = 0;
        size_t nodes_len = 0;
        size_t nodes6_len = 0;
-       ks_hash_t *searches = NULL;
-       ks_dht_search_t *search = NULL;
        ks_dht_node_t *node = NULL;
        char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_status_t ret = KS_STATUS_SUCCESS;
@@ -2154,19 +2174,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
                nodes6_size = ben_str_len(n);
        }
 
-       searches = job->response->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash;
-
-       ks_hash_read_lock(searches);
-       search = ks_hash_search(searches, job->query_target.id, KS_UNLOCKED);
-       if (search) {
-               ks_dht_search_pending_t *pending = NULL;
-
-               ks_mutex_lock(search->mutex);
-               pending = ks_hash_search(search->pending, job->response->args_id.id, KS_UNLOCKED);
-               if (pending) pending->finished = KS_TRUE;
-       }
-       ks_hash_read_unlock(searches);
-
        while (nodes_len < nodes_size) {
                ks_dht_nodeid_t nid;
                ks_sockaddr_t addr;
@@ -2181,52 +2188,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
                           addr.port);
 
                ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
-               ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node);
+               ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_PING, &node);
                job->response_nodes[job->response_nodes_count++] = node;
-
-               // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6
-               if (search && job->response->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
-                       ks_dht_nodeid_t distance;
-                       int32_t results_index = -1;
-                       
-                       ks_dht_utility_nodeid_xor(&distance, &nid, &search->target);
-                       if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
-                               results_index = search->results_length;
-                               search->results_length++;
-                       } else {
-                               for (int32_t index = 0; index < search->results_length; ++index) {
-                                       // Check if new node is closer than this previous result
-                                       if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
-                                               // If this is the first node that is further then keep it
-                                               // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result
-                                               if (results_index < 0) results_index = index;
-                                               else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
-                                       }
-                               }
-                       }
-
-                       if (results_index >= 0) {
-                               char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
-                               char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1];
-                               ks_dht_search_pending_t *pending = NULL;
-
-                               ks_log(KS_LOG_DEBUG,
-                                          "Set closer node id %s (%s) in search of target id %s at results index %d\n",
-                                          ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
-                                          ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
-                                          ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
-                                          results_index);
-                               search->results[results_index] = nid;
-                               search->distances[results_index] = distance;
-
-                               if ((ret = ks_dht_search_pending_create(&pending, search->pool, &nid)) != KS_STATUS_SUCCESS) goto done;
-                               if ((ret = ks_hash_insert(search->pending, nid.id, pending)) != KS_STATUS_SUCCESS) {
-                                       ks_dht_search_pending_destroy(&pending);
-                                       goto done;
-                               }
-                               if ((ret = ks_dht_findnode(dht, &addr, NULL, &search->target)) != KS_STATUS_SUCCESS) goto done;
-                       }
-               }
        }
 
        while (nodes6_len < nodes6_size) {
@@ -2243,63 +2206,19 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
                           addr.port);
 
                ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
-               ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node);
+               ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_PING, &node);
                job->response_nodes6[job->response_nodes6_count++] = node;
-
-               // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6
-               if (search && job->response->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
-                       ks_dht_nodeid_t distance;
-                       int32_t results_index = -1;
-                       
-                       ks_dht_utility_nodeid_xor(&distance, &nid, &search->target);
-                       if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
-                               results_index = search->results_length;
-                               search->results_length++;
-                       } else {
-                               for (int32_t index = 0; index < search->results_length; ++index) {
-                                       // Check if new node is closer than this previous result
-                                       if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
-                                               // If this is the first node that is further then keep it
-                                               // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result
-                                               if (results_index < 0) results_index = index;
-                                               else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
-                                       }
-                               }
-                       }
-
-                       if (results_index >= 0) {
-                               char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
-                               char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1];
-                               ks_dht_search_pending_t *pending = NULL;
-
-                               ks_log(KS_LOG_DEBUG,
-                                          "Set closer node id %s (%s) in search of target id %s at results index %d\n",
-                                          ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
-                                          ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
-                                          ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
-                                          results_index);
-                               search->results[results_index] = nid;
-                               search->distances[results_index] = distance;
-
-                               if ((ret = ks_dht_search_pending_create(&pending, search->pool, &nid)) != KS_STATUS_SUCCESS) goto done;
-                               if ((ret = ks_hash_insert(search->pending, nid.id, pending)) != KS_STATUS_SUCCESS) {
-                                       ks_dht_search_pending_destroy(&pending);
-                                       goto done;
-                               }
-                               if ((ret = ks_dht_findnode(dht, &addr, NULL, &search->target)) != KS_STATUS_SUCCESS) goto done;
-                       }
-               }
        }
 
        //ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
 
  done:
-       if(search) ks_mutex_unlock(search->mutex);
        return ret;
 }
 
 
 KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
+                                                                  ks_dht_search_t *search,
                                                                   const ks_sockaddr_t *raddr,
                                                                   ks_dht_job_callback_t callback,
                                                                   ks_dht_nodeid_t *target,
@@ -2314,7 +2233,7 @@ KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
        ks_assert(target);
 
        if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
-       ks_dht_job_build_get(job, ks_dht_query_get, callback, target, salt, salt_length);
+       ks_dht_job_build_get(job, search, ks_dht_query_get, callback, target, salt, salt_length);
        ks_dht_jobs_add(dht, job);
 
  done:
@@ -2525,7 +2444,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
                           addr.port);
 
                ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
-               ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node);
+               ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_PING, &node);
                job->response_nodes[job->response_nodes_count++] = node;
        }
        while (nodes6_len < nodes6_size) {
@@ -2542,7 +2461,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
                           addr.port);
 
                ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
-               ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node);
+               ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_PING, &node);
                job->response_nodes6[job->response_nodes6_count++] = node;
        }
        
index 237495711ea19e4d474502b8522d4bc348f4a793..7e8598b53f5b7aba8fe5ea86185db46f75af88ce 100644 (file)
@@ -18,7 +18,7 @@ KS_BEGIN_EXTERN_C
 #define KS_DHT_DATAGRAM_BUFFER_SIZE 1000
 
 //#define KS_DHT_RECV_BUFFER_SIZE 0xFFFF
-#define KS_DHT_PULSE_EXPIRATIONS 10
+#define KS_DHT_PULSE_EXPIRATIONS 1
 
 #define KS_DHT_NODEID_SIZE 20
 
@@ -126,6 +126,8 @@ struct ks_dht_job_s {
 
        enum ks_dht_job_state_t state;
 
+       ks_dht_search_t *search;
+
        ks_sockaddr_t raddr; // will obtain local endpoint node id when creating message using raddr
        int32_t attempts;
 
@@ -134,7 +136,6 @@ struct ks_dht_job_s {
        ks_dht_job_callback_t finish_callback;
 
        ks_dht_message_t *response;
-       //ks_dht_nodeid_t response_id;
 
        // job specific query parameters
        ks_dht_nodeid_t query_target;
@@ -144,6 +145,7 @@ struct ks_dht_job_s {
        ks_dht_storageitem_t *query_storageitem;
 
        // job specific response parameters
+       ks_dht_nodeid_t response_id;
        ks_dht_node_t *response_nodes[KS_DHT_RESPONSE_NODES_MAX_SIZE];
        ks_size_t response_nodes_count;
        ks_dht_node_t *response_nodes6[KS_DHT_RESPONSE_NODES_MAX_SIZE];
@@ -214,23 +216,17 @@ struct ks_dht_transaction_s {
 
 struct ks_dht_search_s {
        ks_pool_t *pool;
-       ks_mutex_t *mutex;
+       ks_dht_search_t *next;
        ks_dht_nodeid_t target;
-       ks_dht_search_callback_t *callbacks;
-       ks_size_t callbacks_size;
-       ks_hash_t *pending;
-       ks_dht_nodeid_t results[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
+       ks_dht_search_callback_t callback;
+       ks_mutex_t *mutex;
+       ks_hash_t *searched;
+       ks_hash_t *searching;
+       ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
        ks_dht_nodeid_t distances[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
        ks_size_t results_length;
 };
 
-struct ks_dht_search_pending_s {
-       ks_pool_t *pool;
-       ks_dht_nodeid_t nodeid;
-       ks_time_t expiration;
-       ks_bool_t finished;
-};
-
 struct ks_dht_storageitem_s {
        ks_pool_t *pool;
        ks_dht_nodeid_t id;
@@ -283,8 +279,9 @@ struct ks_dht_s {
        ks_dhtrt_routetable_t *rt_ipv4;
        ks_dhtrt_routetable_t *rt_ipv6;
 
-       ks_hash_t *searches4_hash;
-       ks_hash_t *searches6_hash;
+       ks_mutex_t *searches_mutex;
+       ks_dht_search_t *searches_first;
+       ks_dht_search_t *searches_last;
 
        volatile uint32_t token_secret_current;
        volatile uint32_t token_secret_previous;
@@ -436,12 +433,17 @@ KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, k
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target);
+KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht,
+                                                                               ks_dht_search_t *search,
+                                                                               const ks_sockaddr_t *raddr,
+                                                                               ks_dht_job_callback_t callback,
+                                                                               ks_dht_nodeid_t *target);
 
 /**
  *
  */
 KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
+                                                                  ks_dht_search_t *search,
                                                                   const ks_sockaddr_t *raddr,
                                                                   ks_dht_job_callback_t callback,
                                                                   ks_dht_nodeid_t *target,
@@ -472,11 +474,11 @@ KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
  * @see ks_dht_search_pending_create
  * @see ks_dht_send_findnode
  */
-KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
-                                                                         int32_t family,
-                                                                         ks_dht_nodeid_t *target,
-                                                                         ks_dht_search_callback_t callback,
-                                                                         ks_dht_search_t **search);
+KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht,
+                                                                                          int32_t family,
+                                                                                          ks_dht_nodeid_t *target,
+                                                                                          ks_dht_search_callback_t callback,
+                                                                                          ks_dht_search_t **search);
 
 
 /**
index 97e30b460db0a794f67a69035380e877be9bb07b..9720282a7fe6e8ab95235ce272f19f79b9822666 100644 (file)
@@ -45,6 +45,7 @@ KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t
 }
 
 KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
+                                                                                  ks_dht_search_t *search,
                                                                                   ks_dht_job_callback_t query_callback,
                                                                                   ks_dht_job_callback_t finish_callback,
                                                                                   ks_dht_nodeid_t *target)
@@ -53,12 +54,14 @@ KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
        ks_assert(query_callback);
        ks_assert(target);
 
+       job->search = search;
        job->query_callback = query_callback;
        job->finish_callback = finish_callback;
        job->query_target = *target;
 }
 
 KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
+                                                                         ks_dht_search_t *search,
                                                                          ks_dht_job_callback_t query_callback,
                                                                          ks_dht_job_callback_t finish_callback,
                                                                          ks_dht_nodeid_t *target,
@@ -69,6 +72,7 @@ KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
        ks_assert(query_callback);
        ks_assert(target);
 
+       job->search = search;
        job->query_callback = query_callback;
        job->finish_callback = finish_callback;
        job->query_target = *target;
index 31e62062b0cd9db19c7631ba5a6f1fe7fdb91fbb..2417fd0c4b26174bc7a5a1b4dc8b014a1c313448 100644 (file)
@@ -2,7 +2,7 @@
 #include "ks_dht-int.h"
 #include "sodium.h"
 
-KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target)
+KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target, ks_dht_search_callback_t callback)
 {
        ks_dht_search_t *s;
        ks_status_t ret = KS_STATUS_SUCCESS;
@@ -21,9 +21,15 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t
 
        memcpy(s->target.id, target->id, KS_DHT_NODEID_SIZE);
 
-       ks_hash_create(&s->pending, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK, s->pool);
-       ks_assert(s->pending);
-       ks_hash_set_keysize(s->pending, KS_DHT_NODEID_SIZE);
+       s->callback = callback;
+
+       ks_hash_create(&s->searched, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool);
+       ks_assert(s->searched);
+       ks_hash_set_keysize(s->searched, KS_DHT_NODEID_SIZE);
+
+       ks_hash_create(&s->searching, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool);
+       ks_assert(s->searching);
+       ks_hash_set_keysize(s->searching, KS_DHT_NODEID_SIZE);
 
        // done:
        if (ret != KS_STATUS_SUCCESS) {
@@ -35,85 +41,19 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t
 KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search)
 {
        ks_dht_search_t *s;
-       ks_hash_iterator_t *it;
 
        ks_assert(search);
        ks_assert(*search);
 
        s = *search;
 
-       if (s->pending) {
-               for (it = ks_hash_first(s->pending, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-                       const void *key = NULL;
-                       ks_dht_search_pending_t *val = NULL;
-                       ks_hash_this(it, &key, NULL, (void **)&val);
-                       ks_dht_search_pending_destroy(&val);
-               }
-               ks_hash_destroy(&s->pending);
-       }
-       if (s->callbacks) {
-               ks_pool_free(s->pool, &s->callbacks);
-               s->callbacks = NULL;
-       }
+       if (s->searching) ks_hash_destroy(&s->searching);
+       if (s->searched) ks_hash_destroy(&s->searched);
        if (s->mutex) ks_mutex_destroy(&s->mutex);
 
        ks_pool_free(s->pool, search);
 }
 
-KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback)
-{
-       ks_assert(search);
-
-       if (callback) {
-               int32_t index;
-
-               ks_mutex_lock(search->mutex);
-               index = search->callbacks_size++;
-               search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool,
-                                                                                                                                          (void *)search->callbacks,
-                                                                                                                                          sizeof(ks_dht_search_callback_t) * search->callbacks_size);
-               ks_assert(search->callbacks);
-               search->callbacks[index] = callback;
-               ks_mutex_unlock(search->mutex);
-       }
-       return KS_STATUS_SUCCESS;
-}
-
-KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid)
-{
-       ks_dht_search_pending_t *p;
-       ks_status_t ret = KS_STATUS_SUCCESS;
-
-       ks_assert(pending);
-       ks_assert(pool);
-
-       *pending = p = ks_pool_alloc(pool, sizeof(ks_dht_search_pending_t));
-       ks_assert(p);
-
-       p->pool = pool;
-       p->nodeid = *nodeid;
-       p->expiration = ks_time_now() + ((ks_time_t)KS_DHT_SEARCH_EXPIRATION * KS_USEC_PER_SEC);
-       p->finished = KS_FALSE;
-
-       // done:
-       if (ret != KS_STATUS_SUCCESS) {
-               if (p) ks_dht_search_pending_destroy(pending);
-       }
-       return ret;
-}
-
-KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending)
-{
-       ks_dht_search_pending_t *p;
-
-       ks_assert(pending);
-       ks_assert(*pending);
-
-       p = *pending;
-
-       ks_pool_free(p->pool, pending);
-}
-
 /* For Emacs:
  * Local Variables:
  * mode:c
index 60d0d71de095de88f2fcc5dca372f49a131b211f..28b79126d4b330c77a8d57262fbbdbc228a2afcf 100644 (file)
@@ -32,6 +32,12 @@ ks_status_t dht2_get_token_callback(ks_dht_t *dht, ks_dht_job_t *job)
        return KS_STATUS_SUCCESS;
 }
 
+ks_status_t dht2_search_findnode_callback(ks_dht_t *dht, ks_dht_search_t *search)
+{
+       diag("dht2_search_findnode_callback %d\n", search->results_length);
+       return KS_STATUS_SUCCESS;
+}
+
 int main() {
        //ks_size_t buflen;
   ks_status_t err;
@@ -48,7 +54,7 @@ int main() {
   ks_sockaddr_t raddr1;
   //ks_sockaddr_t raddr2;
   //ks_sockaddr_t raddr3;
-  ks_dht_nodeid_t target;
+  //ks_dht_nodeid_t target;
   //ks_dht_storageitem_t *immutable = NULL;
   //ks_dht_storageitem_t *mutable = NULL;
   //const char *v = "Hello World!";
@@ -147,7 +153,6 @@ int main() {
        ok(err == KS_STATUS_SUCCESS);
   }
 
-  /*
   diag("Ping test\n");
   
   ks_dht_ping(dht2, &raddr1, NULL); // (QUERYING)
@@ -166,14 +171,35 @@ int main() {
 
   diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
   for (int i = 0; i < 10; ++i) {
-         //diag("DHT 1\n");
          ks_dht_pulse(dht1, 100);
-         //diag("DHT 2\n");
          ks_dht_pulse(dht2, 100);
+         ks_dht_pulse(dht3, 100);
   }
   ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
-  */
+
+  
+  ks_dht_ping(dht3, &raddr1, NULL); // (QUERYING)
+
+  ks_dht_pulse(dht3, 100); // Send queued ping from dht3 to dht1 (RESPONDING)
   
+  ks_dht_pulse(dht1, 100); // Receive and process ping query from dht3, queue and send ping response
+
+  ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+
+  ks_dht_pulse(dht3, 100); // Receive and process ping response from dht1 (PROCESSING then COMPLETING)
+
+  ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good
+
+  ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING)
+
+  diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+  for (int i = 0; i < 20; ++i) {
+         ks_dht_pulse(dht1, 100);
+         ks_dht_pulse(dht2, 100);
+         ks_dht_pulse(dht3, 100);
+  }
+  ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+
   //diag("Get test\n");
   
 
@@ -205,13 +231,14 @@ int main() {
   ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING)
   */
 
+  /*
   diag("Put test\n");
 
   crypto_sign_keypair(pk.key, sk.key);
 
   ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
 
-  ks_dht_get(dht2, &raddr1, dht2_get_token_callback, &target, NULL, 0); // create job
+  ks_dht_get(dht2, NULL, &raddr1, dht2_get_token_callback, &target, NULL, 0); // create job
   
   ks_dht_pulse(dht2, 100); // send get query
 
@@ -225,40 +252,51 @@ int main() {
 
   ks_dht_pulse(dht2, 100); // receive put response
 
-  ks_dht_pulse(dht2, 100); // Call finish callback and purse the job (COMPLETING)
+  ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING)
 
   for (int i = 0; i < 10; ++i) {
-         //diag("DHT 1\n");
          ks_dht_pulse(dht1, 100);
-         //diag("DHT 2\n");
          ks_dht_pulse(dht2, 100);
+         ks_dht_pulse(dht3, 100);
   }
+  */
 
   // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
 
-  //diag("Find_Node test\n");
+  /*
+  diag("Find_Node test\n");
 
-  //ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid);
+  ks_dht_findnode(dht3, NULL, &raddr1, NULL, &ep2->nodeid);
 
-  //ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
+  ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
 
-  //ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
+  ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
 
-  //ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+  ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
 
-  //ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
+  ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
+  
+  ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING)
 
-  //ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+  ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
   
-  //diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
-  //for (int i = 0; i < 10; ++i) {
-         //diag("DHT 1\n");
-         //ks_dht_pulse(dht1, 100);
-         //diag("DHT 2\n");
-         //ks_dht_pulse(dht2, 100);
-  //}
-  //ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+  diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+  for (int i = 0; i < 10; ++i) {
+         ks_dht_pulse(dht1, 100);
+         ks_dht_pulse(dht2, 100);
+         ks_dht_pulse(dht3, 100);
+  }
+  ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+  */
 
+  diag("Search test\n");
+  ks_dht_search_findnode(dht3, AF_INET, &ep2->nodeid, dht2_search_findnode_callback, NULL);
+  diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+  for (int i = 0; i < 30; ++i) {
+         ks_dht_pulse(dht1, 100);
+         ks_dht_pulse(dht2, 100);
+         ks_dht_pulse(dht3, 100);
+  }
 
   /* Cleanup and shutdown */
   diag("Cleanup\n");