d = *dht;
+ // @todo ks_dht_shutdown to stop further incoming data and pulse to finish processing and flushing data
+
/**
- * Cleanup the storageitems hash and it's contents if it is allocated.
- */
- if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash);
- d->storageitems_pulse = 0;
-
- /**
- * Zero out the opaque write token variables.
+ * Cleanup the type, query, and error registries if they have been allocated.
+ * No dependancies during destruction, entries are function pointers with no cleanup required.
*/
- d->token_secret_current = 0;
- d->token_secret_previous = 0;
- d->token_secret_expiration = 0;
- d->tokens_pulse = 0;
-
+ if (d->registry_type) ks_hash_destroy(&d->registry_type);
+ if (d->registry_query) ks_hash_destroy(&d->registry_query);
+ if (d->registry_error) ks_hash_destroy(&d->registry_error);
+
/**
- * Cleanup the route tables if they are allocated.
+ * Cleanup the endpoint management and entries if they have been allocated.
+ * No dependancies during destruction, entries are destroyed through hash destructor.
+ * Sockets are closed during entry destruction, and route table references to local nodes are released.
*/
- if (d->rt_ipv4) ks_dhtrt_deinitroute(&d->rt_ipv4);
- if (d->rt_ipv6) ks_dhtrt_deinitroute(&d->rt_ipv6);
+ if (d->endpoints) ks_pool_free(d->pool, &d->endpoints);
+ if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll);
+ if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash);
/**
- * Cleanup the transactions mutex and hash if they are allocated.
+ * Cleanup the transaction management and entries if they have been allocated.
+ * No dependancies during destruction, entries are destroyed through hash destructor.
*/
- d->transactionid_next = 0;
if (d->transactionid_mutex) ks_mutex_destroy(&d->transactionid_mutex);
if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash);
- d->transactions_pulse = 0;
-
- /**
- * Cleanup the jobs mutex and jobs if they are allocated.
- */
- for (ks_dht_job_t *job = d->jobs_first, *jobn = NULL; job; job = jobn) {
- jobn = job->next;
- ks_dht_job_destroy(&job);
- }
- if (d->jobs_mutex) ks_mutex_destroy(&d->jobs_mutex);
-
- /**
- * Probably don't need this, recv_buffer_length is temporary and may change
- */
- d->recv_buffer_length = 0;
/**
- * Cleanup the send queue and it's contents if it is allocated.
+ * Cleanup the message send queue and entries if they have been allocated.
+ * No dependancies during destruction, entries must be destroyed here.
*/
if (d->send_q) {
ks_dht_message_t *msg;
while (ks_q_pop_timeout(d->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) ks_dht_message_destroy(&msg);
ks_q_destroy(&d->send_q);
}
-
- /**
- * Cleanup the cached popped message if it is set.
- */
if (d->send_q_unsent) ks_dht_message_destroy(&d->send_q_unsent);
/**
- * Probably don't need this
- */
- d->endpoints_length = 0;
- d->endpoints_size = 0;
-
- /**
- * Cleanup the array of endpoint pointers if it is allocated.
+ * Cleanup the jobs management and entries if they have been allocated.
+ * Route table node and storage item references are released, entries must be destroyed here.
*/
- if (d->endpoints) ks_pool_free(d->pool, &d->endpoints);
-
- /**
- * Cleanup the array of endpoint polling data if it is allocated.
- */
- if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll);
-
+ for (ks_dht_job_t *job = d->jobs_first, *jobn = NULL; job; job = jobn) {
+ jobn = job->next;
+ ks_dht_job_destroy(&job);
+ }
+ if (d->jobs_mutex) ks_mutex_destroy(&d->jobs_mutex);
+
/**
- * Cleanup the endpoints hash if it is allocated, and any endpoints that have been allocated.
+ * Cleanup the storageitems hash and it's contents if it is allocated.
+ * No dependancies during destruction, entries are destroyed through hash destructor.
*/
- if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash);
+ if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash);
/**
- * Cleanup the type, query, and error registries if they have been allocated.
+ * Cleanup the route tables if they are allocated.
+ * No nodes should be referenced anymore by this point.
*/
- if (d->registry_type) ks_hash_destroy(&d->registry_type);
- if (d->registry_query) ks_hash_destroy(&d->registry_query);
- if (d->registry_error) ks_hash_destroy(&d->registry_error);
+ if (d->rt_ipv4) ks_dhtrt_deinitroute(&d->rt_ipv4);
+ if (d->rt_ipv6) ks_dhtrt_deinitroute(&d->rt_ipv6);
- /**
- * Probably don't need this
- */
- d->autoroute = KS_FALSE;
- d->autoroute_port = 0;
/**
* If the thread pool was allocated internally, destroy it.
- * If this fails, something catastrophically bad happened like memory corruption.
*/
if (d->tpool_alloc) ks_thread_pool_destroy(&d->tpool);
- d->tpool_alloc = KS_FALSE;
+
/**
* Temporarily store the allocator level variables because freeing the dht instance will invalidate it.
*/
/**
* Free the dht instance from the pool, after this the dht instance memory is invalid.
*/
- ks_pool_free(d->pool, &d);
-
- /**
- * At this point dht instance is invalidated so NULL the pointer.
- */
- *dht = d = NULL;
+ ks_pool_free(d->pool, dht);
+ d = NULL;
/**
* If the pool was allocated internally, destroy it using the temporary variables stored earlier.
- * If this fails, something catastrophically bad happened like memory corruption.
*/
if (pool_alloc) ks_pool_close(&pool);
}
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
{
- ks_dht_datagram_t *datagram = NULL;
- ks_sockaddr_t raddr;
-
ks_assert(dht);
ks_assert(timeout >= 0 && timeout <= 1000);
// this should be called with a timeout of less than 1000ms, preferrably around 100ms
+ ks_dht_pulse_endpoints(dht, timeout);
+
+ if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
+ if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
+
+ ks_dht_pulse_storageitems(dht);
+
+ ks_dht_pulse_jobs(dht);
+
+ ks_dht_pulse_send(dht);
+
+ ks_dht_pulse_transactions(dht);
+
+ ks_dht_pulse_tokens(dht);
+}
+
+KS_DECLARE(void) ks_dht_pulse_endpoints(ks_dht_t *dht, int32_t timeout)
+{
+ ks_assert(dht);
+ ks_assert(timeout >= 0 && timeout <= 1000);
+
if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0;
if (ks_poll(dht->endpoints_poll, dht->endpoints_length, timeout) > 0) {
for (int32_t i = 0; i < dht->endpoints_length; ++i) {
+ ks_dht_datagram_t *datagram = NULL;
+ ks_sockaddr_t raddr = (const ks_sockaddr_t){ 0 };
if (!(dht->endpoints_poll[i].revents & POLLIN)) continue;
- raddr = (const ks_sockaddr_t){ 0 };
dht->recv_buffer_length = sizeof(dht->recv_buffer);
raddr.family = dht->endpoints[i]->addr.family;
if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) != KS_STATUS_SUCCESS) continue;
if (ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) ks_dht_datagram_destroy(&datagram);
}
}
-
- if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
- if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
-
- ks_dht_pulse_storageitems(dht);
-
- ks_dht_pulse_jobs(dht);
-
- ks_dht_pulse_send(dht);
-
- ks_dht_pulse_transactions(dht);
-
- ks_dht_pulse_tokens(dht);
}
KS_DECLARE(void) ks_dht_pulse_storageitems(ks_dht_t *dht)
ks_size_t nodes_count = 0;
ks_dht_nodeid_t distance;
int32_t results_index = -1;
+ ks_bool_t finished = KS_FALSE;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_mutex_lock(search->mutex);
search->searching--;
- if (job->result != KS_DHT_JOB_RESULT_SUCCESS) goto done;
+ if (job->result != KS_DHT_JOB_RESULT_SUCCESS) {
+ finished = KS_TRUE;
+ goto done;
+ }
ks_dht_utility_nodeid_xor(&distance, &job->response_id->nodeid, &search->target);
if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target);
}
}
+ finished = search->searching == 0;
done:
ks_mutex_unlock(search->mutex);
- if (search->searching == 0) {
+ if (finished) {
if (search->callback) search->callback(dht, job);
ks_dht_search_destroy(&search);
}
KS_DECLARE(ks_status_t) ks_dht_query_search(ks_dht_t *dht, ks_dht_job_t *job)
{
- ks_bool_t locked_search = KS_FALSE;
ks_dht_search_t *search = NULL;
ks_dhtrt_querynodes_t query;
+ ks_bool_t finished = KS_FALSE;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
search = (ks_dht_search_t *)job->data;
ks_mutex_lock(search->mutex);
- locked_search = KS_TRUE;
// find closest good nodes to target locally and store as the closest results
query.nodeid = search->target;
ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target);
}
ks_dhtrt_release_querynodes(&query);
+ finished = search->searching == 0;
// done:
- if (locked_search) ks_mutex_unlock(search->mutex);
+ ks_mutex_unlock(search->mutex);
- if (search->searching == 0) {
+ if (finished) {
if (search->callback) search->callback(dht, job);
ks_dht_search_destroy(&search);
}
ks_dhtrt_routetable_t *table,
ks_dht_nodeid_t *target)
{
+ char target_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_dht_search_t *search = NULL;
ks_dht_job_t *job = NULL;
ks_assert(table);
ks_assert(target);
+ ks_log(KS_LOG_INFO, "[%s] Searching\n", ks_dht_hex(target->id, target_buf, KS_DHT_NODEID_SIZE));
+
ks_dht_search_create(&search, dht->pool, table, target, callback, data);
ks_assert(search);
KS_DECLARE(ks_status_t) ks_dht_publish_get_callback(ks_dht_t *dht, ks_dht_job_t *job)
{
ks_dht_publish_t *publish = NULL;
+ ks_bool_t finished = KS_FALSE;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
publish = (ks_dht_publish_t *)job->data;
- // @todo callbacks need job to contain cascaded publish->data before calling
if (job->result != KS_DHT_JOB_RESULT_SUCCESS) {
- job->data = publish->data;
- if (publish->callback) publish->callback(dht, job);
+ finished = KS_TRUE;
goto done;
}
if (!job->response_hasitem || (publish->item->mutable && job->response_seq < publish->item->seq)) {
ks_dht_put(dht, &job->raddr, publish->callback, publish->data, &job->response_token, publish->cas, publish->item);
- } else if (publish->callback) {
- job->data = publish->data;
- publish->callback(dht, job);
- }
+ } else finished = KS_TRUE;
done:
-
+ if (finished) {
+ job->data = publish->data;
+ if (publish->callback) publish->callback(dht, job);
+ }
ks_dht_publish_destroy(&publish);
return ret;
}
int64_t cas,
ks_dht_storageitem_t *item)
{
+ char target_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_dht_publish_t *publish = NULL;
const uint8_t *salt = NULL;
size_t salt_length = 0;
ks_assert(cas >= 0);
ks_assert(item);
+ ks_log(KS_LOG_INFO, "[%s] Publishing to %s %d\n", ks_dht_hex(item->id.id, target_buf, KS_DHT_NODEID_SIZE), raddr->host, raddr->port);
+
if (item->salt) {
salt = (const uint8_t *)ben_str_val(item->salt);
salt_length = ben_str_len(item->salt);
ks_dht_distribute_destroy(&distribute);
}
- ks_dht_search_destroy(&search);
-
return ret;
}
int64_t cas,
ks_dht_storageitem_t *item)
{
+ char target_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_dht_distribute_t *distribute = NULL;
ks_assert(dht);
ks_assert(table);
ks_assert(cas >= 0);
ks_assert(item);
+
+ ks_log(KS_LOG_INFO, "[%s] Distributing\n", ks_dht_hex(item->id.id, target_buf, KS_DHT_NODEID_SIZE));
ks_dht_distribute_create(&distribute, dht->pool, callback, data, cas, item);
ks_assert(distribute);
&message,
NULL)) != KS_STATUS_SUCCESS) goto done;
- //ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
+ ks_log(KS_LOG_INFO,
+ "[%s %d] Ping query to %s %d\n",
+ message->endpoint->addr.host,
+ message->endpoint->addr.port,
+ message->raddr.host,
+ message->raddr.port);
+
ks_q_push(dht->send_q, (void *)message);
done:
ks_assert(message);
ks_assert(message->args);
+ ks_log(KS_LOG_INFO,
+ "[%s %d] Ping query from %s %d\n",
+ message->endpoint->addr.host,
+ message->endpoint->addr.port,
+ message->raddr.host,
+ message->raddr.port);
+
//ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
if ((ret = ks_dht_response_setup(dht,
ks_assert(dht);
ks_assert(job);
+ ks_log(KS_LOG_INFO,
+ "[%s %d] Ping response from %s %d\n",
+ job->response->endpoint->addr.host,
+ job->response->endpoint->addr.port,
+ job->response->raddr.host,
+ job->response->raddr.port);
+
//ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
// done:
ben_dict_set(a, ben_blob("want", 4), want);
}
- //ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
+ ks_log(KS_LOG_INFO,
+ "[%s %d] Findnode query to %s %d\n",
+ message->endpoint->addr.host,
+ message->endpoint->addr.port,
+ message->raddr.host,
+ message->raddr.port);
+
ks_q_push(dht->send_q, (void *)message);
done:
ks_assert(message);
ks_assert(message->args);
+ ks_log(KS_LOG_INFO,
+ "[%s %d] Findnode query from %s %d\n",
+ message->endpoint->addr.host,
+ message->endpoint->addr.port,
+ message->raddr.host,
+ message->raddr.port);
+
if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
ks_assert(dht);
ks_assert(job);
+ ks_log(KS_LOG_INFO,
+ "[%s %d] Findnode response from %s %d\n",
+ job->response->endpoint->addr.host,
+ job->response->endpoint->addr.port,
+ job->response->raddr.host,
+ job->response->raddr.port);
+
n = ben_dict_get_by_str(job->response->args, "nodes");
if (n && dht->rt_ipv4) {
//n4 = KS_TRUE;
if (item && item->mutable && item->seq > 0) ben_dict_set(a, ben_blob("seq", 3), ben_int(item->seq));
ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
- //ks_log(KS_LOG_DEBUG, "Sending message query get\n");
+ ks_log(KS_LOG_INFO,
+ "[%s %d] Get query to %s %d\n",
+ message->endpoint->addr.host,
+ message->endpoint->addr.port,
+ message->raddr.host,
+ message->raddr.port);
+
if (item) {
ks_dht_storageitem_dereference(item);
ks_mutex_unlock(item->mutex);
ks_assert(message);
ks_assert(message->args);
+ ks_log(KS_LOG_INFO,
+ "[%s %d] Get query from %s %d\n",
+ message->endpoint->addr.host,
+ message->endpoint->addr.port,
+ message->raddr.host,
+ message->raddr.port);
+
if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
}
ks_hash_read_unlock(dht->storageitems_hash);
- // If the item is mutable and available locally and a specific sequence was requested and the local item is not newer then do not send k, sig, or v back
sequence_snuffed = item && sequence >= 0 && item->seq <= sequence;
- // @todo if sequence is explicitly provided then requester has the data, so if the local sequence is lower
- // maybe send a get query to the requester to update the local data
query.nodeid = *target;
query.type = KS_DHT_REMOTE;
ks_assert(dht);
ks_assert(job);
+ ks_log(KS_LOG_INFO,
+ "[%s %d] Get response from %s %d\n",
+ job->response->endpoint->addr.host,
+ job->response->endpoint->addr.port,
+ job->response->raddr.host,
+ job->response->raddr.port);
+
if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
job->response_token = *token;
ben_dict_set(a, ben_blob("token", 5), ben_blob(job->query_token.token, KS_DHT_TOKEN_SIZE));
ben_dict_set(a, ben_blob("v", 1), ben_clone(job->query_storageitem->v));
- //ks_log(KS_LOG_DEBUG, "Sending message query put\n");
+ ks_log(KS_LOG_INFO,
+ "[%s %d] Put query to %s %d\n",
+ message->endpoint->addr.host,
+ message->endpoint->addr.port,
+ message->raddr.host,
+ message->raddr.port);
+
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
ks_assert(message);
ks_assert(message->args);
+ ks_log(KS_LOG_INFO,
+ "[%s %d] Put query from %s %d\n",
+ message->endpoint->addr.host,
+ message->endpoint->addr.port,
+ message->raddr.host,
+ message->raddr.port);
if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
ks_assert(dht);
ks_assert(job);
- ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
+ ks_log(KS_LOG_INFO,
+ "[%s %d] Put response from %s %d\n",
+ job->response->endpoint->addr.host,
+ job->response->endpoint->addr.port,
+ job->response->raddr.host,
+ job->response->raddr.port);
+
+ //ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
// done:
return ret;