}
-KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht)
+KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht, ks_thread_pool_t *tpool)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(dht->pool);
+ /**
+ * Create a new internally managed thread pool if one wasn't provided.
+ */
+ if (!tpool) {
+ if ((ret = ks_thread_pool_create(&tpool,
+ KS_DHT_TPOOL_MIN,
+ KS_DHT_TPOOL_MAX,
+ KS_DHT_TPOOL_STACK,
+ KS_PRI_NORMAL,
+ KS_DHT_TPOOL_IDLE)) != KS_STATUS_SUCCESS) return ret;
+ dht->tpool_alloc = KS_TRUE;
+ }
+ dht->tpool = tpool;
+
/**
* Default autorouting to disabled.
*/
dht->autoroute = KS_FALSE;
dht->autoroute_port = 0;
+ /**
+ * If the thread pool was allocated internally, destroy it.
+ * If this fails, something catastrophically bad happened like memory corruption.
+ */
+ if (dht->tpool_alloc && (ret = ks_thread_pool_destroy(&dht->tpool)) != KS_STATUS_SUCCESS) return ret;
+ dht->tpool_alloc = KS_FALSE;
+
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint)
{
// @todo lookup standard def for IPV6 max size
- char ip[48];
+ char ip[48 + 1];
ks_dht_endpoint_t *ep = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
/**
* Check if the endpoint has already been bound for the address we want to route through.
- * @todo ip:port for key to allow a single ip with multiple endpoints on different ports
*/
ep = ks_hash_search(dht->endpoints_hash, ip, KS_READLOCKED);
if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret;
*/
if (endpoint) *endpoint = NULL;
+ ep = ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_READLOCKED);
+ if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret;
+ if (ep) {
+ ks_log(KS_LOG_DEBUG, "Attempted to bind to %s more than once.\n", addr->host);
+ return KS_STATUS_FAIL;
+ }
+
/**
* Legacy code, this can probably go away
*/
/**
* Add the new endpoint into the endpoints hash for quick lookups.
- * @todo ip:port for key to allow a single ip with multiple endpoints on different ports
*/
if (!ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) {
ret = KS_STATUS_FAIL;
ep->addr.host,
ep->addr.port,
&ep->node)) != KS_STATUS_SUCCESS) goto done;
+ /**
+ * Do not release the ep->node, keep it alive until cleanup
+ */
} else {
if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_create_node(dht->rt_ipv6,
ep->addr.host,
ep->addr.port,
&ep->node)) != KS_STATUS_SUCCESS) goto done;
+ /**
+ * Do not release the ep->node, keep it alive until cleanup
+ */
}
/**
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
{
+ ks_dht_datagram_t *datagram = NULL;
int32_t result;
ks_assert(dht);
for (int32_t i = 0; i < dht->endpoints_size; ++i) {
if (dht->endpoints_poll[i].revents & POLLIN) {
ks_sockaddr_t raddr = KS_SA_INIT;
- dht->recv_buffer_length = KS_DHT_RECV_BUFFER_SIZE;
+ dht->recv_buffer_length = sizeof(dht->recv_buffer);
raddr.family = dht->endpoints[i]->addr.family;
if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) {
- // @todo copy data to a ks_dht_frame then create job to call ks_dht_process from threadpool
- ks_dht_process(dht, dht->endpoints[i], &raddr);
+ if (dht->recv_buffer_length == sizeof(dht->recv_buffer)) {
+ ks_log(KS_LOG_DEBUG, "Dropped oversize datagram from %s %d\n", raddr.host, raddr.port);
+ } else {
+ // @todo check for recycled datagrams
+ if (ks_dht_datagram_alloc(&datagram, dht->pool) == KS_STATUS_SUCCESS) {
+ if (ks_dht_datagram_init(datagram, dht, dht->endpoints[i], &raddr) != KS_STATUS_SUCCESS) {
+ // @todo add to recycled datagrams
+ ks_dht_datagram_free(&datagram);
+ } else if (ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) {
+ // @todo add to recycled datagrams
+ ks_dht_datagram_deinit(datagram);
+ ks_dht_datagram_free(&datagram);
+ }
+ }
+ }
}
}
}
ks_assert(buffer_size);
ks_assert(address->family == AF_INET || address->family == AF_INET6);
+ // @todo change parameters to dereferenced pointer and forward buffer pointer directly
+
addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) {
ks_assert(address);
ks_assert(address->family == AF_INET ||address->family == AF_INET6);
+ // @todo change parameters to dereferenced pointer and forward buffer pointer directly
+
addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_NO_MEM;
ks_assert(buffer_size);
ks_assert(address->family == AF_INET || address->family == AF_INET6);
+ // @todo change parameters to dereferenced pointer and forward buffer pointer directly
+
if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) {
ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n");
return KS_STATUS_NO_MEM;
ks_assert(address);
ks_assert(address->family == AF_INET ||address->family == AF_INET6);
+ // @todo change parameters to dereferenced pointer and forward buffer pointer directly
+
if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_NO_MEM;
memcpy(nodeid->id, buffer + *buffer_length, KS_DHT_NODEID_SIZE);
// @todo blacklist check
+ // @todo use different encode function to check if all data was encoded, do not send large incomplete messages
buf_len = ben_encode2(buf, sizeof(buf), message->data);
ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
ks_sockaddr_t *raddr,
const char *query,
ks_dht_message_callback_t callback,
+ ks_dht_transaction_t **transaction,
ks_dht_message_t **message,
struct bencode **args)
{
ks_assert(callback);
ks_assert(message);
+ if (transaction) *transaction = NULL;
*message = NULL;
if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret;
goto done;
}
+ if (transaction) *transaction = trans;
+
ret = KS_STATUS_SUCCESS;
done:
}
-KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr)
+/**
+ * Thread pool job entry point: parse and dispatch one received datagram.
+ * Takes ownership of the datagram allocated in ks_dht_pulse and releases
+ * it on EVERY exit path (the previous revision leaked it on the early
+ * returns for bad address family and failed prealloc/init).
+ */
+KS_DECLARE(void *) ks_dht_process(ks_thread_t *thread, void *data)
+{
+	ks_dht_datagram_t *datagram = (ks_dht_datagram_t *)data;
+	ks_dht_message_t message;
+	ks_dht_message_callback_t callback;
+
+	ks_assert(thread);
+	ks_assert(data);
+
+	ks_log(KS_LOG_DEBUG, "Received message from %s %d\n", datagram->raddr.host, datagram->raddr.port);
+	if (datagram->raddr.family != AF_INET && datagram->raddr.family != AF_INET6) {
+		ks_log(KS_LOG_DEBUG, "Message from unsupported address family\n");
+		goto cleanup;
+	}
+
+	// @todo blacklist check for bad actor nodes
+
+	if (ks_dht_message_prealloc(&message, datagram->dht->pool) != KS_STATUS_SUCCESS) goto cleanup;
+
+	if (ks_dht_message_init(&message, datagram->endpoint, &datagram->raddr, KS_FALSE) != KS_STATUS_SUCCESS) goto cleanup;
+
+	if (ks_dht_message_parse(&message, datagram->buffer, datagram->buffer_length) != KS_STATUS_SUCCESS) goto done;
+
+	// Registered handlers are stored as function pointers in the type registry hash.
+	callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(datagram->dht->registry_type, message.type, KS_READLOCKED);
+	ks_hash_read_unlock(datagram->dht->registry_type);
+
+	if (!callback) ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type);
+	else callback(datagram->dht, &message);
+
+ done:
+	ks_dht_message_deinit(&message);
+
+ cleanup:
+	// @todo recycle datagram
+	ks_dht_datagram_deinit(datagram);
+	ks_dht_datagram_free(&datagram);
+
+	return NULL;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_process_(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr)
{
ks_dht_message_t message;
ks_dht_message_callback_t callback;
}
-KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_nodeid_t *id) //, ks_dht_search_callback_t callback)
+KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
+									  int family,
+									  ks_dht_nodeid_t *target,
+									  ks_dht_search_callback_t callback,
+									  ks_dht_search_t **search)
{
-	ks_assert(dht);
-	ks_assert(id);
-
-	// @todo check hash for id to see if search already exists
-
-	// @todo if search does not exist, create new search and store in hash by id
-
-	// @todo queue callback into search, if multiple tasks are searching the same id they can all be notified of results
-
-	// @todo if search existed already and is already running then bail out and let it run
-
-	// @todo find closest nodes to id locally, store as closest results, and queue in search pending a find_node call for closer nodes
-
-	// @todo pop a pending find_node call from search queue and call ks_dht_send_find_node, track last popped for timeout
-
+	ks_dht_search_t *s = NULL;
+	ks_status_t ret = KS_STATUS_SUCCESS;
+	ks_bool_t inserted = KS_FALSE;
+	ks_bool_t allocated = KS_FALSE;
+	ks_dhtrt_querynodes_t query;
-	// @todo upon receiving response to find_node, check for an existing search by the id
+	ks_assert(dht);
+	ks_assert(family == AF_INET || family == AF_INET6);
+	ks_assert(target);
-	// @todo keep track of the closest K(8) nodes found to the id
+	if (search) *search = NULL;
-	// @todo if there is closer node(s) in response, update furthest search result(s) and queue find_node calls for closer nodes
+	// check hash for target to see if search already exists
+	s = ks_hash_search(dht->search_hash, target->id, KS_READLOCKED);
+	ks_hash_read_unlock(dht->search_hash);
-	// @todo if search queue is empty, call callbacks
+	// if search does not exist, create new search and store in hash by target
+	if (!s) {
+		if ((ret = ks_dht_search_alloc(&s, dht->pool)) != KS_STATUS_SUCCESS) goto done;
+		if ((ret = ks_dht_search_init(s, target)) != KS_STATUS_SUCCESS) goto done;
+		allocated = KS_TRUE;
+	} else inserted = KS_TRUE;
-	// @todo otherwise pop a pending find_node call from search queue and call ks_dht_send_find_node, track last popped for timeout
+	// add callback regardless of whether the search is new or old
+	if ((ret = ks_dht_search_callback_add(s, callback)) != KS_STATUS_SUCCESS) goto done;
+	// if the search is old then bail out and return successfully
+	if (!allocated) goto done;
-	// @todo during pulse iterate searches and check for last popped timeout where find_node received no reply
+	// find closest good nodes to target locally and store as the closest results
+	query.nodeid = *target;
+	query.type = KS_DHT_REMOTE;
+	query.max = KS_DHT_SEARCH_RESULTS_MAX_SIZE;
+	query.family = family;
+	ks_dhtrt_findclosest_nodes(family == AF_INET ? dht->rt_ipv4 : dht->rt_ipv6, &query);
+	for (int32_t i = 0; i < query.count; ++i) {
+		ks_dht_node_t *n = query.nodes[i];
+		ks_dht_search_pending_t *pending = NULL;
+		s->results[i] = n;
+		// add to pending with expiration
+		if ((ret = ks_dht_search_pending_alloc(&pending, s->pool)) != KS_STATUS_SUCCESS) goto done;
+		if ((ret = ks_dht_search_pending_init(pending, n)) != KS_STATUS_SUCCESS) {
+			ks_dht_search_pending_free(&pending);
+			goto done;
+		}
+		// store the pending entry itself (storing the node here leaked the entry)
+		if (!ks_hash_insert(s->pending, n->nodeid.id, pending)) {
+			ks_dht_search_pending_deinit(pending);
+			ks_dht_search_pending_free(&pending);
+			goto done;
+		}
+		// @todo call send_findnode, but transactions need to track the target id from a find_node query since find_node response does not contain it
+	}
+	s->results_length = query.count;
+
+	if (!ks_hash_insert(dht->search_hash, s->target.id, s)) {
+		ret = KS_STATUS_FAIL;
+		goto done;
+	}
+	inserted = KS_TRUE;
-	// @todo pop a pending find_node call, or call callbacks if empty
+	if (search) *search = s;
+	ret = KS_STATUS_SUCCESS;
-	return KS_STATUS_SUCCESS;
+ done:
+	if (ret != KS_STATUS_SUCCESS && !inserted && s) {
+		ks_dht_search_deinit(s);
+		ks_dht_search_free(&s);
+	}
+	return ret;
}
ks_assert(dht);
ks_assert(raddr);
- if (ks_dht_setup_query(dht, ep, raddr, "ping", ks_dht_process_response_ping, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if (ks_dht_setup_query(dht,
+ ep,
+ raddr,
+ "ping",
+ ks_dht_process_response_ping,
+ NULL,
+ &message,
+ &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-
+ if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
{
+ ks_dht_transaction_t *transaction = NULL;
ks_dht_message_t *message = NULL;
struct bencode *a = NULL;
ks_assert(raddr);
ks_assert(targetid);
- if (ks_dht_setup_query(dht, ep, raddr, "find_node", ks_dht_process_response_findnode, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if (ks_dht_setup_query(dht,
+ ep,
+ raddr,
+ "find_node",
+ ks_dht_process_response_findnode,
+ &transaction,
+ &message,
+ &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ memcpy(transaction->target.id, targetid->id, KS_DHT_NODEID_SIZE);
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
+ // @todo produce "want" value if both families are bound
ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
ks_q_push(dht->send_q, (void *)message);
want = ben_dict_get_by_str(message->args, "want");
if (want) {
+ // @todo use ben_list_for_each
size_t want_len = ben_list_len(want);
for (size_t i = 0; i < want_len; ++i) {
struct bencode *iv = ben_list_get(want, i);
- if (!ben_cmp_with_str(iv, "n4")) want4 = KS_TRUE;
- if (!ben_cmp_with_str(iv, "n6")) want6 = KS_TRUE;
+ if (!ben_cmp_with_str(iv, "n4") && dht->rt_ipv4) want4 = KS_TRUE;
+ if (!ben_cmp_with_str(iv, "n6") && dht->rt_ipv6) want6 = KS_TRUE;
}
}
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
ks_dhtrt_findclosest_nodes(routetable, &query);
for (int32_t i = 0; i < query.count; ++i) {
- if (ks_dht_utility_compact_nodeinfo(&query.nodes[i]->nodeid,
- &query.nodes[i]->addr,
+ ks_dht_node_t *qn = query.nodes[i];
+
+ if (ks_dht_utility_compact_nodeinfo(&qn->nodeid,
+ &qn->addr,
buffer4,
&buffer4_length,
- sizeof(buffer4)) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
- ks_log(KS_LOG_DEBUG,
- "Compacted ipv4 nodeinfo for %s (%s %d)\n",
- ks_dht_hexid(&query.nodes[i]->nodeid, id_buf),
- query.nodes[i]->addr.host,
- query.nodes[i]->addr.port);
+ sizeof(buffer4)) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
}
}
if (want6) {
ks_dhtrt_findclosest_nodes(routetable, &query);
for (int32_t i = 0; i < query.count; ++i) {
- if (ks_dht_utility_compact_nodeinfo(&query.nodes[i]->nodeid,
- &query.nodes[i]->addr,
+ ks_dht_node_t *qn = query.nodes[i];
+
+ if (ks_dht_utility_compact_nodeinfo(&qn->nodeid,
+ &qn->addr,
buffer6,
&buffer6_length,
- sizeof(buffer6)) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
- ks_log(KS_LOG_DEBUG,
- "Compacted ipv6 nodeinfo for %s (%s %d)\n",
- ks_dht_hexid(&query.nodes[i]->nodeid, id_buf),
- query.nodes[i]->addr.host,
- query.nodes[i]->addr.port);
+ sizeof(buffer6)) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
}
}
message->transactionid,
message->transactionid_length,
&response,
- &r) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
+ &r) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
ks_assert(dht);
ks_assert(message);
+ // @todo pass in the ks_dht_transaction_t from the original query, available one call higher, to get the target id for search updating
+ // @todo make a utility function to produce a xor of two nodeid's for distance checks based on memcmp on the existing results and new response nodes
+ // @todo lookup search by target from transaction, lookup responding node id in search pending hash, set entry to finished for purging
+ // @todo check response nodes for closer nodes than results contain, skip duplicates, add pending and call send_findnode for new closer results
+
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
n = ben_dict_get_by_str(message->args, "nodes");
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
+ ks_dhtrt_release_node(node);
}
while (nodes6_len < nodes6_size) {
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
+ ks_dhtrt_release_node(node);
}
// @todo repeat above for ipv6 table
ks_assert(raddr);
ks_assert(targetid);
- if (ks_dht_setup_query(dht, ep, raddr, "get", ks_dht_process_response_get, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if (ks_dht_setup_query(dht,
+ ep,
+ raddr,
+ "get",
+ ks_dht_process_response_get,
+ NULL,
+ &message,
+ &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
// @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;