d->rt_ipv6 = NULL;
/**
- * Create the mutex to handle searches list.
+ * Default tokens expirations to not be checked for one pulse.
*/
- ks_mutex_create(&d->searches_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
- ks_assert(d->searches_mutex);
-
+ d->tokens_pulse = ks_time_now() + ((ks_time_t)KS_DHT_TOKENS_PULSE * KS_USEC_PER_SEC);
+
/**
* The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
*/
d->token_secret_current = d->token_secret_previous = rand();
d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKEN_EXPIRATION * KS_USEC_PER_SEC);
+ /**
+ * Default storageitems expirations to not be checked for one pulse.
+ */
+ d->storageitems_pulse = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEMS_PULSE * KS_USEC_PER_SEC);
+
/**
* Create the hash to store arbitrary data for BEP44.
*/
* Cleanup the storageitems hash and it's contents if it is allocated.
*/
if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash);
+ d->storageitems_pulse = 0;
/**
* Zero out the opaque write token variables.
d->token_secret_current = 0;
d->token_secret_previous = 0;
d->token_secret_expiration = 0;
-
- /**
- * Cleanup the search mutex and searches if they are allocated.
- */
- if (d->searches_mutex) ks_mutex_destroy(&d->searches_mutex);
- for (ks_dht_search_t *search = d->searches_first, *searchn = NULL; search; search = searchn) {
- searchn = search->next;
- ks_dht_search_destroy(&search);
- }
+ d->tokens_pulse = 0;
/**
* Cleanup the route tables if they are allocated.
d->transactionid_next = 0;
if (d->transactionid_mutex) ks_mutex_destroy(&d->transactionid_mutex);
if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash);
+ d->transactions_pulse = 0;
/**
* Cleanup the jobs mutex and jobs if they are allocated.
/**
* Probably don't need this
*/
- d->transactions_pulse = 0;
-
d->endpoints_length = 0;
d->endpoints_size = 0;
if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
- ks_dht_pulse_searches(dht);
-
- // @todo pulse_storageitems for keepalive and expiration
- // hold keepalive counter on items to determine what to reannounce vs expire
+ ks_dht_pulse_storageitems(dht);
ks_dht_pulse_jobs(dht);
ks_dht_pulse_tokens(dht);
}
-KS_DECLARE(void) ks_dht_pulse_searches(ks_dht_t *dht)
+/**
+ * Periodic pulse for BEP44 storage items: re-announces items that are still
+ * referenced locally (refc > 0) and removes/destroys unreferenced items whose
+ * expiration has passed.
+ */
+KS_DECLARE(void) ks_dht_pulse_storageitems(ks_dht_t *dht)
{
-	ks_dht_search_t *searches_first = NULL;
-	ks_dht_search_t *searches_last = NULL;
-
+	ks_hash_iterator_t *it = NULL;
+	char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+	ks_time_t now = ks_time_now();
+
	ks_assert(dht);
-	ks_mutex_lock(dht->searches_mutex);
-	for (ks_dht_search_t *search = dht->searches_first, *searchn = NULL, *searchp = NULL; search; search = searchn) {
-		ks_bool_t done = KS_FALSE;
-		searchn = search->next;
+	// Rate limit: skip until the scheduled pulse time, then schedule the next sweep.
+	if (dht->storageitems_pulse > now) return;
+	dht->storageitems_pulse = now + ((ks_time_t)KS_DHT_STORAGEITEMS_PULSE * KS_USEC_PER_SEC);
-	ks_mutex_lock(search->mutex);
-	done = ks_hash_count(search->searching) == 0;
-
-	if (done) {
-		if (!searchp && !searchn) dht->searches_first = dht->searches_last = NULL;
-		else if (!searchp) dht->searches_first = searchn;
-		else if (!searchn) {
-			dht->searches_last = searchp;
-			dht->searches_last->next = NULL;
+	// Write lock is held for the whole sweep so entries can be removed inline.
+	// NOTE(review): assumes ks_hash supports ks_hash_remove() while iterating
+	// with ks_hash_next() — confirm against the ks_hash implementation.
+	ks_hash_write_lock(dht->storageitems_hash);
+	for (it = ks_hash_first(dht->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+		const void *key = NULL;
+		ks_dht_storageitem_t *value = NULL;
+		ks_bool_t remove = KS_FALSE;
+
+		ks_hash_this(it, &key, NULL, (void **)&value);
+
+		ks_mutex_lock(value->mutex);
+
+		if (value->keepalive <= now) {
+			value->keepalive = now + ((ks_time_t)KS_DHT_STORAGEITEM_KEEPALIVE * KS_USEC_PER_SEC);
+			if (value->refc > 0) {
+				// Still referenced locally: extend expiration and re-announce to both route tables.
+				value->expiration = now + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+				ks_log(KS_LOG_DEBUG, "Item keepalive %s\n", ks_dht_hex(value->id.id, id_buf, KS_DHT_NODEID_SIZE));
+				if (dht->rt_ipv4) ks_dht_distribute(dht, NULL, NULL, dht->rt_ipv4, 0, value);
+				if (dht->rt_ipv6) ks_dht_distribute(dht, NULL, NULL, dht->rt_ipv6, 0, value);
			}
-		else searchp->next = searchn;
+		}
+
+		// Unreferenced and expired: drop from the hash now; the item itself is
+		// destroyed below only after its mutex has been released.
+		remove = value->refc == 0 && value->expiration <= now;
+		if (remove) ks_hash_remove(dht->storageitems_hash, (void *)key);
-		search->next = NULL;
-		if (searches_last) searches_last = searches_last->next = search;
-		else searches_first = searches_last = search;
-	} else searchp = search;
-	ks_mutex_unlock(search->mutex);
-	}
-	ks_mutex_unlock(dht->searches_mutex);
+		ks_mutex_unlock(value->mutex);
-	for (ks_dht_search_t *search = searches_first, *searchn = NULL; search; search = searchn) {
-		searchn = search->next;
-		if (search->callback) search->callback(dht, search);
-		ks_dht_search_destroy(&search);
+		if (remove) {
+			ks_log(KS_LOG_DEBUG, "Item expired %s\n", ks_dht_hex(value->id.id, id_buf, KS_DHT_NODEID_SIZE));
+			ks_dht_storageitem_destroy(&value);
+		}
	}
+	ks_hash_write_unlock(dht->storageitems_hash);
+}
+
+/**
+ * Append a job to the tail of the DHT jobs list under the jobs mutex.
+ * Queued jobs are driven forward by ks_dht_pulse_jobs().
+ */
+KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job)
+{
+	ks_assert(dht);
+	ks_assert(job);
+	ks_mutex_lock(dht->jobs_mutex);
+	// Tail insert; first and last both point at the job when the list was empty.
+	if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = job;
+	else dht->jobs_first = dht->jobs_last = job;
+	ks_mutex_unlock(dht->jobs_mutex);
+}
KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
ks_mutex_lock(dht->jobs_mutex);
for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
- ks_bool_t done = KS_FALSE;
jobn = job->next;
if (job->state == KS_DHT_JOB_STATE_QUERYING) {
job->state = KS_DHT_JOB_STATE_RESPONDING;
- if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
+ if (job->query_callback(dht, job) != KS_STATUS_SUCCESS) {
+ job->result = KS_DHT_JOB_RESULT_FAILURE;
+ job->state = KS_DHT_JOB_STATE_COMPLETING;
+ }
}
if (job->state == KS_DHT_JOB_STATE_EXPIRING) {
job->attempts--;
if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING;
- else done = KS_TRUE;
+ else {
+ job->result = KS_DHT_JOB_RESULT_EXPIRED;
+ job->state = KS_DHT_JOB_STATE_COMPLETING;
+ }
}
- if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE;
-
- if (done) {
+ if (job->state == KS_DHT_JOB_STATE_COMPLETING) {
if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
else if (!jobp) dht->jobs_first = jobn;
else if (!jobn) {
transactionid = dht->transactionid_next++;
ks_mutex_unlock(dht->transactionid_mutex);
- if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+ ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback);
+ ks_assert(trans);
+
+ ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE);
+ ks_assert(msg);
// if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done;
transactionid = htonl(transactionid);
if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) {
- ks_dht_error(dht,
- ep,
- raddr,
- transactionid,
- transactionid_length,
- 202,
- "Internal message create error");
- goto done;
- }
+ ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE);
+ ks_assert(msg);
ben_dict_set(msg->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
ben_dict_set(msg->data, ben_blob("y", 1), ben_blob("r", 1));
// @todo blacklist check for bad actor nodes
- if (ks_dht_message_create(&message, datagram->dht->pool, datagram->endpoint, &datagram->raddr, KS_FALSE) != KS_STATUS_SUCCESS) goto done;
+ ks_dht_message_create(&message, datagram->dht->pool, datagram->endpoint, &datagram->raddr, KS_FALSE);
+ ks_assert(message);
if (ks_dht_message_parse(message, datagram->buffer, datagram->buffer_length) != KS_STATUS_SUCCESS) goto done;
message->raddr.port,
KS_DHTRT_CREATE_TOUCH,
&node)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
if ((ret = ks_dhtrt_touch_node(message->endpoint->node->table, *id)) != KS_STATUS_SUCCESS) goto done;
transaction->job->raddr.host,
transaction->job->raddr.port);
} else {
+ ks_dhtrt_sharelock_node(node);
transaction->job->response = message;
- transaction->job->response_id = message->args_id;
+ transaction->job->response_id = node;
message->transaction = transaction;
- if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING;
- else transaction->job->state = KS_DHT_JOB_STATE_COMPLETING;
+ if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->result = KS_DHT_JOB_RESULT_FAILURE;
+ transaction->job->state = KS_DHT_JOB_STATE_COMPLETING;
transaction->job->response = NULL; // message is destroyed after we return, stop using it
transaction->finished = KS_TRUE;
}
done:
+ if (node) ks_dhtrt_release_node(node);
return ret;
}
+/**
+ * Completion callback for each findnode query issued by a search.
+ * Folds the responding node into the closest-results set, then issues further
+ * findnode queries for any returned nodes that are closer than current results.
+ * Destroys the search once the last outstanding query has completed.
+ */
KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_job_t *job)
{
+	ks_dht_search_t *search = NULL;
	ks_dht_node_t **nodes = NULL;
	ks_size_t nodes_count = 0;
-	ks_dht_node_t *node = NULL;
+	ks_dht_nodeid_t distance;
+	int32_t results_index = -1;
	ks_status_t ret = KS_STATUS_SUCCESS;
	ks_assert(dht);
	ks_assert(job);
-	ks_assert(job->search);
+	ks_assert(job->data);
-	ks_mutex_lock(job->search->mutex);
-	ks_hash_remove(job->search->searching, job->response_id.id);
+	search = (ks_dht_search_t *)job->data;
+
+	ks_mutex_lock(search->mutex);
+	// One fewer query outstanding, regardless of the job result.
+	search->searching--;
+
+	if (job->result != KS_DHT_JOB_RESULT_SUCCESS) goto done;
+	ks_dht_utility_nodeid_xor(&distance, &job->response_id->nodeid, &search->target);
+	if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
+		results_index = search->results_length;
+		search->results_length++;
+	} else {
+		for (int32_t index = 0; index < search->results_length; ++index) {
+			// Check if responding node is closer than the current result
+			if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
+				// Only existing results which are further from the target than the responding node are considered for replacement
+
+				// If this is the first node that is further, keep it and keep looking for existing results which are further than this result
+				// If additional results are further, and the current result is further than a previous result, use the current result as furthest to replace
+				if (results_index < 0) results_index = index;
+				else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
+			}
+		}
+	}
+
+	if (results_index >= 0) {
+		// The results are either not full yet, or this responding node is closer than the furthest existing result
+		char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+		char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+		char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+
+		ks_log(KS_LOG_DEBUG,
+			   "Set closer node id %s (%s) in search of target id %s at results index %d\n",
+			   ks_dht_hex(job->response_id->nodeid.id, id_buf, KS_DHT_NODEID_SIZE),
+			   ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
+			   ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
+			   results_index);
+
+		// Swap the node reference held by the results slot: release the evicted
+		// node (if any) and sharelock the responding node before storing it.
+		if (search->results[results_index]) ks_dhtrt_release_node(search->results[results_index]);
+		ks_dhtrt_sharelock_node(job->response_id);
+
+		search->results[results_index] = job->response_id;
+		search->distances[results_index] = distance;
+	}
+
	nodes = job->raddr.family == AF_INET ? job->response_nodes : job->response_nodes6;
	nodes_count = job->raddr.family == AF_INET ? job->response_nodes_count : job->response_nodes6_count;
	for (int32_t i = 0; i < nodes_count; ++i) {
-		ks_dht_nodeid_t distance;
-		int32_t results_index = -1;
-
-		node = nodes[i];
+		ks_bool_t closer = KS_FALSE;
+		ks_dht_node_t *node = nodes[i];
-		if (ks_hash_search(job->search->searched, node->nodeid.id, KS_UNLOCKED) != 0) continue;
+		// skip duplicates already searched
+		if (ks_hash_search(search->searched, node->nodeid.id, KS_UNLOCKED) != 0) continue;
-		ks_dht_utility_nodeid_xor(&distance, &node->nodeid, &job->search->target);
-		if (job->search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
-			results_index = job->search->results_length;
-			job->search->results_length++;
-		} else {
-			for (int32_t index = 0; index < job->search->results_length; ++index) {
-				// Check if new node is closer than this previous result
-				if (memcmp(distance.id, job->search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
-					// If this is the first node that is further then keep it
-					// Else if two or more nodes are further, and this previous result is further than the current one then keep the current result
-					if (results_index < 0) results_index = index;
-					else if (memcmp(job->search->distances[index].id, job->search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
-				}
-			}
-		}
+		// calculate distance of new node from target
+		ks_dht_utility_nodeid_xor(&distance, &node->nodeid, &search->target);
-		if (results_index >= 0) {
-			char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
-			char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
-			char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+		// if the results are not full, or the new node is closer than any result, then the new node should be checked
+		if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) closer = KS_TRUE;
+		for (int32_t index = 0; !closer && index < search->results_length; ++index) {
+			// Check if new node is closer than this current result
+			closer = memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0;
+		}
-			ks_log(KS_LOG_DEBUG,
-				   "Set closer node id %s (%s) in search of target id %s at results index %d\n",
-				   ks_dht_hex(node->nodeid.id, id_buf, KS_DHT_NODEID_SIZE),
-				   ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
-				   ks_dht_hex(job->search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
-				   results_index);
-
-			if (job->search->results[results_index]) ks_dhtrt_release_node(job->search->results[results_index]);
-			job->search->results[results_index] = node;
-			job->search->distances[results_index] = distance;
-
-			ks_hash_insert(job->search->searched, node->nodeid.id, (void *)KS_TRUE);
-			ks_hash_insert(job->search->searching, node->nodeid.id, (void *)KS_TRUE);
+		if (closer) {
+			// track new node as searched and searching, then send off a findnode query to validate it as a result and end up back here for new closer nodes
+			ks_hash_insert(search->searched, node->nodeid.id, (void *)KS_TRUE);
+			search->searching++;
-			ks_dhtrt_sharelock_node(node);
-
-			if ((ret = ks_dht_findnode(dht, job->search, &node->addr, ks_dht_search_findnode_callback, &job->search->target)) != KS_STATUS_SUCCESS) goto done;
+			ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target);
		}
	}
-
+
 done:
-	ks_mutex_unlock(job->search->mutex);
+	ks_mutex_unlock(search->mutex);
+
+	// NOTE(review): search->searching is read after releasing search->mutex; if
+	// callbacks can run concurrently this risks a double-destroy — confirm that
+	// job callbacks are serialized by the pulse thread.
+	if (search->searching == 0) {
+		if (search->callback) search->callback(dht, job);
+		ks_dht_search_destroy(&search);
+	}
	return ret;
}
-KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht,
-											   int32_t family,
-											   ks_dht_nodeid_t *target,
-											   ks_dht_search_callback_t callback,
-											   ks_dht_search_t **search)
+/**
+ * First step of a search job: seed the search with the closest good nodes from
+ * the local route table and issue a findnode query for each; the iterative
+ * lookup then continues in ks_dht_search_findnode_callback.
+ */
+KS_DECLARE(ks_status_t) ks_dht_query_search(ks_dht_t *dht, ks_dht_job_t *job)
{
-	ks_bool_t locked_searches = KS_FALSE;
	ks_bool_t locked_search = KS_FALSE;
-	ks_dhtrt_routetable_t *rt = NULL;
-	ks_dht_search_t *s = NULL;
+	ks_dht_search_t *search = NULL;
	ks_dhtrt_querynodes_t query;
	ks_status_t ret = KS_STATUS_SUCCESS;
	ks_assert(dht);
-	ks_assert(family == AF_INET || family == AF_INET6);
-	ks_assert(target);
-
-	if (search) *search = NULL;
-
-	if (family == AF_INET) {
-		if (!dht->rt_ipv4) {
-			ret = KS_STATUS_FAIL;
-			goto done;
-		}
-		rt = dht->rt_ipv4;
-	} else {
-		if (!dht->rt_ipv6) {
-			ret = KS_STATUS_FAIL;
-			goto done;
-		}
-		rt = dht->rt_ipv6;
-	}
-
-	ks_mutex_lock(dht->searches_mutex);
-	locked_searches = KS_TRUE;
+	ks_assert(job);
+	ks_assert(job->data);
-	if ((ret = ks_dht_search_create(&s, dht->pool, target, callback)) != KS_STATUS_SUCCESS) goto done;
+	// The search state was attached to the job by ks_dht_search().
+	search = (ks_dht_search_t *)job->data;
-	if (dht->searches_last) dht->searches_last = dht->searches_last->next = s;
-	else dht->searches_first = dht->searches_last = s;
-
-	ks_mutex_lock(s->mutex);
+	ks_mutex_lock(search->mutex);
	locked_search = KS_TRUE;
-	// release searches lock now, but search is still locked
-	ks_mutex_unlock(dht->searches_mutex);
-	locked_searches = KS_FALSE;
-
	// find closest good nodes to target locally and store as the closest results
+	// Address family is inferred from which route table the search targets.
-	query.nodeid = *target;
+	query.nodeid = search->target;
	query.type = KS_DHT_REMOTE;
	query.max = KS_DHT_SEARCH_RESULTS_MAX_SIZE;
-	query.family = family;
+	query.family = search->table == dht->rt_ipv4 ? AF_INET : AF_INET6;
	query.count = 0;
-	ks_dhtrt_findclosest_nodes(rt, &query);
+	ks_dhtrt_findclosest_nodes(search->table, &query);
	for (int32_t i = 0; i < query.count; ++i) {
-		ks_dht_node_t *n = query.nodes[i];
-		ks_bool_t searched = KS_FALSE;
+		ks_dht_node_t *node = query.nodes[i];
-		// always take the initial local closest good nodes as results, they are already good nodes that are closest with no results yet
-		s->results[s->results_length] = n;
-		ks_dht_utility_nodeid_xor(&s->distances[s->results_length], &n->nodeid, &s->target);
-		s->results_length++;
+		// skip duplicates already searched, this really shouldn't happen on a new search but we sanity check
+		if (ks_hash_search(search->searched, node->nodeid.id, KS_UNLOCKED) != 0) continue;
+
+		ks_hash_insert(search->searched, node->nodeid.id, (void *)KS_TRUE);
+		search->searching++;
-		searched = ks_hash_search(s->searched, n->nodeid.id, KS_UNLOCKED) != 0;
-		if (searched) continue; // skip duplicates, this really shouldn't happen on a new search but we sanity check
+		ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target);
+	}
+	ks_dhtrt_release_querynodes(&query);
-		ks_hash_insert(s->searched, n->nodeid.id, (void *)KS_TRUE);
-		ks_hash_insert(s->searching, n->nodeid.id, (void *)KS_TRUE);
+	// done:
+	if (locked_search) ks_mutex_unlock(search->mutex);
-		ks_dhtrt_sharelock_node(n);
-
-		if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) {
-			ks_dhtrt_release_querynodes(&query);
-			goto done;
-		}
+	// No local candidates were found: finish the search immediately.
+	// NOTE(review): search->searching is read after releasing search->mutex; see
+	// the matching note in ks_dht_search_findnode_callback — confirm callbacks
+	// are serialized.
+	if (search->searching == 0) {
+		if (search->callback) search->callback(dht, job);
+		ks_dht_search_destroy(&search);
	}
-	ks_dhtrt_release_querynodes(&query);
-	ks_mutex_unlock(s->mutex);
-	locked_search = KS_FALSE;
+
+	return ret;
+}
+
+/**
+ * Begin an iterative closest-node search for @target on the given route table.
+ * Allocates the search state and queues a job whose query step is
+ * ks_dht_query_search; @callback (may be NULL) fires when the search finishes.
+ */
+KS_DECLARE(void) ks_dht_search(ks_dht_t *dht,
+							   ks_dht_job_callback_t callback,
+							   void *data,
+							   ks_dhtrt_routetable_t *table,
+							   ks_dht_nodeid_t *target)
+{
+	ks_dht_search_t *search = NULL;
+	ks_dht_job_t *job = NULL;
+
+	ks_assert(dht);
+	ks_assert(table);
+	ks_assert(target);
+
+	ks_dht_search_create(&search, dht->pool, table, target, callback, data);
+	ks_assert(search);
+
+	// raddr is NULL: the first step queries the local route table, not a remote
+	// node; the search state rides along as the job's data. 3 = attempt count.
+	ks_dht_job_create(&job, dht->pool, NULL, 3, search);
+	ks_assert(job);
+
+	ks_dht_job_build_search(job, ks_dht_query_search, NULL);
+	ks_dht_jobs_add(dht, job);
+}
+
+/**
+ * Completion callback for the get query issued by ks_dht_publish().
+ * If the remote lacks the item, or holds an older mutable sequence, follow up
+ * with a put; otherwise report completion. The publish state is destroyed on
+ * every path.
+ */
+KS_DECLARE(ks_status_t) ks_dht_publish_get_callback(ks_dht_t *dht, ks_dht_job_t *job)
+{
+	ks_dht_publish_t *publish = NULL;
+	ks_status_t ret = KS_STATUS_SUCCESS;
+
+	ks_assert(dht);
+	ks_assert(job);
+	ks_assert(job->data);
+
+	publish = (ks_dht_publish_t *)job->data;
-	if (search) *search = s;
+	// @todo callbacks need job to contain cascaded publish->data before calling
+	if (job->result != KS_DHT_JOB_RESULT_SUCCESS) {
+		// Get failed: surface the failure to the caller with its original data.
+		job->data = publish->data;
+		if (publish->callback) publish->callback(dht, job);
+		goto done;
+	}
+
+	// Put when the remote has no copy, or its mutable copy is behind ours.
+	if (!job->response_hasitem || (publish->item->mutable && job->response_seq < publish->item->seq)) {
+		ks_dht_put(dht, &job->raddr, publish->callback, publish->data, &job->response_token, publish->cas, publish->item);
+	} else if (publish->callback) {
+		job->data = publish->data;
+		publish->callback(dht, job);
+	}
 done:
-	if (locked_searches) ks_mutex_unlock(dht->searches_mutex);
-	if (locked_search) ks_mutex_unlock(s->mutex);
-	if (ret != KS_STATUS_SUCCESS) {
-		//if (s) ks_dht_search_destroy(&s);
-		*search = NULL;
+
+	ks_dht_publish_destroy(&publish);
+	return ret;
+}
+
+/**
+ * Publish a storage item to one remote node: first issue a get (to learn the
+ * remote's current state and obtain a write token), then the get callback
+ * decides whether a put is needed. For mutable items the item's salt is
+ * forwarded with the get.
+ */
+KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht,
+								const ks_sockaddr_t *raddr,
+								ks_dht_job_callback_t callback,
+								void *data,
+								int64_t cas,
+								ks_dht_storageitem_t *item)
+{
+	ks_dht_publish_t *publish = NULL;
+	const uint8_t *salt = NULL;
+	size_t salt_length = 0;
+
+	ks_assert(dht);
+	ks_assert(raddr);
+	ks_assert(cas >= 0);
+	ks_assert(item);
+
+	if (item->salt) {
+		salt = (const uint8_t *)ben_str_val(item->salt);
+		salt_length = ben_str_len(item->salt);
+	}
+
+	// Publish state carries callback/data/cas/item through to the get callback.
+	ks_dht_publish_create(&publish, dht->pool, callback, data, cas, item);
+	ks_assert(publish);
+
+	ks_dht_get(dht, raddr, ks_dht_publish_get_callback, publish, &item->id, salt, salt_length);
+}
+
+/**
+ * Per-node publish completion for a distribute operation: decrement the count
+ * of outstanding publishes and, when the last one finishes, invoke the
+ * distribute callback and tear down the distribute state.
+ */
+KS_DECLARE(ks_status_t) ks_dht_distribute_publish_callback(ks_dht_t *dht, ks_dht_job_t *job)
+{
+	ks_dht_distribute_t *distribute = NULL;
+	ks_bool_t finished = KS_FALSE;
+	ks_status_t ret = KS_STATUS_SUCCESS;
+
+	ks_assert(dht);
+	ks_assert(job);
+	ks_assert(job->data);
+
+	distribute = (ks_dht_distribute_t *)job->data;
+	ks_mutex_lock(distribute->mutex);
+	distribute->publishing--;
+	// Capture the finished decision under the mutex so only one caller sees zero.
+	finished = distribute->publishing == 0;
+	ks_mutex_unlock(distribute->mutex);
+
+	if (finished) {
+		if (distribute->callback) distribute->callback(dht, distribute->item);
+		ks_dht_distribute_destroy(&distribute);
	}
+
	return ret;
}
+/**
+ * Search completion for a distribute operation: publish the item to every
+ * non-local node in the search results, then release the search. If nothing
+ * was published (no remote results), finish immediately.
+ */
+KS_DECLARE(ks_status_t) ks_dht_distribute_search_callback(ks_dht_t *dht, ks_dht_job_t *job)
+{
+	ks_dht_search_t *search = NULL;
+	ks_dht_distribute_t *distribute = NULL;
+	ks_bool_t finished = KS_FALSE;
+	ks_status_t ret = KS_STATUS_SUCCESS;
+
+	ks_assert(dht);
+	ks_assert(job);
+	ks_assert(job->data);
+
+	search = (ks_dht_search_t *)job->data;
+	ks_assert(search->data);
+
+	distribute = (ks_dht_distribute_t *)search->data;
+
+	// NOTE(review): distribute->mutex is held across ks_dht_publish(); assumes
+	// publish only queues work and never invokes
+	// ks_dht_distribute_publish_callback synchronously (which would deadlock on
+	// this mutex) — confirm.
+	ks_mutex_lock(distribute->mutex);
+	for (int32_t index = 0; index < search->results_length; ++index) {
+		ks_dht_node_t *node = search->results[index];
+		if (node->type == KS_DHT_LOCAL) continue;
+
+		distribute->publishing++;
+		ks_dht_publish(dht, &node->addr, ks_dht_distribute_publish_callback, distribute, distribute->cas, distribute->item);
+	}
+	finished = distribute->publishing == 0;
+	ks_mutex_unlock(distribute->mutex);
+
+	if (finished) {
+		if (distribute->callback) distribute->callback(dht, distribute->item);
+		ks_dht_distribute_destroy(&distribute);
+	}
+
+	ks_dht_search_destroy(&search);
+
+	return ret;
+}
+
+/**
+ * Distribute a storage item across the DHT: search the route table for the
+ * nodes closest to the item's id, then (in the search callback) publish the
+ * item to each of them. Used both for explicit announcements and for the
+ * periodic keepalive in ks_dht_pulse_storageitems.
+ */
+KS_DECLARE(void) ks_dht_distribute(ks_dht_t *dht,
+								   ks_dht_storageitem_callback_t callback,
+								   void *data,
+								   ks_dhtrt_routetable_t *table,
+								   int64_t cas,
+								   ks_dht_storageitem_t *item)
+{
+	ks_dht_distribute_t *distribute = NULL;
+
+	ks_assert(dht);
+	ks_assert(table);
+	ks_assert(cas >= 0);
+	ks_assert(item);
+
+	ks_dht_distribute_create(&distribute, dht->pool, callback, data, cas, item);
+	ks_assert(distribute);
+
+	ks_dht_search(dht, ks_dht_distribute_search_callback, distribute, table, &item->id);
+}
+
KS_DECLARE(void) ks_dht_storageitems_read_lock(ks_dht_t *dht)
{
ks_assert(dht);
+/**
+ * Look up a storage item by target id and, when found, take a reference on
+ * behalf of the caller — the caller must dereference when done.
+ * NOTE(review): the lookup itself is KS_UNLOCKED; call sites visible here take
+ * the storageitems hash lock around this call — confirm all callers do.
+ */
KS_DECLARE(ks_dht_storageitem_t *) ks_dht_storageitems_find(ks_dht_t *dht, ks_dht_nodeid_t *target)
{
+	ks_dht_storageitem_t *item = NULL;
+
	ks_assert(dht);
	ks_assert(target);
-	return ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED);
+	item = ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED);
+	if (item) ks_dht_storageitem_reference(item);
+
+	return item;
}
KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storageitem_t *item)
if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+ ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE);
+ ks_assert(error);
- //if ((ret = ks_dht_message_error(error, transactionid, transactionid_length, &e)) != KS_STATUS_SUCCESS) goto done;
ben_dict_set(error->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
ben_dict_set(error->data, ben_blob("y", 1), ben_blob("e", 1));
- // @note e joins error->data and will be freed with it
e = ben_list();
ks_assert(e);
ben_dict_set(error->data, ben_blob("e", 1), e);
ret = KS_STATUS_FAIL;
goto done;
}
-
+ transaction->job->result = KS_DHT_JOB_RESULT_ERROR;
+ transaction->job->error_code = errorcode;
+ transaction->job->error_description = ben_clone(es);
+ transaction->job->state = KS_DHT_JOB_STATE_COMPLETING;
transaction->finished = KS_TRUE;
ks_hash_read_lock(dht->registry_error);
return ret;
}
-KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job)
-{
- ks_assert(dht);
- ks_assert(job);
- ks_mutex_lock(dht->jobs_mutex);
- if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = job;
- else dht->jobs_first = dht->jobs_last = job;
- ks_mutex_unlock(dht->jobs_mutex);
-}
-
-KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback)
+/**
+ * Queue a ping query to @raddr. Allocation failures are now fatal (assert)
+ * instead of returned, so the function no longer returns a status; @data is
+ * passed through as the job's user data. 3 = query attempt count.
+ */
+KS_DECLARE(void) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, void *data)
{
	ks_dht_job_t *job = NULL;
-	ks_status_t ret = KS_STATUS_SUCCESS;
	ks_assert(dht);
	ks_assert(raddr);
	//ks_log(KS_LOG_DEBUG, "Starting ping!\n");
-	if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
+	ks_dht_job_create(&job, dht->pool, raddr, 3, data);
+	ks_assert(job);
+
	ks_dht_job_build_ping(job, ks_dht_query_ping, callback);
	ks_dht_jobs_add(dht, job);
-
-	// next step in ks_dht_pulse_jobs with QUERYING state
-
- done:
-	return ret;
}
KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job)
}
-KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht,
-										ks_dht_search_t *search,
-										const ks_sockaddr_t *raddr,
-										ks_dht_job_callback_t callback,
-										ks_dht_nodeid_t *target)
+/**
+ * Queue a findnode query for @target to @raddr. The explicit search parameter
+ * is gone: callers now pass arbitrary state through @data (the job's user
+ * data); allocation failures are fatal (assert). 3 = query attempt count.
+ */
+KS_DECLARE(void) ks_dht_findnode(ks_dht_t *dht,
+								 const ks_sockaddr_t *raddr,
+								 ks_dht_job_callback_t callback,
+								 void *data,
+								 ks_dht_nodeid_t *target)
{
	ks_dht_job_t *job = NULL;
-	ks_status_t ret = KS_STATUS_SUCCESS;
	ks_assert(dht);
	ks_assert(raddr);
	ks_assert(target);
-	if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
-	ks_dht_job_build_findnode(job, search, ks_dht_query_findnode, callback, target);
+	ks_dht_job_create(&job, dht->pool, raddr, 3, data);
+	ks_assert(job);
+
+	ks_dht_job_build_findnode(job, ks_dht_query_findnode, callback, target);
	ks_dht_jobs_add(dht, job);
-
-	// next step in ks_dht_pulse_jobs with QUERYING state
-
- done:
-	return ret;
}
KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job)
}
-KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
-								   ks_dht_search_t *search,
-								   const ks_sockaddr_t *raddr,
-								   ks_dht_job_callback_t callback,
-								   ks_dht_nodeid_t *target,
-								   uint8_t *salt,
-								   ks_size_t salt_length)
+/**
+ * Queue a BEP44 get query for @target to @raddr. As with ping/findnode, the
+ * search parameter is replaced by opaque @data, allocation failures are fatal
+ * (assert), and salt is now const. 3 = query attempt count.
+ */
+KS_DECLARE(void) ks_dht_get(ks_dht_t *dht,
+							const ks_sockaddr_t *raddr,
+							ks_dht_job_callback_t callback,
+							void *data,
+							ks_dht_nodeid_t *target,
+							const uint8_t *salt,
+							ks_size_t salt_length)
{
	ks_dht_job_t *job = NULL;
-	ks_status_t ret = KS_STATUS_SUCCESS;
	ks_assert(dht);
	ks_assert(raddr);
	ks_assert(target);
-	if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
-	ks_dht_job_build_get(job, search, ks_dht_query_get, callback, target, salt, salt_length);
+	ks_dht_job_create(&job, dht->pool, raddr, 3, data);
+	ks_assert(job);
+
+	ks_dht_job_build_get(job, ks_dht_query_get, callback, target, salt, salt_length);
	ks_dht_jobs_add(dht, job);
-
- done:
-	return ret;
}
KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
&a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_hash_read_lock(dht->storageitems_hash);
- item = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED);
+ item = ks_dht_storageitems_find(dht, &job->query_target);
+ if (item) ks_mutex_lock(item->mutex);
ks_hash_read_unlock(dht->storageitems_hash);
if (item && item->mutable && item->seq > 0) ben_dict_set(a, ben_blob("seq", 3), ben_int(item->seq));
ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
//ks_log(KS_LOG_DEBUG, "Sending message query get\n");
+ if (item) {
+ ks_dht_storageitem_dereference(item);
+ ks_mutex_unlock(item->mutex);
+ }
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
ks_hash_read_lock(dht->storageitems_hash);
- item = ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED);
+ item = ks_dht_storageitems_find(dht, target);
+ if (item) {
+ ks_mutex_lock(item->mutex);
+ item->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+ }
ks_hash_read_unlock(dht->storageitems_hash);
+ // If the item is mutable and available locally and a specific sequence was requested and the local item is not newer then do not send k, sig, or v back
sequence_snuffed = item && sequence >= 0 && item->seq <= sequence;
- // @todo if sequence is provided then requester has the data, so if the local sequence is lower maybe send a get to the requester to update local data?
+ // @todo if sequence is explicitly provided then requester has the data, so if the local sequence is lower
+ // maybe send a get query to the requester to update the local data
query.nodeid = *target;
query.type = KS_DHT_REMOTE;
ks_q_push(dht->send_q, (void *)response);
done:
+ if (item) {
+ ks_dht_storageitem_dereference(item);
+ ks_mutex_unlock(item->mutex);
+ }
return ret;
}
if ((ret = ks_dht_utility_extract_storageitem_signature(job->response->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
seq = ben_dict_get_by_str(job->response->args, "seq");
- if (seq) sequence = ben_int_val(seq);
+ if (seq) {
+ sequence = ben_int_val(seq);
+ job->response_hasitem = KS_TRUE;
+ job->response_seq = sequence;
+ }
if (seq && ((k && !sig) || (!k && sig))) {
ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data");
}
v = ben_dict_get_by_str(job->response->args, "v");
+ if (v) job->response_hasitem = KS_TRUE;
//if (v) v_len = ben_str_len(v);
n = ben_dict_get_by_str(job->response->args, "nodes");
ks_hash_write_lock(dht->storageitems_hash);
storageitems_locked = KS_TRUE;
- olditem = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED);
+ olditem = ks_dht_storageitems_find(dht, &job->query_target);
+ if (olditem) ks_mutex_lock(olditem->mutex);
if (v) {
ks_dht_nodeid_t tmptarget;
goto done;
}
if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
- else if ((ret = ks_dht_storageitem_create_immutable_internal(&item,
- dht->pool,
- &tmptarget,
- v,
- KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+ else {
+ ks_dht_storageitem_create_immutable_internal(&item, dht->pool, &tmptarget, v, KS_TRUE);
+ ks_assert(item);
+ }
} else {
// mutable
if ((ret = ks_dht_storageitem_target_mutable_internal(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
if (ben_cmp(olditem->v, v) != 0) {
goto done;
}
- } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
+ } else {
+ ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
+ if (olditem->callback) olditem->callback(dht, olditem);
+ }
olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
}
- else if ((ret = ks_dht_storageitem_create_mutable_internal(&item,
- dht->pool,
- &tmptarget,
- v,
- KS_TRUE,
- k,
- job->query_salt,
- KS_TRUE,
- sequence,
- sig)) != KS_STATUS_SUCCESS) goto done;
+ else {
+ ks_dht_storageitem_create_mutable_internal(&item, dht->pool, &tmptarget, v, KS_TRUE, k, job->query_salt, KS_TRUE, sequence, sig);
+ ks_assert(item);
+ }
}
if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item);
} else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
if (item) job->response_storageitem = item;
else if (olditem) job->response_storageitem = olditem;
+ if (job->response_storageitem) ks_dht_storageitem_reference(job->response_storageitem);
+
done:
+ if (olditem) {
+ ks_dht_storageitem_dereference(olditem);
+ ks_mutex_unlock(olditem->mutex);
+ }
+ if (item) ks_dht_storageitem_dereference(item);
if (ret != KS_STATUS_SUCCESS) {
}
if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
return ret;
}
-// @todo add reference counting system to storageitem_t to know what to keep alive with reannouncements versus allowing to expire
-KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
- const ks_sockaddr_t *raddr,
- ks_dht_job_callback_t callback,
- ks_dht_token_t *token,
- int64_t cas,
- ks_dht_storageitem_t *item)
+KS_DECLARE(void) ks_dht_put(ks_dht_t *dht,
+ const ks_sockaddr_t *raddr,
+ ks_dht_job_callback_t callback,
+ void *data,
+ ks_dht_token_t *token,
+ int64_t cas,
+ ks_dht_storageitem_t *item)
{
ks_dht_job_t *job = NULL;
- ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(raddr);
ks_assert(token);
ks_assert(item);
- if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
+ ks_dht_job_create(&job, dht->pool, raddr, 3, data);
+ ks_assert(job);
+
ks_dht_job_build_put(job, ks_dht_query_put, callback, token, cas, item);
ks_dht_jobs_add(dht, job);
-
- done:
- return ret;
}
KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job)
ks_hash_write_lock(dht->storageitems_hash);
storageitems_locked = KS_TRUE;
- olditem = ks_hash_search(dht->storageitems_hash, target.id, KS_UNLOCKED);
+ olditem = ks_dht_storageitems_find(dht, &target);
+ if (olditem) ks_mutex_lock(olditem->mutex);
if (!ks_dht_token_verify(dht, &message->raddr, &target, token)) {
ks_log(KS_LOG_DEBUG, "Invalid token\n");
if (!seq) {
// immutable
if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
- else if ((ret = ks_dht_storageitem_create_immutable_internal(&item,
- dht->pool,
- &target,
- v,
- KS_TRUE)) != KS_STATUS_SUCCESS) {
- ks_dht_error(dht,
- message->endpoint,
- &message->raddr,
- message->transactionid,
- message->transactionid_length,
- 202,
- "Internal storage item create immutable error");
- goto done;
+ else {
+ ks_dht_storageitem_create_immutable_internal(&item, dht->pool, &target, v, KS_TRUE);
+ ks_assert(item);
}
} else {
// mutable
"Message query put sequence is equal to current but values are different");
goto done;
}
- } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
+ } else {
+ ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
+ if (olditem->callback) olditem->callback(dht, olditem);
+ }
olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
}
- else if ((ret = ks_dht_storageitem_create_mutable_internal(&item,
- dht->pool,
- &target,
- v,
- KS_TRUE,
- k,
- salt,
- KS_TRUE,
- sequence,
- sig)) != KS_STATUS_SUCCESS) {
- ks_dht_error(dht,
- message->endpoint,
- &message->raddr,
- message->transactionid,
- message->transactionid_length,
- 202,
- "Internal storage item create mutable error");
- goto done;
+ else {
+ ks_dht_storageitem_create_mutable_internal(&item, dht->pool, &target, v, KS_TRUE, k, salt, KS_TRUE, sequence, sig);
+ ks_assert(item);
}
}
if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item);
//ks_log(KS_LOG_DEBUG, "Sending message response put\n");
ks_q_push(dht->send_q, (void *)response);
+
+ //if (dht->rt_ipv4) ks_dht_distribute(dht, NULL, NULL, dht->rt_ipv4, 0, olditem ? olditem : item);
+ //if (dht->rt_ipv6) ks_dht_distribute(dht, NULL, NULL, dht->rt_ipv6, 0, olditem ? olditem : item);
done:
+ if (olditem) {
+ ks_dht_storageitem_dereference(olditem);
+ ks_mutex_unlock(olditem->mutex);
+ }
+ if (item) ks_dht_storageitem_dereference(item);
if (ret != KS_STATUS_SUCCESS) {
if (item) ks_hash_remove(dht->storageitems_hash, item->id.id);
}
return ret;
}
-KS_DECLARE(ks_status_t) ks_dht_exec_search_findnode(ks_dht_t *dht, ks_dht_job_t *job)
-{
- return ks_dht_search_findnode(dht,
- job->query_family,
- &job->query_target,
- NULL,
- NULL);
-}
-
-KS_DECLARE(ks_status_t) ks_dht_queue_search_findnode(ks_dht_t* dht,
- ks_dhtrt_routetable_t *rt,
- ks_dht_nodeid_t *target,
- ks_dht_job_callback_t callback)
-{
- ks_dht_job_t *job = NULL;
- ks_status_t ret = KS_STATUS_SUCCESS;
-
- ks_assert(dht);
- ks_assert(rt);
- ks_assert(target);
-
- ks_sockaddr_t taddr; /* just to satisfy the api */
-
- if ((ret = ks_dht_job_create(&job, dht->pool, &taddr, 3)) == KS_STATUS_SUCCESS) {
-
- int32_t family = AF_INET;
-
- if (rt == dht->rt_ipv6) {
- family = AF_INET6;
- }
-
- ks_dht_job_build_search_findnode(job, target, family, ks_dht_exec_search_findnode, callback);
- }
-
- return ret;
-}
-
/* For Emacs:
* Local Variables:
* mode:c
#define KS_DHT_TRANSACTION_EXPIRATION 10
#define KS_DHT_TRANSACTIONS_PULSE 1
-#define KS_DHT_SEARCH_EXPIRATION 10
#define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE
#define KS_DHT_STORAGEITEM_PKEY_SIZE crypto_sign_PUBLICKEYBYTES
#define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64
#define KS_DHT_STORAGEITEM_SIGNATURE_SIZE crypto_sign_BYTES
#define KS_DHT_STORAGEITEM_EXPIRATION 7200
+#define KS_DHT_STORAGEITEM_KEEPALIVE 300
+#define KS_DHT_STORAGEITEMS_PULSE 10
#define KS_DHT_TOKEN_SIZE SHA_DIGEST_LENGTH
#define KS_DHT_TOKEN_EXPIRATION 300
typedef struct ks_dht_endpoint_s ks_dht_endpoint_t;
typedef struct ks_dht_transaction_s ks_dht_transaction_t;
typedef struct ks_dht_search_s ks_dht_search_t;
-typedef struct ks_dht_search_pending_s ks_dht_search_pending_t;
+typedef struct ks_dht_publish_s ks_dht_publish_t;
+typedef struct ks_dht_distribute_s ks_dht_distribute_t;
typedef struct ks_dht_node_s ks_dht_node_t;
typedef struct ks_dhtrt_routetable_s ks_dhtrt_routetable_t;
typedef struct ks_dhtrt_querynodes_s ks_dhtrt_querynodes_t;
typedef ks_status_t (*ks_dht_job_callback_t)(ks_dht_t *dht, ks_dht_job_t *job);
typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message);
-typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search);
+//typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search);
+typedef ks_status_t (*ks_dht_storageitem_callback_t)(ks_dht_t *dht, ks_dht_storageitem_t *item);
+
struct ks_dht_datagram_s {
ks_pool_t *pool;
KS_DHT_JOB_STATE_COMPLETING,
};
-//enum ks_dht_job_type_t {
-// KS_DHT_JOB_TYPE_NONE = 0,
-// KS_DHT_JOB_TYPE_PING,
-// KS_DHT_JOB_TYPE_FINDNODE,
-//};
+enum ks_dht_job_result_t {
+ KS_DHT_JOB_RESULT_SUCCESS = 0,
+ KS_DHT_JOB_RESULT_EXPIRED,
+ KS_DHT_JOB_RESULT_ERROR,
+ KS_DHT_JOB_RESULT_FAILURE,
+};
struct ks_dht_job_s {
ks_pool_t *pool;
ks_dht_job_t *next;
enum ks_dht_job_state_t state;
-
- ks_dht_search_t *search;
+ enum ks_dht_job_result_t result;
ks_sockaddr_t raddr; // will obtain local endpoint node id when creating message using raddr
int32_t attempts;
ks_dht_job_callback_t query_callback;
ks_dht_job_callback_t finish_callback;
+ void *data;
ks_dht_message_t *response;
// job specific query parameters
int64_t query_cas;
ks_dht_token_t query_token;
ks_dht_storageitem_t *query_storageitem;
- uint32_t query_family;
+ int32_t query_family;
+ // error response parameters
+ int64_t error_code;
+ struct bencode *error_description;
+
// job specific response parameters
- ks_dht_nodeid_t response_id;
+ ks_dht_node_t *response_id;
ks_dht_node_t *response_nodes[KS_DHT_RESPONSE_NODES_MAX_SIZE];
ks_size_t response_nodes_count;
ks_dht_node_t *response_nodes6[KS_DHT_RESPONSE_NODES_MAX_SIZE];
ks_size_t response_nodes6_count;
+
ks_dht_token_t response_token;
+ int64_t response_seq;
+ ks_bool_t response_hasitem;
ks_dht_storageitem_t *response_storageitem;
};
struct ks_dht_search_s {
ks_pool_t *pool;
- ks_dht_search_t *next;
+ ks_dhtrt_routetable_t *table;
ks_dht_nodeid_t target;
- ks_dht_search_callback_t callback;
+ ks_dht_job_callback_t callback;
+ void *data;
ks_mutex_t *mutex;
ks_hash_t *searched;
- ks_hash_t *searching;
+ int32_t searching;
ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
ks_dht_nodeid_t distances[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
ks_size_t results_length;
};
+struct ks_dht_publish_s {
+ ks_pool_t *pool;
+ ks_dht_job_callback_t callback;
+ void *data;
+ int64_t cas;
+ ks_dht_storageitem_t *item;
+};
+
+struct ks_dht_distribute_s {
+ ks_pool_t *pool;
+ ks_dht_storageitem_callback_t callback;
+ void *data;
+ ks_mutex_t *mutex;
+ int32_t publishing;
+ int64_t cas;
+ ks_dht_storageitem_t *item;
+};
+
struct ks_dht_storageitem_s {
ks_pool_t *pool;
ks_dht_nodeid_t id;
ks_time_t expiration;
+ ks_time_t keepalive;
struct bencode *v;
-
- ks_bool_t mutable;
+
ks_mutex_t *mutex;
+ volatile int32_t refc;
+ ks_dht_storageitem_callback_t callback;
+
+ ks_bool_t mutable;
ks_dht_storageitem_pkey_t pk;
ks_dht_storageitem_skey_t sk;
struct bencode *salt;
ks_dhtrt_routetable_t *rt_ipv4;
ks_dhtrt_routetable_t *rt_ipv6;
- ks_mutex_t *searches_mutex;
- ks_dht_search_t *searches_first;
- ks_dht_search_t *searches_last;
-
ks_time_t tokens_pulse;
volatile uint32_t token_secret_current;
volatile uint32_t token_secret_previous;
ks_time_t token_secret_expiration;
+ ks_time_t storageitems_pulse;
ks_hash_t *storageitems_hash;
};
int64_t sequence,
const uint8_t *value,
ks_size_t value_length);
-
+
+/**
+ * Increment the reference count of a storage item, keeping it alive for reannouncement instead of letting it expire.
+ */
+KS_DECLARE(void) ks_dht_storageitem_reference(ks_dht_storageitem_t *item);
+
+/**
+ * Decrement the reference count of a storage item; once unreferenced the item may be allowed to expire.
+ */
+KS_DECLARE(void) ks_dht_storageitem_dereference(ks_dht_storageitem_t *item);
+
+/**
+ * Set the callback invoked after a mutable storage item's value is updated.
+ */
+KS_DECLARE(void) ks_dht_storageitem_callback(ks_dht_storageitem_t *item, ks_dht_storageitem_callback_t callback);
+
/**
*
*/
/**
*
*/
-KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback);
+KS_DECLARE(void) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, void *data);
/**
*
*/
-KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht,
- ks_dht_search_t *search,
- const ks_sockaddr_t *raddr,
- ks_dht_job_callback_t callback,
- ks_dht_nodeid_t *target);
+KS_DECLARE(void) ks_dht_findnode(ks_dht_t *dht,
+ const ks_sockaddr_t *raddr,
+ ks_dht_job_callback_t callback,
+ void *data,
+ ks_dht_nodeid_t *target);
/**
*
*/
-KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
- ks_dht_search_t *search,
- const ks_sockaddr_t *raddr,
- ks_dht_job_callback_t callback,
- ks_dht_nodeid_t *target,
- uint8_t *salt,
- ks_size_t salt_length);
+KS_DECLARE(void) ks_dht_get(ks_dht_t *dht,
+ const ks_sockaddr_t *raddr,
+ ks_dht_job_callback_t callback,
+ void *data,
+ ks_dht_nodeid_t *target,
+ const uint8_t *salt,
+ ks_size_t salt_length);
/**
*
*/
-KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
- const ks_sockaddr_t *raddr,
- ks_dht_job_callback_t callback,
- ks_dht_token_t *token,
- int64_t cas,
- ks_dht_storageitem_t *item);
+KS_DECLARE(void) ks_dht_put(ks_dht_t *dht,
+ const ks_sockaddr_t *raddr,
+ ks_dht_job_callback_t callback,
+ void *data,
+ ks_dht_token_t *token,
+ int64_t cas,
+ ks_dht_storageitem_t *item);
/**
* Create a network search of the closest nodes to a target.
* @param target pointer to the nodeid for the target to be searched
* @param callback an optional callback to add to the search when it is finished
 * @param data optional opaque user data passed through to the callback
 * @param table pointer to the route table the search operates against
- * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
* @see ks_dht_search_create
- * @see ks_dht_search_callback_add
* @see ks_hash_insert
- * @see ks_dht_search_pending_create
- * @see ks_dht_send_findnode
+ * @see ks_dht_findnode
*/
-KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht,
- int32_t family,
- ks_dht_nodeid_t *target,
- ks_dht_search_callback_t callback,
- ks_dht_search_t **search);
-
-KS_DECLARE(ks_status_t) ks_dht_queue_search_findnode(ks_dht_t* dht,
- ks_dhtrt_routetable_t *rt,
- ks_dht_nodeid_t *target,
- ks_dht_job_callback_t callback);
-
-KS_DECLARE(ks_status_t) ks_dht_exec_search_findnode(ks_dht_t *dht, ks_dht_job_t *job);
-
-
+KS_DECLARE(void) ks_dht_search(ks_dht_t *dht,
+ ks_dht_job_callback_t callback,
+ void *data,
+ ks_dhtrt_routetable_t *table,
+ ks_dht_nodeid_t *target);
+
+KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht,
+ const ks_sockaddr_t *raddr,
+ ks_dht_job_callback_t callback,
+ void *data,
+ int64_t cas,
+ ks_dht_storageitem_t *item);
+
+KS_DECLARE(void) ks_dht_distribute(ks_dht_t *dht,
+ ks_dht_storageitem_callback_t callback,
+ void *data,
+ ks_dhtrt_routetable_t *table,
+ int64_t cas,
+ ks_dht_storageitem_t *item);
/**
* route table methods
ks_dht_storageitem_skey_t sk;
ks_dht_storageitem_pkey_t pk;
+ks_status_t dht2_updated_callback(ks_dht_t *dht, ks_dht_storageitem_t *item)
+{
+ diag("dht2_updated_callback\n");
+ return KS_STATUS_SUCCESS;
+}
+
+ks_status_t dht2_distribute_callback(ks_dht_t *dht, ks_dht_storageitem_t *item)
+{
+ diag("dht2_distribute_callback\n");
+ return KS_STATUS_SUCCESS;
+}
+
ks_status_t dht2_put_callback(ks_dht_t *dht, ks_dht_job_t *job)
{
diag("dht2_put_callback\n");
mutable->sk = sk;
ks_dht_storageitems_insert(dht, mutable);
- ks_dht_put(dht, &job->raddr, dht2_put_callback, &job->response_token, 0, mutable);
+ ks_dht_put(dht, &job->raddr, dht2_put_callback, NULL, &job->response_token, 0, mutable);
return KS_STATUS_SUCCESS;
}
-ks_status_t dht2_search_findnode_callback(ks_dht_t *dht, ks_dht_search_t *search)
+ks_status_t dht2_search_callback(ks_dht_t *dht, ks_dht_job_t *job)
{
- diag("dht2_search_findnode_callback %d\n", search->results_length);
+ ks_dht_search_t *search = (ks_dht_search_t *)job->data;
+ diag("dht2_search_callback %zu\n", (size_t)search->results_length);
return KS_STATUS_SUCCESS;
}
ks_sockaddr_t raddr1;
//ks_sockaddr_t raddr2;
//ks_sockaddr_t raddr3;
- //ks_dht_nodeid_t target;
+ ks_dht_nodeid_t target;
//ks_dht_storageitem_t *immutable = NULL;
- //ks_dht_storageitem_t *mutable = NULL;
- //const char *v = "Hello World!";
- //size_t v_len = strlen(v);
+ ks_dht_storageitem_t *mutable1 = NULL;
+ ks_dht_storageitem_t *mutable2 = NULL;
+ const char *v = "Hello World!";
+ size_t v_len = strlen(v);
//ks_dht_storageitem_skey_t sk; //= { { 0xe0, 0x6d, 0x31, 0x83, 0xd1, 0x41, 0x59, 0x22, 0x84, 0x33, 0xed, 0x59, 0x92, 0x21, 0xb8, 0x0b,
//0xd0, 0xa5, 0xce, 0x83, 0x52, 0xe4, 0xbd, 0xf0, 0x26, 0x2f, 0x76, 0x78, 0x6e, 0xf1, 0xc7, 0x4d,
//0xb7, 0xe7, 0xa9, 0xfe, 0xa2, 0xc0, 0xeb, 0x26, 0x9d, 0x61, 0xe3, 0xb3, 0x8e, 0x45, 0x0a, 0x22,
//0x24, 0x32, 0xfc, 0xd9, 0x04, 0xa4, 0x35, 0x11, 0x87, 0x6d, 0xf5, 0xcd, 0xf3, 0xe7, 0xe5, 0x48 } };
//uint8_t sk1[KS_DHT_STORAGEITEM_SKEY_SIZE];
//uint8_t pk1[KS_DHT_STORAGEITEM_PKEY_SIZE];
- //ks_dht_storageitem_signature_t sig;
+ ks_dht_storageitem_signature_t sig;
//char sk_buf[KS_DHT_STORAGEITEM_SKEY_SIZE * 2 + 1];
//char pk_buf[KS_DHT_STORAGEITEM_PKEY_SIZE * 2 + 1];
//const char *test1vector = "3:seqi1e1:v12:Hello World!";
diag("Ping test\n");
- ks_dht_ping(dht2, &raddr1, NULL); // (QUERYING)
+ ks_dht_ping(dht2, &raddr1, NULL, NULL); // (QUERYING)
ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 (RESPONDING)
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
- ks_dht_ping(dht3, &raddr1, NULL); // (QUERYING)
+ ks_dht_ping(dht3, &raddr1, NULL, NULL); // (QUERYING)
ks_dht_pulse(dht3, 100); // Send queued ping from dht3 to dht1 (RESPONDING)
ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING)
diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
- for (int i = 0; i < 20; ++i) {
+ for (int i = 0; i < 10; ++i) {
ks_dht_pulse(dht1, 100);
ks_dht_pulse(dht2, 100);
ks_dht_pulse(dht3, 100);
}
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+ // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
+
+ /*
+ diag("Find_Node test\n");
+
+ ks_dht_findnode(dht3, &raddr1, NULL, NULL, &ep2->nodeid);
+
+ ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
+
+ ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
+
+ ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+
+ ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
+
+ ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING)
+
+ ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+
+ diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+ for (int i = 0; i < 10; ++i) {
+ ks_dht_pulse(dht1, 100);
+ ks_dht_pulse(dht2, 100);
+ ks_dht_pulse(dht3, 100);
+ }
+ ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+ */
+
+ diag("Search test\n");
+
+ ks_dht_search(dht3, dht2_search_callback, NULL, dht3->rt_ipv4, &ep2->nodeid);
+ diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+ for (int i = 0; i < 20; ++i) {
+ ks_dht_pulse(dht1, 100);
+ ks_dht_pulse(dht2, 100);
+ ks_dht_pulse(dht3, 100);
+ }
+
//diag("Get test\n");
mutable->sk = sk;
ks_dht_storageitems_insert(dht1, mutable);
- ks_dht_get(dht2, &raddr1, dht2_get_callback, &target, NULL, 0);
+ ks_dht_get(dht2, &raddr1, dht2_get_callback, NULL, &target, NULL, 0);
ks_dht_pulse(dht2, 100); // send get query
ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
- ks_dht_get(dht2, NULL, &raddr1, dht2_get_token_callback, &target, NULL, 0); // create job
+ ks_dht_get(dht2, &raddr1, dht2_get_token_callback, NULL, &target, NULL, 0); // create job
- ks_dht_pulse(dht2, 100); // send get query
-
- ks_dht_pulse(dht1, 100); // receive get query and send get response
-
- ks_dht_pulse(dht2, 100); // receive get response
-
- ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING), send put query
-
- ks_dht_pulse(dht1, 100); // receive put query and send put response
-
- ks_dht_pulse(dht2, 100); // receive put response
-
- ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING)
-
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 20; ++i) {
ks_dht_pulse(dht1, 100);
ks_dht_pulse(dht2, 100);
ks_dht_pulse(dht3, 100);
}
*/
- // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
-
/*
- diag("Find_Node test\n");
-
- ks_dht_findnode(dht3, NULL, &raddr1, NULL, &ep2->nodeid);
-
- ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
-
- ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
-
- ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
-
- ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
+ diag("Publish test\n");
- ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING)
+ crypto_sign_keypair(pk.key, sk.key);
- ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+ ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
- diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
- for (int i = 0; i < 10; ++i) {
+ ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len);
+
+ ks_dht_storageitem_create_mutable(&mutable1, dht2->pool, &target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig);
+ mutable1->sk = sk;
+ ks_dht_storageitems_insert(dht2, mutable1);
+
+ ks_dht_publish(dht2, &raddr1, dht2_put_callback, NULL, 0, mutable1); // create job
+
+ for (int i = 0; i < 20; ++i) {
ks_dht_pulse(dht1, 100);
ks_dht_pulse(dht2, 100);
ks_dht_pulse(dht3, 100);
}
- ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
*/
- diag("Search test\n");
- ks_dht_search_findnode(dht3, AF_INET, &ep2->nodeid, dht2_search_findnode_callback, NULL);
- diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+
+ diag("Distribute test\n");
+
+ crypto_sign_keypair(pk.key, sk.key);
+
+ ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
+
+ ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len);
+
+ ks_dht_storageitem_create_mutable(&mutable2, dht2->pool, &target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig);
+ mutable2->sk = sk;
+ ks_dht_storageitems_insert(dht2, mutable2);
+
+ ks_dht_distribute(dht2, dht2_distribute_callback, NULL, dht2->rt_ipv4, 0, mutable2); // create job
+
for (int i = 0; i < 30; ++i) {
ks_dht_pulse(dht1, 100);
ks_dht_pulse(dht2, 100);
ks_dht_pulse(dht3, 100);
}
+ ks_dht_storageitem_dereference(mutable2);
+ ok(mutable2->refc == 0);
+
+ mutable1 = ks_dht_storageitems_find(dht1, &target);
+ ok(mutable1 != NULL);
+
+ ks_dht_storageitem_callback(mutable1, dht2_updated_callback);
+ ks_dht_storageitem_callback(mutable2, dht2_updated_callback);
+
+ ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 2, (uint8_t *)v, v_len);
+ mutable1->seq = 2;
+ mutable1->sig = sig;
+
+ //ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 2, (uint8_t *)v, v_len);
+ //mutable2->seq = 2;
+ //mutable2->sig = sig;
+ ks_dht_distribute(dht2, dht2_distribute_callback, NULL, dht2->rt_ipv4, 0, mutable2);
+ for (int i = 0; i < 30; ++i) {
+ ks_dht_pulse(dht1, 100);
+ ks_dht_pulse(dht2, 100);
+ ks_dht_pulse(dht3, 100);
+ }
+ ks_dht_storageitem_dereference(mutable1);
+
/* Cleanup and shutdown */
diag("Cleanup\n");