/**
* Create a new internally managed pool if one wasn't provided, and returns KS_STATUS_NO_MEM if pool was not created.
*/
- if (pool_alloc && (ret = ks_pool_open(&pool)) != KS_STATUS_SUCCESS) goto done;
+ if (pool_alloc) {
+ ks_pool_open(&pool);
+ ks_assert(pool);
+ }
/**
* Allocate the dht instance from the pool, and returns KS_STATUS_NO_MEM if the dht was not created.
*/
*dht = d = ks_pool_alloc(pool, sizeof(ks_dht_t));
- if (!d) {
- ret = KS_STATUS_NO_MEM;
- goto done;
- }
+ ks_assert(d);
/**
* Keep track of the pool used for future allocations and cleanup.
d->tpool = tpool;
if (!tpool) {
d->tpool_alloc = KS_TRUE;
- if ((ret = ks_thread_pool_create(&d->tpool,
- KS_DHT_TPOOL_MIN,
- KS_DHT_TPOOL_MAX,
- KS_DHT_TPOOL_STACK,
- KS_PRI_NORMAL,
- KS_DHT_TPOOL_IDLE)) != KS_STATUS_SUCCESS) goto done;
+ ks_thread_pool_create(&d->tpool, KS_DHT_TPOOL_MIN, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE);
+ ks_assert(d->tpool);
}
/**
/**
* Create the message type registry.
*/
- if ((ret = ks_hash_create(&d->registry_type,
- KS_HASH_MODE_DEFAULT,
- KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
- d->pool)) != KS_STATUS_SUCCESS) goto done;
+ ks_hash_create(&d->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+ ks_assert(d->registry_type);
/**
* Register the message type callbacks for query (q), response (r), and error (e)
/**
* Create the message query registry.
*/
- if ((ret = ks_hash_create(&d->registry_query,
- KS_HASH_MODE_DEFAULT,
- KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
- d->pool)) != KS_STATUS_SUCCESS) goto done;
+ ks_hash_create(&d->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+ ks_assert(d->registry_query);
/**
* Register the message query callbacks for ping, find_node, etc.
/**
* Create the message error registry.
*/
- if ((ret = ks_hash_create(&d->registry_error,
- KS_HASH_MODE_DEFAULT,
- KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
- d->pool)) != KS_STATUS_SUCCESS) goto done;
+ ks_hash_create(&d->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+ ks_assert(d->registry_error);
// @todo register 301 error for internal get/put CAS hash mismatch retry handler
/**
* This also provides the basis for autorouting to find unbound interfaces and bind them at runtime.
* This hash uses the host ip string concatenated with a colon and the port, ie: "123.123.123.123:123" or ipv6 equivilent
*/
- if ((ret = ks_hash_create(&d->endpoints_hash,
- KS_HASH_MODE_DEFAULT,
- KS_HASH_FLAG_RWLOCK,
- d->pool)) != KS_STATUS_SUCCESS) goto done;
+ ks_hash_create(&d->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, d->pool);
+ ks_assert(d->endpoints_hash);
/**
* Default expirations to not be checked for one pulse.
*/
- d->pulse_expirations = ks_time_now_sec() + KS_DHT_PULSE_EXPIRATIONS;
+ d->pulse_expirations = ks_time_now() + (KS_DHT_PULSE_EXPIRATIONS * 1000);
/**
* Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full.
*/
- if ((ret = ks_q_create(&d->send_q, d->pool, 0)) != KS_STATUS_SUCCESS) goto done;
+ ks_q_create(&d->send_q, d->pool, 0);
+ ks_assert(d->send_q);
/**
* If a message is popped from the queue for sending but the system buffers are too full, this is used to temporarily store the message.
/**
* Initialize the transaction id mutex, should use atomic increment instead
*/
- if ((ret = ks_mutex_create(&d->tid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool)) != KS_STATUS_SUCCESS) goto done;
+ ks_mutex_create(&d->tid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
+ ks_assert(d->tid_mutex);
/**
* Initialize the first transaction id randomly, this doesn't really matter.
* Create the hash to track pending transactions on queries that are pending responses.
* It should be impossible to receive a duplicate transaction id in the hash before it expires, but if it does an error is preferred.
*/
- if ((ret = ks_hash_create(&d->transactions_hash,
- KS_HASH_MODE_INT,
- KS_HASH_FLAG_RWLOCK,
- d->pool)) != KS_STATUS_SUCCESS) goto done;
+ ks_hash_create(&d->transactions_hash, KS_HASH_MODE_INT, KS_HASH_FLAG_RWLOCK, d->pool);
+ ks_assert(d->transactions_hash);
/**
* The internal route tables will be latent allocated when binding.
/**
* Create the hash to store searches.
*/
- if ((ret = ks_hash_create(&d->search_hash,
- KS_HASH_MODE_ARBITRARY,
- KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
- d->pool)) != KS_STATUS_SUCCESS) goto done;
+ ks_hash_create(&d->search_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+ ks_assert(d->search_hash);
+
/**
* The search hash uses arbitrary key size, which requires the key size be provided.
*/
* The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
*/
d->token_secret_current = d->token_secret_previous = rand();
- d->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION;
+ d->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
/**
* Create the hash to store arbitrary data for BEP44.
*/
- if ((ret = ks_hash_create(&d->storage_hash,
- KS_HASH_MODE_ARBITRARY,
- KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
- d->pool)) != KS_STATUS_SUCCESS) goto done;
+ ks_hash_create(&d->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+ ks_assert(d->storage_hash);
+
/**
* The storage hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's.
*/
ks_hash_set_keysize(d->storage_hash, KS_DHT_NODEID_SIZE);
- done:
+ // done:
if (ret != KS_STATUS_SUCCESS) {
if (d) ks_dht_destroy(&d);
else if (pool_alloc && pool) ks_pool_close(&pool);
*/
pool = d->pool;
pool_alloc = d->pool_alloc;
-
+
/**
* Free the dht instance from the pool, after this the dht instance memory is invalid.
*/
ks_assert(value);
ks_assert(callback);
- return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
+ return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback);
}
KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
ks_assert(value);
ks_assert(callback);
- return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
+ return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback);
}
KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
ks_assert(value);
ks_assert(callback);
- return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
+ return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback);
}
if (endpoint) *endpoint = NULL;
ep = ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_READLOCKED);
- if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret;
+ ks_hash_read_unlock(dht->endpoints_hash);
if (ep) {
ks_log(KS_LOG_DEBUG, "Attempted to bind to %s more than once.\n", addr->host);
return KS_STATUS_FAIL;
/**
* Allocate the endpoint to track the local socket.
*/
- if ((ret = ks_dht_endpoint_create(&ep, dht->pool, nodeid, addr, sock)) != KS_STATUS_SUCCESS) goto done;
+ ks_dht_endpoint_create(&ep, dht->pool, nodeid, addr, sock);
+ ks_assert(ep);
/**
* Resize the endpoints array to take another endpoint pointer.
dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
(void *)dht->endpoints,
sizeof(ks_dht_endpoint_t *) * dht->endpoints_size);
+ ks_assert(dht->endpoints);
dht->endpoints[epindex] = ep;
/**
* Add the new endpoint into the endpoints hash for quick lookups.
+ * @todo insert returns 0 when OOM, ks_pool_alloc will abort so insert can only succeed
*/
- if (!ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) {
- ret = KS_STATUS_FAIL;
- goto done;
- }
+ if ((ret = ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) != KS_STATUS_SUCCESS) goto done;
/**
* Resize the endpoints_poll array to keep in parallel with endpoints array, populate new entry with the right data.
dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool,
(void *)dht->endpoints_poll,
sizeof(struct pollfd) * dht->endpoints_size);
+ ks_assert(dht->endpoints_poll);
dht->endpoints_poll[epindex].fd = ep->sock;
dht->endpoints_poll[epindex].events = POLLIN | POLLERR;
ep->addr.host,
ep->addr.port,
&ep->node)) != KS_STATUS_SUCCESS) goto done;
- /**
- * Do not release the ep->node, keep it alive until cleanup
- */
} else {
if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_create_node(dht->rt_ipv6,
ep->addr.host,
ep->addr.port,
&ep->node)) != KS_STATUS_SUCCESS) goto done;
- /**
- * Do not release the ep->node, keep it alive until cleanup
- */
}
+ /**
+ * Do not release the ep->node, keep it alive until cleanup
+ */
/**
* If the endpoint output is being captured, assign it and return successfully.
*/
if (endpoint) *endpoint = ep;
- ret = KS_STATUS_SUCCESS;
-
done:
if (ret != KS_STATUS_SUCCESS) {
/**
* This will be done in ks_dht_endpoint_destroy only if the socket was assigned during a successful ks_dht_endpoint_create.
* Then return whatever failure condition resulted in landed here.
*/
- if (ep) ks_dht_endpoint_destroy(&ep);
+ if (ep) {
+ ks_hash_remove(dht->endpoints_hash, ep->addr.host);
+ ks_dht_endpoint_destroy(&ep);
+ }
else if (sock != KS_SOCK_INVALID) ks_socket_close(&sock);
if (endpoint) *endpoint = NULL;
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
{
ks_dht_datagram_t *datagram = NULL;
- int32_t result;
ks_sockaddr_t raddr;
ks_assert(dht);
- ks_assert (timeout > 0);
+ ks_assert(timeout > 0);
if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0;
- result = ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout);
- if (result > 0) {
+ // @todo confirm how poll/wsapoll react to zero size and NULL array
+ if (ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout) > 0) {
for (int32_t i = 0; i < dht->endpoints_size; ++i) {
if (!(dht->endpoints_poll[i].revents & POLLIN)) continue;
-
+
raddr = (const ks_sockaddr_t){ 0 };
dht->recv_buffer_length = sizeof(dht->recv_buffer);
raddr.family = dht->endpoints[i]->addr.family;
if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) != KS_STATUS_SUCCESS) continue;
-
+
if (dht->recv_buffer_length == sizeof(dht->recv_buffer)) {
ks_log(KS_LOG_DEBUG, "Dropped oversize datagram from %s %d\n", raddr.host, raddr.port);
continue;
}
-
- if (ks_dht_datagram_create(&datagram, dht->pool, dht, dht->endpoints[i], &raddr) == KS_STATUS_SUCCESS &&
- ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) ks_dht_datagram_destroy(&datagram);
+
+ ks_dht_datagram_create(&datagram, dht->pool, dht, dht->endpoints[i], &raddr);
+ ks_assert(datagram);
+
+ if (ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) ks_dht_datagram_destroy(&datagram);
}
}
KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
{
ks_hash_iterator_t *it = NULL;
- ks_time_t now = ks_time_now_sec();
+ ks_time_t now = ks_time_now();
ks_assert(dht);
- if (dht->pulse_expirations <= now) {
- dht->pulse_expirations = now + KS_DHT_PULSE_EXPIRATIONS;
- }
+ if (dht->pulse_expirations > now) return;
+ dht->pulse_expirations = now + (KS_DHT_PULSE_EXPIRATIONS * 1000);
ks_hash_write_lock(dht->transactions_hash);
for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
}
ks_hash_write_unlock(dht->transactions_hash);
+ ks_hash_write_lock(dht->search_hash);
+ for (it = ks_hash_first(dht->search_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ const void *search_key = NULL;
+ ks_dht_search_t *search_value = NULL;
+
+ ks_hash_this(it, &search_key, NULL, (void **)&search_value);
+
+ ks_hash_write_lock(search_value->pending);
+ for (ks_hash_iterator_t *i = ks_hash_first(search_value->pending, KS_UNLOCKED); i; i = ks_hash_next(&i)) {
+ const void *pending_key = NULL;
+ ks_dht_search_pending_t *pending_value = NULL;
+ ks_bool_t pending_remove = KS_FALSE;
+
+ ks_hash_this(i, &pending_key, NULL, (void **)&pending_value);
+
+ if (pending_value->finished) pending_remove = KS_TRUE;
+ else if (pending_value->expiration <= now) {
+ char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_log(KS_LOG_DEBUG,
+ "Search for %s pending find_node to %s has expired without response\n",
+ ks_dht_hexid(&search_value->target, id_buf),
+ ks_dht_hexid(&pending_value->nodeid, id2_buf));
+ pending_remove = KS_TRUE;
+ }
+ if (pending_remove) {
+ ks_hash_remove(search_value->pending, (void *)pending_key);
+ ks_dht_search_pending_destroy(&pending_value);
+ }
+ }
+ ks_hash_write_unlock(search_value->pending);
+ if (ks_hash_count(search_value->pending) == 0) {
+ for (int32_t index = 0; index < search_value->callbacks_size; ++index) search_value->callbacks[index](dht, search_value);
+ ks_hash_remove(dht->search_hash, (void *)search_key);
+ ks_dht_search_destroy(&search_value);
+ }
+ }
+ ks_hash_write_unlock(dht->search_hash);
+
if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
- dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION;
+ dht->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
dht->token_secret_previous = dht->token_secret_current;
dht->token_secret_current = rand();
}
*message = msg;
- if (!ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) {
- ret = KS_STATUS_FAIL;
- goto done;
- }
+ if ((ret = ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) != KS_STATUS_SUCCESS) goto done;
if (transaction) *transaction = trans;
ks_dht_search_callback_t callback,
ks_dht_search_t **search)
{
+ ks_bool_t locked_search = KS_FALSE;
+ ks_bool_t locked_pending = KS_FALSE;
ks_dht_search_t *s = NULL;
- ks_status_t ret = KS_STATUS_SUCCESS;
ks_bool_t inserted = KS_FALSE;
ks_bool_t allocated = KS_FALSE;
ks_dhtrt_querynodes_t query;
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(family == AF_INET || family == AF_INET6);
if (search) *search = NULL;
+ // @todo start write lock on search_hash and hold until after inserting
// check hash for target to see if search already exists
- s = ks_hash_search(dht->search_hash, target->id, KS_READLOCKED);
- ks_hash_read_unlock(dht->search_hash); // @todo hold lock until finished adding new entry?
+ ks_hash_write_lock(dht->search_hash);
+ locked_search = KS_TRUE;
+
+ s = ks_hash_search(dht->search_hash, target->id, KS_UNLOCKED);
// if search does not exist, create new search and store in hash by target
if (!s) {
// if the search is old then bail out and return successfully
if (!allocated) goto done;
+ if ((ret = ks_hash_insert(dht->search_hash, s->target.id, s)) == KS_STATUS_SUCCESS) goto done;
+ inserted = KS_TRUE;
+
+ // lock pending before unlocking the search hash to prevent this search from being used before we finish setting it up
+ ks_hash_write_lock(s->pending);
+ locked_pending = KS_TRUE;
+
+ // release search hash lock now, but pending is still locked
+ ks_hash_write_unlock(dht->search_hash);
+ locked_search = KS_FALSE;
+
// find closest good nodes to target locally and store as the closest results
query.nodeid = *target;
query.type = KS_DHT_REMOTE;
ks_dht_utility_nodeid_xor(&s->distances[i], &n->nodeid, &s->target);
// add to pending with expiration
if ((ret = ks_dht_search_pending_create(&pending, s->pool, &n->nodeid)) != KS_STATUS_SUCCESS) goto done;
- if (!ks_hash_insert(s->pending, n->nodeid.id, pending)) {
+ if ((ret = ks_hash_insert(s->pending, n->nodeid.id, pending)) != KS_STATUS_SUCCESS) {
ks_dht_search_pending_destroy(&pending);
- ret = KS_STATUS_FAIL;
goto done;
}
if ((ret = ks_dht_send_findnode(dht, NULL, &n->addr, target)) != KS_STATUS_SUCCESS) goto done;
+ // increment here in case we end up bailing out; execute with what it has or destroy the search?
+ s->results_length++;
}
- s->results_length = query.count;
// @todo release query nodes
-
- // @todo if entry has been added since we checked above this may fail, try adding callback instead of failing? or retain lock from earlier
- if (!ks_hash_insert(dht->search_hash, s->target.id, s)) {
- ret = KS_STATUS_FAIL;
- goto done;
- }
- inserted = KS_TRUE;
+ ks_hash_write_unlock(s->pending);
+ locked_pending = KS_FALSE;
if (search) *search = s;
done:
- if (ret != KS_STATUS_SUCCESS && !inserted && s) ks_dht_search_destroy(&s);
+ if (locked_search) ks_hash_write_unlock(dht->search_hash);
+ if (locked_pending) ks_hash_write_unlock(s->pending);
+ if (ret != KS_STATUS_SUCCESS) {
+ if (!inserted && s) ks_dht_search_destroy(&s);
+ *search = NULL;
+ }
return ret;
}
{
ks_dht_message_t *error = NULL;
struct bencode *e = NULL;
- ks_status_t ret = KS_STATUS_FAIL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(raddr);
ks_assert(transactionid);
ks_assert(errorstr);
- if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
- if (ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
- if (ks_dht_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_message_error(error, transactionid, transactionid_length, &e)) != KS_STATUS_SUCCESS) goto done;
ben_list_append(e, ben_int(errorcode));
ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
ks_q_push(dht->send_q, (void *)error);
- ret = KS_STATUS_SUCCESS;
-
done:
if (ret != KS_STATUS_SUCCESS && error) ks_dht_message_destroy(&error);
return ret;
ks_dht_transaction_t *transaction;
uint32_t *tid;
uint32_t transactionid;
- ks_status_t ret = KS_STATUS_FAIL;
+ ks_dht_message_callback_t callback;
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(message);
es_len = ben_str_len(es);
if (es_len >= KS_DHT_MESSAGE_ERROR_MAX_SIZE) {
ks_log(KS_LOG_DEBUG, "Message error value has an unexpectedly large size of %d\n", es_len);
- return KS_STATUS_FAIL;
+ ret = KS_STATUS_FAIL;
+ goto done;
}
errorcode = ben_int_val(ec);
et = ben_str_val(es);
if (!transaction) {
ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid);
- } else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
+
+ if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
ks_log(KS_LOG_DEBUG,
"Message error rejected due to spoofing from %s %d, expected %s %d\n",
message->raddr.host,
message->raddr.port,
transaction->raddr.host,
transaction->raddr.port);
- } else {
- ks_dht_message_callback_t callback;
- transaction->finished = KS_TRUE;
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
- callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED);
- ks_hash_read_unlock(dht->registry_error);
+ transaction->finished = KS_TRUE;
- if (callback) ret = callback(dht, message);
- else {
- ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error);
- ret = KS_STATUS_SUCCESS;
- }
- }
+ callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED);
+ ks_hash_read_unlock(dht->registry_error);
+
+ if (callback) ret = callback(dht, message);
+ else ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error);
+ done:
return ret;
}
{
ks_dht_message_t *message = NULL;
struct bencode *a = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(raddr);
- if (ks_dht_setup_query(dht,
- ep,
- raddr,
- "ping",
- ks_dht_process_response_ping,
- NULL,
- &message,
- &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_setup_query(dht,
+ ep,
+ raddr,
+ "ping",
+ ks_dht_process_response_ping,
+ NULL,
+ &message,
+ &a)) != KS_STATUS_SUCCESS) goto done;
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
ks_q_push(dht->send_q, (void *)message);
- return KS_STATUS_SUCCESS;
+ done:
+ return ret;
}
KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message)
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(message);
ks_assert(message->args);
- if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
- if (ks_dht_setup_response(dht,
- message->endpoint,
- &message->raddr,
- message->transactionid,
- message->transactionid_length,
- &response,
- &r) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
+ if ((ret = ks_dht_setup_response(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ &response,
+ &r)) != KS_STATUS_SUCCESS) goto done;
ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
ks_q_push(dht->send_q, (void *)response);
- return KS_STATUS_SUCCESS;
+ done:
+ return ret;
}
KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message)
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(message);
- if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
- if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
- return KS_STATUS_SUCCESS;
+ done:
+ return ret;
}
ks_dht_transaction_t *transaction = NULL;
ks_dht_message_t *message = NULL;
struct bencode *a = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(raddr);
ks_assert(targetid);
- if (ks_dht_setup_query(dht,
- ep,
- raddr,
- "find_node",
- ks_dht_process_response_findnode,
- &transaction,
- &message,
- &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_setup_query(dht,
+ ep,
+ raddr,
+ "find_node",
+ ks_dht_process_response_findnode,
+ &transaction,
+ &message,
+ &a)) != KS_STATUS_SUCCESS) goto done;
memcpy(transaction->target.id, targetid->id, KS_DHT_NODEID_SIZE);
ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
ks_q_push(dht->send_q, (void *)message);
- return KS_STATUS_SUCCESS;
+ done:
+ return ret;
}
KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message)
ks_dht_node_t *node = NULL;
ks_dhtrt_querynodes_t query;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(message);
ks_assert(message->args);
- if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
- if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
want = ben_dict_get_by_str(message->args, "want");
if (want) {
routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
for (int32_t i = 0; i < query.count; ++i) {
ks_dht_node_t *qn = query.nodes[i];
- if (ks_dht_utility_compact_nodeinfo(&qn->nodeid,
- &qn->addr,
- buffer4,
- &buffer4_length,
- sizeof(buffer4)) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid,
+ &qn->addr,
+ buffer4,
+ &buffer4_length,
+ sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
}
for (int32_t i = 0; i < query.count; ++i) {
ks_dht_node_t *qn = query.nodes[i];
- if (ks_dht_utility_compact_nodeinfo(&qn->nodeid,
- &qn->addr,
- buffer6,
- &buffer6_length,
- sizeof(buffer6)) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid,
+ &qn->addr,
+ buffer6,
+ &buffer6_length,
+ sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
}
}
- if (ks_dht_setup_response(dht,
- message->endpoint,
- &message->raddr,
- message->transactionid,
- message->transactionid_length,
- &response,
- &r) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_setup_response(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ &response,
+ &r)) != KS_STATUS_SUCCESS) goto done;
ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
ks_q_push(dht->send_q, (void *)response);
- return KS_STATUS_SUCCESS;
+ done:
+ return ret;
}
KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message)
ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_dht_search_t *search = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(message);
ks_assert(message->transaction);
- if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
n = ben_dict_get_by_str(message->args, "nodes");
if (n) {
routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
- if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
- search = ks_hash_search(dht->search_hash, message->transaction->target.id, KS_READLOCKED);
+ ks_hash_read_lock(dht->search_hash);
+ search = ks_hash_search(dht->search_hash, message->transaction->target.id, KS_UNLOCKED);
ks_hash_read_unlock(dht->search_hash);
if (search) {
ks_dht_search_pending_t *pending = ks_hash_search(search->pending, id->id, KS_READLOCKED);
ks_sockaddr_t addr;
addr.family = AF_INET;
- if (ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG,
"Expanded ipv4 nodeinfo for %s (%s %d)\n",
search->results[results_index] = nid;
search->distances[results_index] = distance;
- if (ks_dht_search_pending_create(&pending, search->pool, &nid) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- if (!ks_hash_insert(search->pending, nid.id, pending)) {
+ if ((ret = ks_dht_search_pending_create(&pending, search->pool, &nid)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_hash_insert(search->pending, nid.id, pending)) != KS_STATUS_SUCCESS) {
ks_dht_search_pending_destroy(&pending);
- return KS_STATUS_FAIL;
+ goto done;
}
- if (ks_dht_send_findnode(dht, NULL, &addr, &search->target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_send_findnode(dht, NULL, &addr, &search->target)) != KS_STATUS_SUCCESS) goto done;
}
}
}
ks_sockaddr_t addr;
addr.family = AF_INET6;
- if (ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG,
"Expanded ipv6 nodeinfo for %s (%s %d)\n",
ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
- return KS_STATUS_SUCCESS;
+ done:
+ return ret;
}
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(message);
ks_assert(message->args);
- if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
- if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
seq = ben_dict_get_by_str(message->args, "seq");
if (seq) sequence = ben_int_val(seq);
routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
// @todo compact ipv4 and ipv6 nodes into separate buffers
- if (ks_dht_setup_response(dht,
- message->endpoint,
- &message->raddr,
- message->transactionid,
- message->transactionid_length,
- &response,
- &r) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
+ if ((ret = ks_dht_setup_response(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ &response,
+ &r)) != KS_STATUS_SUCCESS) goto done;
ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ben_dict_set(r, ben_blob("token", 5), ben_blob(token.token, KS_DHT_TOKEN_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message response get\n");
ks_q_push(dht->send_q, (void *)response);
- return KS_STATUS_SUCCESS;
+ done:
+ return ret;
}
KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message)
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(message);
// @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
- if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
- if (ks_dht_utility_extract_token(message->args, "token", &token) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
// @todo add extract function for mutable ks_dht_storageitem_key_t
// @todo add extract function for mutable ks_dht_storageitem_signature_t
routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
- if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
// @todo add/touch bucket entries for other nodes/nodes6 returned
ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
- return KS_STATUS_SUCCESS;
+ done:
+ return ret;
}
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(message);
ks_assert(message->args);
- if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
- if (ks_dht_setup_response(dht,
- message->endpoint,
- &message->raddr,
- message->transactionid,
- message->transactionid_length,
- &response,
- &r) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
+ if ((ret = ks_dht_setup_response(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ &response,
+ &r)) != KS_STATUS_SUCCESS) goto done;
//ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message response put\n");
ks_q_push(dht->send_q, (void *)response);
- return KS_STATUS_SUCCESS;
+ done:
+ return ret;
}
KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message)
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(message);
- if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
- if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
- return KS_STATUS_SUCCESS;
+ done:
+ return ret;
}
/* For Emacs: