d->rt_ipv6 = NULL;
/**
- * Create the hash to store searches.
+ * Create the mutex to handle searches list.
*/
- ks_hash_create(&d->searches4_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
- ks_assert(d->searches4_hash);
- ks_hash_create(&d->searches6_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
- ks_assert(d->searches6_hash);
+ ks_mutex_create(&d->searches_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
+ ks_assert(d->searches_mutex);
- /**
- * The searches hash uses arbitrary key size, which requires the key size be provided.
- */
- ks_hash_set_keysize(d->searches4_hash, KS_DHT_NODEID_SIZE);
- ks_hash_set_keysize(d->searches6_hash, KS_DHT_NODEID_SIZE);
-
/**
* The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
*/
d->token_secret_expiration = 0;
/**
- * Cleanup the search hash and it's contents if it is allocated.
+ * Cleanup the search mutex and searches if they are allocated.
*/
- if (d->searches6_hash) {
- for (it = ks_hash_first(d->searches6_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- const void *key = NULL;
- ks_dht_search_t *val = NULL;
- ks_hash_this(it, &key, NULL, (void **)&val);
- ks_dht_search_destroy(&val);
- }
- ks_hash_destroy(&d->searches6_hash);
- }
- if (d->searches4_hash) {
- for (it = ks_hash_first(d->searches4_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- const void *key = NULL;
- ks_dht_search_t *val = NULL;
- ks_hash_this(it, &key, NULL, (void **)&val);
- ks_dht_search_destroy(&val);
- }
- ks_hash_destroy(&d->searches4_hash);
+ if (d->searches_mutex) ks_mutex_destroy(&d->searches_mutex);
+ for (ks_dht_search_t *search = d->searches_first, *searchn = NULL; search; search = searchn) {
+ searchn = search->next;
+ ks_dht_search_destroy(&search);
}
/**
if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
}
-KS_DECLARE(void) ks_dht_pulse_expirations_searches(ks_dht_t *dht, ks_hash_t *searches)
-{
- ks_hash_iterator_t *it = NULL;
- ks_time_t now = ks_time_now();
-
- ks_assert(dht);
- ks_assert(searches);
-
- ks_hash_write_lock(searches);
- for (it = ks_hash_first(searches, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- const void *key = NULL;
- ks_dht_search_t *value = NULL;
- int32_t active = 0;
-
- ks_hash_this(it, &key, NULL, (void **)&value);
-
- ks_mutex_lock(value->mutex);
- for (ks_hash_iterator_t *i = ks_hash_first(value->pending, KS_UNLOCKED); i; i = ks_hash_next(&i)) {
- const void *k = NULL;
- ks_dht_search_pending_t *v = NULL;
-
- ks_hash_this(i, &k, NULL, (void **)&v);
-
- if (v->finished) continue;
-
- if (v->expiration <= now) {
- char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
- char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
- ks_log(KS_LOG_DEBUG,
- "Search for %s pending find_node to %s has expired without response\n",
- ks_dht_hex(value->target.id, id_buf, KS_DHT_NODEID_SIZE),
- ks_dht_hex(v->nodeid.id, id2_buf, KS_DHT_NODEID_SIZE));
- v->finished = KS_TRUE;
- continue;
- }
- active++;
- }
- ks_mutex_unlock(value->mutex);
-
- if (active == 0) {
- for (int32_t index = 0; index < value->callbacks_size; ++index) value->callbacks[index](dht, value);
- ks_hash_remove(searches, (void *)key);
- ks_dht_search_destroy(&value);
- }
- }
- ks_hash_write_unlock(searches);
-}
-
KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
{
ks_hash_iterator_t *it = NULL;
+ /* Local list that collects searches which finished during this pulse. */
+ ks_dht_search_t *searches_first = NULL;
+ ks_dht_search_t *searches_last = NULL;
ks_time_t now = ks_time_now();
ks_assert(dht);
}
ks_hash_write_unlock(dht->transactions_hash);
- if (dht->rt_ipv4) ks_dht_pulse_expirations_searches(dht, dht->searches4_hash);
- if (dht->rt_ipv6) ks_dht_pulse_expirations_searches(dht, dht->searches6_hash);
+ /* Sweep the searches list: a search is done when it has no find_node queries still in flight. */
+ /* NOTE(review): searches_mutex is held while taking search->mutex — same lock order as ks_dht_search_findnode; keep consistent elsewhere. */
+ ks_mutex_lock(dht->searches_mutex);
+ for (ks_dht_search_t *search = dht->searches_first, *searchn = NULL, *searchp = NULL; search; search = searchn) {
+ ks_bool_t done = KS_FALSE;
+ searchn = search->next;
- // @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?)
+ ks_mutex_lock(search->mutex);
+ done = ks_hash_count(search->searching) == 0;
+
+ if (done) {
+ /* Unlink the finished search; the four branches cover only-element, head, tail, and middle positions. */
+ if (!searchp && !searchn) dht->searches_first = dht->searches_last = NULL;
+ else if (!searchp) dht->searches_first = searchn;
+ else if (!searchn) {
+ dht->searches_last = searchp;
+ dht->searches_last->next = NULL;
+ }
+ else searchp->next = searchn;
+
+ /* Append the search to the local finished list, processed after all locks are dropped. */
+ search->next = NULL;
+ if (searches_last) searches_last = searches_last->next = search;
+ else searches_first = searches_last = search;
+ } else searchp = search;
+ ks_mutex_unlock(search->mutex);
+ }
+ ks_mutex_unlock(dht->searches_mutex);
+
+ /* No locks held here: invoke each finished search's completion callback, then free it. */
+ for (ks_dht_search_t *search = searches_first, *searchn = NULL; search; search = searchn) {
+ searchn = search->next;
+ if (search->callback) search->callback(dht, search);
+ ks_dht_search_destroy(&search);
+ }
if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
dht->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
dht->token_secret_previous = dht->token_secret_current;
dht->token_secret_current = rand();
}
+
+ // @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?)
}
KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
return KS_STATUS_FAIL;
}
- ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
+ ks_log(KS_LOG_DEBUG,
+ "Sending message to %s %d on %s %d\n",
+ message->raddr.host,
+ message->raddr.port,
+ message->endpoint->addr.host,
+ message->endpoint->addr.port);
ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr);
ks_assert(thread);
ks_assert(data);
- ks_log(KS_LOG_DEBUG, "Received message from %s %d\n", datagram->raddr.host, datagram->raddr.port);
+ ks_log(KS_LOG_DEBUG,
+ "Received message from %s %d on %s %d\n",
+ datagram->raddr.host,
+ datagram->raddr.port,
+ datagram->endpoint->addr.host,
+ datagram->endpoint->addr.port);
if (datagram->raddr.family != AF_INET && datagram->raddr.family != AF_INET6) {
ks_log(KS_LOG_DEBUG, "Message from unsupported address family\n");
goto done;
KS_DHT_REMOTE,
message->raddr.host,
message->raddr.port,
- KS_DHTRT_CREATE_DEFAULT,
+ KS_DHTRT_CREATE_PING,
&node)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
KS_DHT_REMOTE,
message->raddr.host,
message->raddr.port,
- KS_DHTRT_CREATE_DEFAULT,
+ KS_DHTRT_CREATE_TOUCH,
&node)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
transaction->job->raddr.port);
} else {
transaction->job->response = message;
+ transaction->job->response_id = message->args_id;
message->transaction = transaction;
if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING;
else transaction->job->state = KS_DHT_JOB_STATE_COMPLETING;
return ret;
}
+/**
+ * Job callback fired when a find_node query issued on behalf of a search gets a response.
+ * Removes the responder from the search's in-flight set, merges the returned nodes into the
+ * search results keeping the closest (by XOR distance to the target), and issues further
+ * find_node queries toward any node that made it into the result set.
+ */
+KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_job_t *job)
+{
+ ks_dht_node_t **nodes = NULL;
+ ks_size_t nodes_count = 0;
+ ks_dht_node_t *node = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(dht);
+ ks_assert(job);
+ ks_assert(job->search);
+
+ ks_mutex_lock(job->search->mutex);
+ // The responding node is no longer an outstanding query for this search.
+ ks_hash_remove(job->search->searching, job->response_id.id);
+
+ // Select the address-family-appropriate node list carried by the response.
+ nodes = job->raddr.family == AF_INET ? job->response_nodes : job->response_nodes6;
+ nodes_count = job->raddr.family == AF_INET ? job->response_nodes_count : job->response_nodes6_count;
+
+ // NOTE(review): i is int32_t while nodes_count is ks_size_t (signed/unsigned compare) — confirm counts stay within int32 range.
+ for (int32_t i = 0; i < nodes_count; ++i) {
+ ks_dht_nodeid_t distance;
+ int32_t results_index = -1;
+
+ node = nodes[i];
+
+ // Skip nodes this search has already visited.
+ if (ks_hash_search(job->search->searched, node->nodeid.id, KS_UNLOCKED) != 0) continue;
+
+ // XOR distance between the candidate node id and the search target.
+ ks_dht_utility_nodeid_xor(&distance, &node->nodeid, &job->search->target);
+ if (job->search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
+ // Room left in the result set: append unconditionally.
+ results_index = job->search->results_length;
+ job->search->results_length++;
+ } else {
+ // Result set full: find the farthest existing result that the new node beats, and replace it.
+ for (int32_t index = 0; index < job->search->results_length; ++index) {
+ // Check if new node is closer than this previous result
+ if (memcmp(distance.id, job->search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
+ // If this is the first node that is further then keep it
+ // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result
+ if (results_index < 0) results_index = index;
+ else if (memcmp(job->search->distances[index].id, job->search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
+ }
+ }
+ }
+
+ if (results_index >= 0) {
+ char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+
+ ks_log(KS_LOG_DEBUG,
+ "Set closer node id %s (%s) in search of target id %s at results index %d\n",
+ ks_dht_hex(node->nodeid.id, id_buf, KS_DHT_NODEID_SIZE),
+ ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
+ ks_dht_hex(job->search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
+ results_index);
+ // @todo add lock on node
+ job->search->results[results_index] = node;
+ job->search->distances[results_index] = distance;
+
+ // Mark as visited and in-flight, then query the new node for entries even nearer the target.
+ ks_hash_insert(job->search->searched, node->nodeid.id, (void *)KS_TRUE);
+ ks_hash_insert(job->search->searching, node->nodeid.id, (void *)KS_TRUE);
+
+ if ((ret = ks_dht_findnode(dht, job->search, &node->addr, ks_dht_search_findnode_callback, &job->search->target)) != KS_STATUS_SUCCESS) goto done;
+ }
+ }
+
+ done:
+ // Single unlock point covers both the success and the early-exit error path.
+ ks_mutex_unlock(job->search->mutex);
+
+ return ret;
+}
-KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
- int32_t family,
- ks_dht_nodeid_t *target,
- ks_dht_search_callback_t callback,
- ks_dht_search_t **search)
+KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht,
+ int32_t family,
+ ks_dht_nodeid_t *target,
+ ks_dht_search_callback_t callback,
+ ks_dht_search_t **search)
{
ks_bool_t locked_searches = KS_FALSE;
ks_bool_t locked_search = KS_FALSE;
- ks_hash_t *searches = NULL;
ks_dhtrt_routetable_t *rt = NULL;
ks_dht_search_t *s = NULL;
- ks_bool_t inserted = KS_FALSE;
- ks_bool_t allocated = KS_FALSE;
ks_dhtrt_querynodes_t query;
ks_status_t ret = KS_STATUS_SUCCESS;
ret = KS_STATUS_FAIL;
goto done;
}
- searches = dht->searches4_hash;
rt = dht->rt_ipv4;
} else {
if (!dht->rt_ipv6) {
ret = KS_STATUS_FAIL;
goto done;
}
- searches = dht->searches6_hash;
rt = dht->rt_ipv6;
}
- // check hash for target to see if search already exists
- ks_hash_write_lock(searches);
+ ks_mutex_lock(dht->searches_mutex);
locked_searches = KS_TRUE;
- s = ks_hash_search(searches, target->id, KS_UNLOCKED);
-
- // if search does not exist, create new search and store in hash by target
- if (!s) {
- if ((ret = ks_dht_search_create(&s, dht->pool, target)) != KS_STATUS_SUCCESS) goto done;
- allocated = KS_TRUE;
- } else inserted = KS_TRUE;
-
- // add callback regardless of whether the search is new or old
- if ((ret = ks_dht_search_callback_add(s, callback)) != KS_STATUS_SUCCESS) goto done;
-
- // if the search is old then bail out and return successfully
- if (!allocated) goto done;
+ if ((ret = ks_dht_search_create(&s, dht->pool, target, callback)) != KS_STATUS_SUCCESS) goto done;
- // everything past this point until final cleanup is only for when a search of the target does not already exist
-
- if ((ret = ks_hash_insert(searches, s->target.id, s)) == KS_STATUS_SUCCESS) goto done;
- inserted = KS_TRUE;
-
- // lock search before unlocking the searches_hash to prevent this search from being used before we finish setting it up
+ if (dht->searches_last) dht->searches_last = dht->searches_last->next = s;
+ else dht->searches_first = dht->searches_last = s;
+
ks_mutex_lock(s->mutex);
locked_search = KS_TRUE;
- // release searches_hash lock now, but search is still locked
- ks_hash_write_unlock(searches);
+ // release searches lock now, but search is still locked
+ ks_mutex_unlock(dht->searches_mutex);
locked_searches = KS_FALSE;
// find closest good nodes to target locally and store as the closest results
ks_dhtrt_findclosest_nodes(rt, &query);
for (int32_t i = 0; i < query.count; ++i) {
ks_dht_node_t *n = query.nodes[i];
- ks_dht_search_pending_t *pending = NULL;
+ ks_bool_t searched = KS_FALSE;
// always take the initial local closest good nodes as results, they are already good nodes that are closest with no results yet
- s->results[s->results_length] = n->nodeid;
+ s->results[s->results_length] = n;
ks_dht_utility_nodeid_xor(&s->distances[s->results_length], &n->nodeid, &s->target);
s->results_length++;
- pending = ks_hash_search(s->pending, n->nodeid.id, KS_UNLOCKED);
- if (pending) continue; // skip duplicates, this really shouldn't happen on a new search but we sanity check
+ searched = ks_hash_search(s->searched, n->nodeid.id, KS_UNLOCKED) != 0;
+ if (searched) continue; // skip duplicates, this really shouldn't happen on a new search but we sanity check
- // add to pending with expiration, if any of this fails it's almost catastrophic so just bail out and fail the entire search attempt
- // there are no probable causes for a failure but check them anyway
- if ((ret = ks_dht_search_pending_create(&pending, s->pool, &n->nodeid)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_hash_insert(s->pending, n->nodeid.id, pending)) != KS_STATUS_SUCCESS) {
- ks_dht_search_pending_destroy(&pending);
- goto done;
- }
- if ((ret = ks_dht_findnode(dht, &n->addr, NULL, target)) != KS_STATUS_SUCCESS) goto done;
+ ks_hash_insert(s->searched, n->nodeid.id, (void *)KS_TRUE);
+ ks_hash_insert(s->searching, n->nodeid.id, (void *)KS_TRUE);
+
+ if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) goto done;
}
- ks_dhtrt_release_querynodes(&query);
+ //ks_dhtrt_release_querynodes(&query);
ks_mutex_unlock(s->mutex);
locked_search = KS_FALSE;
if (search) *search = s;
done:
- if (locked_searches) ks_hash_write_unlock(searches);
+ if (locked_searches) ks_mutex_unlock(dht->searches_mutex);
if (locked_search) ks_mutex_unlock(s->mutex);
if (ret != KS_STATUS_SUCCESS) {
- if (!inserted && s) ks_dht_search_destroy(&s);
+ //if (s) ks_dht_search_destroy(&s);
*search = NULL;
}
return ret;
{
ks_assert(dht);
ks_assert(job);
-
ks_mutex_lock(dht->jobs_mutex);
if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = job;
else dht->jobs_first = dht->jobs_last = job;
for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
ks_bool_t done = KS_FALSE;
jobn = job->next;
+
if (job->state == KS_DHT_JOB_STATE_QUERYING) {
job->state = KS_DHT_JOB_STATE_RESPONDING;
if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
if (done) {
if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
else if (!jobp) dht->jobs_first = jobn;
- else if (!jobn) dht->jobs_last = jobp;
+ else if (!jobn) {
+ dht->jobs_last = jobp;
+ dht->jobs_last->next = NULL;
+ }
else jobp->next = jobn;
job->next = NULL;
}
-KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target)
+KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht,
+ ks_dht_search_t *search,
+ const ks_sockaddr_t *raddr,
+ ks_dht_job_callback_t callback,
+ ks_dht_nodeid_t *target)
{
ks_dht_job_t *job = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(target);
if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
- ks_dht_job_build_findnode(job, ks_dht_query_findnode, callback, target);
+ ks_dht_job_build_findnode(job, search, ks_dht_query_findnode, callback, target);
ks_dht_jobs_add(dht, job);
// next step in ks_dht_pulse_jobs with QUERYING state
size_t nodes6_size = 0;
size_t nodes_len = 0;
size_t nodes6_len = 0;
- ks_hash_t *searches = NULL;
- ks_dht_search_t *search = NULL;
ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
nodes6_size = ben_str_len(n);
}
- searches = job->response->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash;
-
- ks_hash_read_lock(searches);
- search = ks_hash_search(searches, job->query_target.id, KS_UNLOCKED);
- if (search) {
- ks_dht_search_pending_t *pending = NULL;
-
- ks_mutex_lock(search->mutex);
- pending = ks_hash_search(search->pending, job->response->args_id.id, KS_UNLOCKED);
- if (pending) pending->finished = KS_TRUE;
- }
- ks_hash_read_unlock(searches);
-
while (nodes_len < nodes_size) {
ks_dht_nodeid_t nid;
ks_sockaddr_t addr;
addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
- ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node);
+ ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_PING, &node);
job->response_nodes[job->response_nodes_count++] = node;
-
- // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6
- if (search && job->response->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
- ks_dht_nodeid_t distance;
- int32_t results_index = -1;
-
- ks_dht_utility_nodeid_xor(&distance, &nid, &search->target);
- if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
- results_index = search->results_length;
- search->results_length++;
- } else {
- for (int32_t index = 0; index < search->results_length; ++index) {
- // Check if new node is closer than this previous result
- if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
- // If this is the first node that is further then keep it
- // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result
- if (results_index < 0) results_index = index;
- else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
- }
- }
- }
-
- if (results_index >= 0) {
- char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
- char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1];
- ks_dht_search_pending_t *pending = NULL;
-
- ks_log(KS_LOG_DEBUG,
- "Set closer node id %s (%s) in search of target id %s at results index %d\n",
- ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
- ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
- ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
- results_index);
- search->results[results_index] = nid;
- search->distances[results_index] = distance;
-
- if ((ret = ks_dht_search_pending_create(&pending, search->pool, &nid)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_hash_insert(search->pending, nid.id, pending)) != KS_STATUS_SUCCESS) {
- ks_dht_search_pending_destroy(&pending);
- goto done;
- }
- if ((ret = ks_dht_findnode(dht, &addr, NULL, &search->target)) != KS_STATUS_SUCCESS) goto done;
- }
- }
}
while (nodes6_len < nodes6_size) {
addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
- ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node);
+ ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_PING, &node);
job->response_nodes6[job->response_nodes6_count++] = node;
-
- // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6
- if (search && job->response->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
- ks_dht_nodeid_t distance;
- int32_t results_index = -1;
-
- ks_dht_utility_nodeid_xor(&distance, &nid, &search->target);
- if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
- results_index = search->results_length;
- search->results_length++;
- } else {
- for (int32_t index = 0; index < search->results_length; ++index) {
- // Check if new node is closer than this previous result
- if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
- // If this is the first node that is further then keep it
- // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result
- if (results_index < 0) results_index = index;
- else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
- }
- }
- }
-
- if (results_index >= 0) {
- char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
- char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1];
- ks_dht_search_pending_t *pending = NULL;
-
- ks_log(KS_LOG_DEBUG,
- "Set closer node id %s (%s) in search of target id %s at results index %d\n",
- ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
- ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
- ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
- results_index);
- search->results[results_index] = nid;
- search->distances[results_index] = distance;
-
- if ((ret = ks_dht_search_pending_create(&pending, search->pool, &nid)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_hash_insert(search->pending, nid.id, pending)) != KS_STATUS_SUCCESS) {
- ks_dht_search_pending_destroy(&pending);
- goto done;
- }
- if ((ret = ks_dht_findnode(dht, &addr, NULL, &search->target)) != KS_STATUS_SUCCESS) goto done;
- }
- }
}
//ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
done:
- if(search) ks_mutex_unlock(search->mutex);
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
+ ks_dht_search_t *search,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
ks_dht_nodeid_t *target,
ks_assert(target);
if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
- ks_dht_job_build_get(job, ks_dht_query_get, callback, target, salt, salt_length);
+ ks_dht_job_build_get(job, search, ks_dht_query_get, callback, target, salt, salt_length);
ks_dht_jobs_add(dht, job);
done:
addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
- ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node);
+ ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_PING, &node);
job->response_nodes[job->response_nodes_count++] = node;
}
while (nodes6_len < nodes6_size) {
addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
- ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node);
+ ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_PING, &node);
job->response_nodes6[job->response_nodes6_count++] = node;
}
return KS_STATUS_SUCCESS;
}
+/* Test-harness search-completion callback: reports how many results the finished search produced. */
+ks_status_t dht2_search_findnode_callback(ks_dht_t *dht, ks_dht_search_t *search)
+{
+ diag("dht2_search_findnode_callback %d\n", search->results_length);
+ return KS_STATUS_SUCCESS;
+}
+
int main() {
//ks_size_t buflen;
ks_status_t err;
ks_sockaddr_t raddr1;
//ks_sockaddr_t raddr2;
//ks_sockaddr_t raddr3;
- ks_dht_nodeid_t target;
+ //ks_dht_nodeid_t target;
//ks_dht_storageitem_t *immutable = NULL;
//ks_dht_storageitem_t *mutable = NULL;
//const char *v = "Hello World!";
ok(err == KS_STATUS_SUCCESS);
}
- /*
diag("Ping test\n");
ks_dht_ping(dht2, &raddr1, NULL); // (QUERYING)
diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
for (int i = 0; i < 10; ++i) {
- //diag("DHT 1\n");
ks_dht_pulse(dht1, 100);
- //diag("DHT 2\n");
ks_dht_pulse(dht2, 100);
+ ks_dht_pulse(dht3, 100);
}
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
- */
+
+
+ ks_dht_ping(dht3, &raddr1, NULL); // (QUERYING)
+
+ ks_dht_pulse(dht3, 100); // Send queued ping from dht3 to dht1 (RESPONDING)
+ ks_dht_pulse(dht1, 100); // Receive and process ping query from dht3, queue and send ping response
+
+ ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+
+ ks_dht_pulse(dht3, 100); // Receive and process ping response from dht1 (PROCESSING then COMPLETING)
+
+ ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good
+
+ ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING)
+
+ diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+ for (int i = 0; i < 20; ++i) {
+ ks_dht_pulse(dht1, 100);
+ ks_dht_pulse(dht2, 100);
+ ks_dht_pulse(dht3, 100);
+ }
+ ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+
//diag("Get test\n");
ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING)
*/
+ /*
diag("Put test\n");
crypto_sign_keypair(pk.key, sk.key);
ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
- ks_dht_get(dht2, &raddr1, dht2_get_token_callback, &target, NULL, 0); // create job
+ ks_dht_get(dht2, NULL, &raddr1, dht2_get_token_callback, &target, NULL, 0); // create job
ks_dht_pulse(dht2, 100); // send get query
ks_dht_pulse(dht2, 100); // receive put response
- ks_dht_pulse(dht2, 100); // Call finish callback and purse the job (COMPLETING)
+ ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING)
for (int i = 0; i < 10; ++i) {
- //diag("DHT 1\n");
ks_dht_pulse(dht1, 100);
- //diag("DHT 2\n");
ks_dht_pulse(dht2, 100);
+ ks_dht_pulse(dht3, 100);
}
+ */
// Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
- //diag("Find_Node test\n");
+ /*
+ diag("Find_Node test\n");
- //ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid);
+ ks_dht_findnode(dht3, NULL, &raddr1, NULL, &ep2->nodeid);
- //ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
+ ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
- //ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
+ ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
- //ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+ ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
- //ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
+ ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
+
+ ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING)
- //ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+ ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
- //diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
- //for (int i = 0; i < 10; ++i) {
- //diag("DHT 1\n");
- //ks_dht_pulse(dht1, 100);
- //diag("DHT 2\n");
- //ks_dht_pulse(dht2, 100);
- //}
- //ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+ diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+ for (int i = 0; i < 10; ++i) {
+ ks_dht_pulse(dht1, 100);
+ ks_dht_pulse(dht2, 100);
+ ks_dht_pulse(dht3, 100);
+ }
+ ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+ */
+ diag("Search test\n");
+ ks_dht_search_findnode(dht3, AF_INET, &ep2->nodeid, dht2_search_findnode_callback, NULL);
+ diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+ for (int i = 0; i < 30; ++i) {
+ ks_dht_pulse(dht1, 100);
+ ks_dht_pulse(dht2, 100);
+ ks_dht_pulse(dht3, 100);
+ }
/* Cleanup and shutdown */
diag("Cleanup\n");