*/
d->recv_buffer_length = 0;
+ /**
+ * Initialize the jobs mutex
+ */
+ ks_mutex_create(&d->jobs_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
+ ks_assert(d->jobs_mutex);
+
+ d->jobs_first = NULL;
+ d->jobs_last = NULL;
+
/**
* Initialize the transaction id mutex, should use atomic increment instead
*/
/**
* Create the hash to store arbitrary data for BEP44.
*/
- ks_hash_create(&d->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
- ks_assert(d->storage_hash);
+ ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+ ks_assert(d->storageitems_hash);
/**
- * The storage hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's.
+ * The storageitems hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's.
*/
- ks_hash_set_keysize(d->storage_hash, KS_DHT_NODEID_SIZE);
+ ks_hash_set_keysize(d->storageitems_hash, KS_DHT_NODEID_SIZE);
// done:
if (ret != KS_STATUS_SUCCESS) {
d = *dht;
/**
- * Cleanup the storage hash and it's contents if it is allocated.
+ * Cleanup the storageitems hash and it's contents if it is allocated.
*/
- if (d->storage_hash) {
- for (it = ks_hash_first(d->storage_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ if (d->storageitems_hash) {
+ for (it = ks_hash_first(d->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
ks_dht_storageitem_t *val;
ks_hash_this_val(it, (void **)&val);
ks_dht_storageitem_destroy(&val);
}
- ks_hash_destroy(&d->storage_hash);
+ ks_hash_destroy(&d->storageitems_hash);
}
/**
if (d->tid_mutex) ks_mutex_destroy(&d->tid_mutex);
if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash);
+ /**
+ * Cleanup the jobs mutex and jobs if they are allocated.
+ */
+ for (ks_dht_job_t *job = d->jobs_first, *jobn = NULL; job; job = jobn) {
+ jobn = job->next;
+ ks_dht_job_destroy(&job);
+ }
+ if (d->jobs_mutex) ks_mutex_destroy(&d->jobs_mutex);
+
/**
* Probably don't need this, recv_buffer_length is temporary and may change
*/
dht->autoroute_port = port;
}
-KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint)
+KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint)
{
// @todo lookup standard def for IPV6 max size
char ip[48 + 1];
}
}
+ ks_dht_pulse_jobs(dht);
+
ks_dht_pulse_send(dht);
ks_dht_pulse_expirations(dht);
ks_hash_this(it, &key, NULL, (void **)&value);
if (value->finished) remove = KS_TRUE;
else if (value->expiration <= now) {
+ // if the transaction expires, so does the attached job, it may try again with a new transaction
+ value->job->state = KS_DHT_JOB_STATE_EXPIRING;
ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
remove = KS_TRUE;
}
}
-KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
+KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
{
SHA_CTX sha;
uint16_t port = 0;
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
+KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
{
ks_dht_token_t tok;
KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
{
- // @todo calculate max IPV6 payload size?
- char buf[1001];
+ char buf[KS_DHT_DATAGRAM_BUFFER_SIZE + 1];
ks_size_t buf_len;
ks_assert(dht);
}
-KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
- ks_dht_endpoint_t *ep,
- ks_sockaddr_t *raddr,
+KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht,
+ ks_dht_job_t *job,
const char *query,
- ks_dht_message_callback_t callback,
+ ks_dht_job_callback_t callback,
ks_dht_transaction_t **transaction,
ks_dht_message_t **message,
struct bencode **args)
{
+ ks_dht_endpoint_t *ep = NULL;
uint32_t transactionid;
ks_dht_transaction_t *trans = NULL;
ks_dht_message_t *msg = NULL;
- ks_status_t ret = KS_STATUS_FAIL;
+ struct bencode *a = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
- ks_assert(raddr);
+ ks_assert(job);
ks_assert(query);
ks_assert(callback);
ks_assert(message);
if (transaction) *transaction = NULL;
*message = NULL;
- if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret;
+ if ((ret = ks_dht_autoroute_check(dht, &job->raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
// @todo atomic increment
ks_mutex_lock(dht->tid_mutex);
transactionid = dht->transactionid_next++;
ks_mutex_unlock(dht->tid_mutex);
- if ((ret = ks_dht_transaction_create(&trans, dht->pool, raddr, transactionid, callback)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done;
+ // if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done;
+ transactionid = htonl(transactionid);
+
+ ben_dict_set(msg->data, ben_blob("t", 1), ben_blob((uint8_t *)&transactionid, sizeof(uint32_t)));
+ ben_dict_set(msg->data, ben_blob("y", 1), ben_blob("q", 1));
+ ben_dict_set(msg->data, ben_blob("q", 1), ben_blob(query, strlen(query)));
+
+ // @note a joins msg->data and will be freed with it
+ a = ben_dict();
+ ks_assert(a);
+ ben_dict_set(msg->data, ben_blob("a", 1), a);
+
+ if (args) *args = a;
+
+ ben_dict_set(a, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE));
*message = msg;
if (transaction) *transaction = trans;
- ret = KS_STATUS_SUCCESS;
-
done:
if (ret != KS_STATUS_SUCCESS) {
if (trans) ks_dht_transaction_destroy(&trans);
return ret;
}
-KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
+KS_DECLARE(ks_status_t) ks_dht_response_setup(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
- ks_sockaddr_t *raddr,
+ const ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length,
ks_dht_message_t **message,
struct bencode **args)
{
ks_dht_message_t *msg = NULL;
- ks_status_t ret = KS_STATUS_FAIL;
+ struct bencode *r = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(raddr);
if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dht_message_response(msg, transactionid, transactionid_length, args)) != KS_STATUS_SUCCESS) goto done;
+ //if ((ret = ks_dht_message_response(msg, transactionid, transactionid_length, args)) != KS_STATUS_SUCCESS) goto done;
+ ben_dict_set(msg->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
+ ben_dict_set(msg->data, ben_blob("y", 1), ben_blob("r", 1));
- *message = msg;
+ // @note r joins msg->data and will be freed with it
+ r = ben_dict();
+ ks_assert(r);
+ ben_dict_set(msg->data, ben_blob("r", 1), r);
- ret = KS_STATUS_SUCCESS;
+ if (args) *args = r;
+
+ ben_dict_set(r, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE));
+
+ *message = msg;
done:
if (ret != KS_STATUS_SUCCESS) {
struct bencode *a;
const char *qv;
ks_size_t qv_len;
+ ks_dht_nodeid_t *id;
+ ks_dht_node_t *node;
char query[KS_DHT_MESSAGE_QUERY_MAX_SIZE];
ks_dht_message_callback_t callback;
- ks_status_t ret = KS_STATUS_FAIL;
+ char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(message);
- // @todo start of ks_dht_message_parse_query
q = ben_dict_get_by_str(message->data, "q");
if (!q) {
ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n");
qv_len = ben_str_len(q);
if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) {
ks_log(KS_LOG_DEBUG, "Message query 'q' value has an unexpectedly large size of %d\n", qv_len);
- return KS_STATUS_FAIL;
+ ret = KS_STATUS_FAIL;
+ goto done;
}
memcpy(query, qv, qv_len);
a = ben_dict_get_by_str(message->data, "a");
if (!a) {
ks_log(KS_LOG_DEBUG, "Message query missing required key 'a'\n");
- return KS_STATUS_FAIL;
+ ret = KS_STATUS_FAIL;
+ goto done;
}
- // @todo end of ks_dht_message_parse_query
message->args = a;
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
+ message->args_id = *id;
+
+ ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+ if ((ret = ks_dhtrt_create_node(message->endpoint->node->table,
+ *id,
+ KS_DHT_REMOTE,
+ message->raddr.host,
+ message->raddr.port,
+ &node)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
+
callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_READLOCKED);
ks_hash_read_unlock(dht->registry_query);
if (!callback) ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
else ret = callback(dht, message);
+ done:
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t *message)
{
struct bencode *r;
+ ks_dht_nodeid_t *id;
+ ks_dht_node_t *node;
ks_dht_transaction_t *transaction;
uint32_t *tid;
uint32_t transactionid;
- ks_status_t ret = KS_STATUS_FAIL;
+ char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(message);
- // @todo start of ks_dht_message_parse_response
r = ben_dict_get_by_str(message->data, "r");
if (!r) {
ks_log(KS_LOG_DEBUG, "Message response missing required key 'r'\n");
- return KS_STATUS_FAIL;
+ ret = KS_STATUS_FAIL;
+ goto done;
}
- // @todo end of ks_dht_message_parse_response
-
+
message->args = r;
+
+ if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
+ message->args_id = *id;
+
+ ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+ if ((ret = ks_dhtrt_create_node(message->endpoint->node->table,
+ *id,
+ KS_DHT_REMOTE,
+ message->raddr.host,
+ message->raddr.port,
+ &node)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
+
+ ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
+ if ((ret = ks_dhtrt_touch_node(message->endpoint->node->table, *id)) != KS_STATUS_SUCCESS) goto done;
+
tid = (uint32_t *)message->transactionid;
transactionid = ntohl(*tid);
ks_hash_read_unlock(dht->transactions_hash);
if (!transaction) ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
- else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
+ else if (!ks_addr_cmp(&message->raddr, &transaction->job->raddr)) {
ks_log(KS_LOG_DEBUG,
"Message response rejected due to spoofing from %s %d, expected %s %d\n",
message->raddr.host,
message->raddr.port,
- transaction->raddr.host,
- transaction->raddr.port);
+ transaction->job->raddr.host,
+ transaction->job->raddr.port);
} else {
+ transaction->job->response = message;
+ transaction->job->state = KS_DHT_JOB_STATE_PROCESSING;
message->transaction = transaction;
- ret = transaction->callback(dht, message);
+ if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING;
transaction->finished = KS_TRUE;
}
+ done:
return ret;
}
ks_dht_search_pending_destroy(&pending);
goto done;
}
- if ((ret = ks_dht_send_findnode(dht, NULL, &n->addr, target)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_findnode(dht, &n->addr, NULL, target)) != KS_STATUS_SUCCESS) goto done;
}
- // @todo release closest local query node locks
+ ks_dhtrt_release_querynodes(&query);
ks_mutex_unlock(s->mutex);
locked_search = KS_FALSE;
}
-KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
- ks_dht_endpoint_t *ep,
- ks_sockaddr_t *raddr,
- uint8_t *transactionid,
- ks_size_t transactionid_length,
- long long errorcode,
- const char *errorstr)
+KS_DECLARE(ks_status_t) ks_dht_error(ks_dht_t *dht,
+ ks_dht_endpoint_t *ep,
+ const ks_sockaddr_t *raddr,
+ uint8_t *transactionid,
+ ks_size_t transactionid_length,
+ long long errorcode,
+ const char *errorstr)
{
ks_dht_message_t *error = NULL;
struct bencode *e = NULL;
if ((ret = ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dht_message_error(error, transactionid, transactionid_length, &e)) != KS_STATUS_SUCCESS) goto done;
+ //if ((ret = ks_dht_message_error(error, transactionid, transactionid_length, &e)) != KS_STATUS_SUCCESS) goto done;
+ ben_dict_set(error->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
+ ben_dict_set(error->data, ben_blob("y", 1), ben_blob("e", 1));
+
+ // @note e joins error->data and will be freed with it
+ e = ben_list();
+ ks_assert(e);
+ ben_dict_set(error->data, ben_blob("e", 1), e);
+
ben_list_append(e, ben_int(errorcode));
ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
ks_assert(dht);
ks_assert(message);
- // @todo start of ks_dht_message_parse_error
e = ben_dict_get_by_str(message->data, "e");
if (!e) {
ks_log(KS_LOG_DEBUG, "Message error missing required key 'e'\n");
memcpy(error, et, es_len);
error[es_len] = '\0';
- // @todo end of ks_dht_message_parse_error
message->args = e;
goto done;
}
- if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
+ if (!ks_addr_cmp(&message->raddr, &transaction->job->raddr)) {
ks_log(KS_LOG_DEBUG,
"Message error rejected due to spoofing from %s %d, expected %s %d\n",
message->raddr.host,
message->raddr.port,
- transaction->raddr.host,
- transaction->raddr.port);
+ transaction->job->raddr.host,
+ transaction->job->raddr.port);
ret = KS_STATUS_FAIL;
goto done;
}
return ret;
}
+KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job)
+{
+ ks_assert(dht);
+ ks_assert(job);
+
+ ks_mutex_lock(dht->jobs_mutex);
+ if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = job;
+ else dht->jobs_first = dht->jobs_last = job;
+ ks_mutex_unlock(dht->jobs_mutex);
+}
-KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr)
+KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
{
- ks_dht_message_t *message = NULL;
- struct bencode *a = NULL;
+ ks_assert(dht);
+
+ ks_mutex_lock(dht->jobs_mutex);
+ for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
+ ks_bool_t remove = KS_FALSE;
+ jobn = job->next;
+ switch (job->state) {
+ case KS_DHT_JOB_STATE_QUERYING:
+ if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
+ break;
+ case KS_DHT_JOB_STATE_RESPONDING:
+ break;
+ case KS_DHT_JOB_STATE_EXPIRING:
+ job->attempts--;
+ if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING;
+ else {
+ if (job->finish_callback) job->finish_callback(dht, job);
+ remove = KS_TRUE;
+ }
+ break;
+ case KS_DHT_JOB_STATE_PROCESSING:
+ break;
+ case KS_DHT_JOB_STATE_COMPLETING:
+ if (job->finish_callback) job->finish_callback(dht, job);
+ remove = KS_TRUE;
+ break;
+ default: break;
+ }
+
+ if (remove) {
+ if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
+ else if (!jobp) dht->jobs_first = jobn;
+ else if (!jobn) dht->jobs_last = jobp;
+ else jobp->next = jobn;
+ ks_dht_job_destroy(&job);
+ } else jobp = job;
+ }
+ ks_mutex_unlock(dht->jobs_mutex);
+}
+
+KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback)
+{
+ ks_dht_job_t *job = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(raddr);
- if ((ret = ks_dht_setup_query(dht,
- ep,
- raddr,
+ if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
+ ks_dht_job_build_ping(job, ks_dht_query_ping, callback);
+ ks_dht_jobs_add(dht, job);
+
+ // next step in ks_dht_pulse_jobs with QUERYING state
+
+ done:
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job)
+{
+ ks_dht_message_t *message = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(dht);
+ ks_assert(job);
+
+ if ((ret = ks_dht_query_setup(dht,
+ job,
"ping",
ks_dht_process_response_ping,
NULL,
&message,
- &a)) != KS_STATUS_SUCCESS) goto done;
-
- ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+ NULL)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
ks_q_push(dht->send_q, (void *)message);
KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message)
{
- ks_dht_nodeid_t *id;
ks_dht_message_t *response = NULL;
- struct bencode *r = NULL;
- ks_dhtrt_routetable_t *routetable = NULL;
- ks_dht_node_t *node = NULL;
- char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(message);
ks_assert(message->args);
- if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
- routetable = message->endpoint->node->table;
-
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
- if ((ret = ks_dht_setup_response(dht,
+ if ((ret = ks_dht_response_setup(dht,
message->endpoint,
&message->raddr,
message->transactionid,
message->transactionid_length,
&response,
- &r)) != KS_STATUS_SUCCESS) goto done;
-
- ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+ NULL)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
ks_q_push(dht->send_q, (void *)response);
return ret;
}
-KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message)
+KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t *job)
{
- ks_dht_nodeid_t *id;
- ks_dhtrt_routetable_t *routetable = NULL;
- ks_dht_node_t *node = NULL;
- char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
- ks_assert(message);
+ ks_assert(job);
- if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
+ ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
- routetable = message->endpoint->node->table;
+ job->state = KS_DHT_JOB_STATE_COMPLETING;
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
- ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
- if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
+ // done:
+ return ret;
+}
- ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
+
+KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target)
+{
+ ks_dht_job_t *job = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(dht);
+ ks_assert(raddr);
+ ks_assert(target);
+
+ if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
+ ks_dht_job_build_findnode(job, ks_dht_query_findnode, callback, target);
+ ks_dht_jobs_add(dht, job);
+
+ // next step in ks_dht_pulse_jobs with QUERYING state
done:
return ret;
}
-
-KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
+KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job)
{
ks_dht_transaction_t *transaction = NULL;
ks_dht_message_t *message = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
- ks_assert(raddr);
- ks_assert(targetid);
+ ks_assert(job);
- if ((ret = ks_dht_setup_query(dht,
- ep,
- raddr,
+ if ((ret = ks_dht_query_setup(dht,
+ job,
"find_node",
ks_dht_process_response_findnode,
&transaction,
&message,
&a)) != KS_STATUS_SUCCESS) goto done;
- memcpy(transaction->target.id, targetid->id, KS_DHT_NODEID_SIZE);
+ //memcpy(transaction->target.id, job->target.id, KS_DHT_NODEID_SIZE);
+ transaction->target = job->target;
- ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
- ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
+ ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE));
// Only request both v4 and v6 if we have both interfaces bound and are looking for our own node id, aka bootstrapping
- if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, targetid->id, KS_DHT_NODEID_SIZE)) {
+ if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, job->target.id, KS_DHT_NODEID_SIZE)) {
struct bencode *want = ben_list();
ben_list_append_str(want, "n4");
ben_list_append_str(want, "n6");
KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message)
{
- ks_dht_nodeid_t *id;
ks_dht_nodeid_t *target;
struct bencode *want;
ks_bool_t want4 = KS_FALSE;
uint8_t buffer6[1000];
ks_size_t buffer4_length = 0;
ks_size_t buffer6_length = 0;
- ks_dhtrt_routetable_t *routetable = NULL;
- ks_dht_node_t *node = NULL;
ks_dhtrt_querynodes_t query;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(message);
ks_assert(message->args);
- if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
want = ben_dict_get_by_str(message->args, "want");
want6 = message->raddr.family == AF_INET6;
}
- routetable = message->endpoint->node->table;
-
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
}
- // @todo release query nodes
+ ks_dhtrt_release_querynodes(&query);
}
if (want6) {
query.family = AF_INET6;
ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
}
+ ks_dhtrt_release_querynodes(&query);
}
- if ((ret = ks_dht_setup_response(dht,
+ if ((ret = ks_dht_response_setup(dht,
message->endpoint,
&message->raddr,
message->transactionid,
&response,
&r)) != KS_STATUS_SUCCESS) goto done;
- ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
if (want6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
return ret;
}
-KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message)
+KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_job_t *job)
{
- ks_dht_nodeid_t *id;
struct bencode *n;
//ks_bool_t n4 = KS_FALSE;
//ks_bool_t n6 = KS_FALSE;
size_t nodes6_size = 0;
size_t nodes_len = 0;
size_t nodes6_len = 0;
- ks_dhtrt_routetable_t *routetable = NULL;
- ks_dht_node_t *node = NULL;
- char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_hash_t *searches = NULL;
ks_dht_search_t *search = NULL;
+ ks_dht_node_t *node = NULL;
+ char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
- ks_assert(message);
- ks_assert(message->transaction);
+ ks_assert(job);
- if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
- n = ben_dict_get_by_str(message->args, "nodes");
+ n = ben_dict_get_by_str(job->response->args, "nodes");
if (n) {
//n4 = KS_TRUE;
nodes = (const uint8_t *)ben_str_val(n);
nodes_size = ben_str_len(n);
}
- n = ben_dict_get_by_str(message->args, "nodes6");
+ n = ben_dict_get_by_str(job->response->args, "nodes6");
if (n) {
//n6 = KS_TRUE;
nodes6 = (const uint8_t *)ben_str_val(n);
nodes6_size = ben_str_len(n);
}
- routetable = message->endpoint->node->table;
-
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
- ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
- if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
-
- searches = message->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash;
+ searches = job->response->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash;
ks_hash_read_lock(searches);
- search = ks_hash_search(searches, message->transaction->target.id, KS_UNLOCKED);
+ search = ks_hash_search(searches, job->response->transaction->target.id, KS_UNLOCKED);
if (search) {
ks_dht_search_pending_t *pending = NULL;
ks_mutex_lock(search->mutex);
- pending = ks_hash_search(search->pending, id->id, KS_UNLOCKED);
+ pending = ks_hash_search(search->pending, job->response->args_id.id, KS_UNLOCKED);
if (pending) pending->finished = KS_TRUE;
}
ks_hash_read_unlock(searches);
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
ks_dhtrt_release_node(node);
- if (search && message->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
+ if (search && job->response->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
ks_dht_nodeid_t distance;
int32_t results_index = -1;
ks_dht_search_pending_destroy(&pending);
goto done;
}
- if ((ret = ks_dht_send_findnode(dht, NULL, &addr, &search->target)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_findnode(dht, &addr, NULL, &search->target)) != KS_STATUS_SUCCESS) goto done;
}
}
}
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
ks_dhtrt_release_node(node);
- if (search && message->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
+ if (search && job->response->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
ks_dht_nodeid_t distance;
int32_t results_index = -1;
ks_dht_search_pending_destroy(&pending);
goto done;
}
- if ((ret = ks_dht_send_findnode(dht, NULL, &addr, &search->target)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_findnode(dht, &addr, NULL, &search->target)) != KS_STATUS_SUCCESS) goto done;
}
}
}
return ret;
}
+// @todo ks_dht_get
-KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
+KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
{
ks_dht_message_t *message = NULL;
struct bencode *a = NULL;
ks_assert(dht);
- ks_assert(raddr);
- ks_assert(targetid);
+ ks_assert(job);
- if (ks_dht_setup_query(dht,
- ep,
- raddr,
+ if (ks_dht_query_setup(dht,
+ job,
"get",
ks_dht_process_response_get,
NULL,
&message,
&a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
// @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
- ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
+ ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query get\n");
ks_q_push(dht->send_q, (void *)message);
KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message)
{
- ks_dht_nodeid_t *id;
ks_dht_nodeid_t *target;
struct bencode *seq;
int64_t sequence = -1;
ks_dht_storageitem_t *item = NULL;
ks_dht_message_t *response = NULL;
struct bencode *r = NULL;
- ks_dhtrt_routetable_t *routetable = NULL;
- ks_dht_node_t *node = NULL;
+ ks_dhtrt_querynodes_t query;
+ uint8_t buffer4[1000];
+ uint8_t buffer6[1000];
+ ks_size_t buffer4_length = 0;
+ ks_size_t buffer6_length = 0;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(message);
ks_assert(message->args);
- if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
seq = ben_dict_get_by_str(message->args, "seq");
if (seq) sequence = ben_int_val(seq);
- routetable = message->endpoint->node->table;
-
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
- item = ks_hash_search(dht->storage_hash, (void *)target, KS_READLOCKED);
- ks_hash_read_unlock(dht->storage_hash);
+ item = ks_hash_search(dht->storageitems_hash, target->id, KS_READLOCKED);
+ ks_hash_read_unlock(dht->storageitems_hash);
sequence_snuffed = item && sequence >= 0 && item->seq <= sequence;
- // @todo if sequence is provided then requester has the data so if the local sequence is lower, maybe create job to update local data from the requester?
+ // @todo if sequence is provided then requester has the data, so if the local sequence is lower maybe send a get to the requester to update local data?
+
+ query.nodeid = *target;
+ query.type = KS_DHT_REMOTE;
+ query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE
+ if (dht->rt_ipv4) {
+ query.family = AF_INET;
+
+ ks_dhtrt_findclosest_nodes(dht->rt_ipv4, &query);
+ for (int32_t i = 0; i < query.count; ++i) {
+ ks_dht_node_t *qn = query.nodes[i];
+
+ if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid,
+ &qn->addr,
+ buffer4,
+ &buffer4_length,
+ sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
- // @todo find closest ipv4 and ipv6 nodes to target
+ ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
+ }
+ ks_dhtrt_release_querynodes(&query);
+ }
+ if (dht->rt_ipv6) {
+ query.family = AF_INET6;
- // @todo compact ipv4 and ipv6 nodes into separate buffers
+ ks_dhtrt_findclosest_nodes(dht->rt_ipv6, &query);
+ for (int32_t i = 0; i < query.count; ++i) {
+ ks_dht_node_t *qn = query.nodes[i];
- if ((ret = ks_dht_setup_response(dht,
+ if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid,
+ &qn->addr,
+ buffer6,
+ &buffer6_length,
+ sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
+
+ ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
+ }
+ ks_dhtrt_release_querynodes(&query);
+ }
+
+
+ if ((ret = ks_dht_response_setup(dht,
message->endpoint,
&message->raddr,
message->transactionid,
&response,
&r)) != KS_STATUS_SUCCESS) goto done;
- ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ben_dict_set(r, ben_blob("token", 5), ben_blob(token.token, KS_DHT_TOKEN_SIZE));
if (item) {
if (item->mutable) {
}
if (!sequence_snuffed) ben_dict_set(r, ben_blob("v", 1), ben_clone(item->v));
}
- // @todo nodes, nodes6
+ if (dht->rt_ipv4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
+ if (dht->rt_ipv6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
ks_log(KS_LOG_DEBUG, "Sending message response get\n");
ks_q_push(dht->send_q, (void *)response);
return ret;
}
-KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message)
+KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t *job)
{
- ks_dht_nodeid_t *id;
ks_dht_token_t *token;
- ks_dhtrt_routetable_t *routetable = NULL;
- ks_dht_node_t *node = NULL;
- char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
- ks_assert(message);
+ ks_assert(job);
// @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
- if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
- if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
// @todo add extract function for mutable ks_dht_storageitem_key_t
// @todo add extract function for mutable ks_dht_storageitem_signature_t
- routetable = message->endpoint->node->table;
-
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
- ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
- if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
// @todo add/touch bucket entries for other nodes/nodes6 returned
ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
return ret;
}
-
-// @todo ks_dht_send_put
+// @todo ks_dht_put
+// @todo ks_dht_query_put
KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message)
{
- ks_dht_nodeid_t *id;
ks_dht_message_t *response = NULL;
struct bencode *r = NULL;
- ks_dhtrt_routetable_t *routetable = NULL;
- ks_dht_node_t *node = NULL;
- char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(message);
ks_assert(message->args);
- if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
- routetable = message->endpoint->node->table;
-
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
- if ((ret = ks_dht_setup_response(dht,
+ if ((ret = ks_dht_response_setup(dht,
message->endpoint,
&message->raddr,
message->transactionid,
&response,
&r)) != KS_STATUS_SUCCESS) goto done;
- //ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
-
ks_log(KS_LOG_DEBUG, "Sending message response put\n");
ks_q_push(dht->send_q, (void *)response);
return ret;
}
-KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message)
+KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t *job)
{
- ks_dht_nodeid_t *id;
- ks_dhtrt_routetable_t *routetable = NULL;
- ks_dht_node_t *node = NULL;
- char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
- ks_assert(message);
-
- if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
- routetable = message->endpoint->node->table;
-
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
- if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
- if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
- ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
- if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
+ ks_assert(job);
ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
- done:
+ // done:
return ret;
}