/**
* Default expirations to not be checked for one pulse.
*/
- d->pulse_expirations = ks_time_now() + (KS_DHT_PULSE_EXPIRATIONS * 1000);
+ d->pulse_expirations = ks_time_now() + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC);
/**
* Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full.
* The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
*/
d->token_secret_current = d->token_secret_previous = rand();
- d->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
+ d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
/**
* Create the hash to store arbitrary data for BEP44.
*/
- ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+ ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
ks_assert(d->storageitems_hash);
/**
ks_assert(dht);
if (dht->pulse_expirations > now) return;
- dht->pulse_expirations = now + (KS_DHT_PULSE_EXPIRATIONS * 1000);
+ dht->pulse_expirations = now + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC);
ks_hash_write_lock(dht->transactions_hash);
for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
ks_hash_this(it, &key, NULL, (void **)&value);
if (value->finished) remove = KS_TRUE;
else if (value->expiration <= now) {
- // if the transaction expires, so does the attached job, it may try again with a new transaction
+ // if the transaction expires, so does the attached job, but the job may try again with a new transaction
value->job->state = KS_DHT_JOB_STATE_EXPIRING;
- ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
+ ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d, %d %d\n", value->transactionid, now, value->expiration);
remove = KS_TRUE;
}
if (remove) {
if (dht->rt_ipv4) ks_dht_pulse_expirations_searches(dht, dht->searches4_hash);
if (dht->rt_ipv6) ks_dht_pulse_expirations_searches(dht, dht->searches6_hash);
+ // @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?)
+
if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
- dht->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
+ dht->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
dht->token_secret_previous = dht->token_secret_current;
dht->token_secret_current = rand();
}
return KS_STATUS_SUCCESS;
}
+KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *args,
+ ks_bool_t optional,
+ const char *key,
+ ks_dht_storageitem_key_t **sikey)
+{
+ struct bencode *k;
+ const char *kv;
+ ks_size_t kv_len;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(args);
+ ks_assert(key);
+ ks_assert(sikey);
+
+ *sikey = NULL;
+
+ k = ben_dict_get_by_str(args, key);
+ if (!k) {
+ if (!optional) {
+ ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
+ ret = KS_STATUS_ARG_INVALID;
+ }
+ goto done;
+ }
+
+ kv = ben_str_val(k);
+ kv_len = ben_str_len(k);
+ if (kv_len != KS_DHT_STORAGEITEM_KEY_SIZE) {
+ ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, kv_len);
+ return KS_STATUS_ARG_INVALID;
+ }
+
+ *sikey = (ks_dht_storageitem_key_t *)kv;
+
+ done:
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_signature(struct bencode *args,
+ ks_bool_t optional,
+ const char *key,
+ ks_dht_storageitem_signature_t **signature)
+{
+ struct bencode *sig;
+ const char *sigv;
+ ks_size_t sigv_len;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(args);
+ ks_assert(key);
+ ks_assert(signature);
+
+ *signature = NULL;
+
+ sig = ben_dict_get_by_str(args, key);
+ if (!sig) {
+ if (!optional) {
+ ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
+ ret = KS_STATUS_ARG_INVALID;
+ }
+ goto done;
+ }
+
+ sigv = ben_str_val(sig);
+ sigv_len = ben_str_len(sig);
+ if (sigv_len != KS_DHT_STORAGEITEM_SIGNATURE_SIZE) {
+ ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, sigv_len);
+ return KS_STATUS_ARG_INVALID;
+ }
+
+ *signature = (ks_dht_storageitem_signature_t *)sigv;
+
+ done:
+ return ret;
+}
+
KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
{
return memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0;
}
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(struct bencode *value, ks_dht_nodeid_t *target)
+{
+ SHA_CTX sha;
+ const uint8_t *v;
+ size_t v_len;
+
+ ks_assert(value);
+ ks_assert(target);
+
+ v = (const uint8_t *)ben_str_val(value);
+ v_len = ben_str_len(value);
+
+ if (!SHA1_Init(&sha) ||
+ !SHA1_Update(&sha, v, v_len) ||
+ !SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL;
+
+ return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_key_t *k, struct bencode *salt, ks_dht_nodeid_t *target)
+{
+ SHA_CTX sha;
+
+ ks_assert(k);
+ ks_assert(target);
+
+
+ if (!SHA1_Init(&sha) ||
+ !SHA1_Update(&sha, k->key, KS_DHT_STORAGEITEM_KEY_SIZE)) return KS_STATUS_FAIL;
+ if (salt) {
+ const uint8_t *s = (const uint8_t *)ben_str_val(salt);
+ size_t s_len = ben_str_len(salt);
+ if (s_len > 0 && !SHA1_Update(&sha, s, s_len)) return KS_STATUS_FAIL;
+ }
+ if (!SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL;
+
+ return KS_STATUS_SUCCESS;
+}
KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_mutex_unlock(dht->tid_mutex);
if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done;
-
if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
// if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done;
tid = (uint32_t *)message->transactionid;
transactionid = ntohl(*tid);
+ ks_log(KS_LOG_DEBUG, "Message response transaction id %d\n", transactionid);
transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
ks_hash_read_unlock(dht->transactions_hash);
transaction->job->state = KS_DHT_JOB_STATE_PROCESSING;
message->transaction = transaction;
if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING;
+ transaction->job->response = NULL; // message is destroyed after we return, stop using it
transaction->finished = KS_TRUE;
}
jobn = job->next;
switch (job->state) {
case KS_DHT_JOB_STATE_QUERYING:
+ job->state = KS_DHT_JOB_STATE_RESPONDING;
if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
break;
case KS_DHT_JOB_STATE_RESPONDING:
&message,
&a)) != KS_STATUS_SUCCESS) goto done;
- //memcpy(transaction->target.id, job->target.id, KS_DHT_NODEID_SIZE);
- transaction->target = job->target;
+ //memcpy(transaction->target.id, job->query_target.id, KS_DHT_NODEID_SIZE);
+ //transaction->target = job->query_target;
- ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE));
+ ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
// Only request both v4 and v6 if we have both interfaces bound and are looking for our own node id, aka bootstrapping
- if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, job->target.id, KS_DHT_NODEID_SIZE)) {
+ if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, job->query_target.id, KS_DHT_NODEID_SIZE)) {
struct bencode *want = ben_list();
ben_list_append_str(want, "n4");
ben_list_append_str(want, "n6");
ks_assert(job);
n = ben_dict_get_by_str(job->response->args, "nodes");
- if (n) {
+ if (n && dht->rt_ipv4) {
//n4 = KS_TRUE;
nodes = (const uint8_t *)ben_str_val(n);
nodes_size = ben_str_len(n);
}
n = ben_dict_get_by_str(job->response->args, "nodes6");
- if (n) {
+ if (n && dht->rt_ipv6) {
//n6 = KS_TRUE;
nodes6 = (const uint8_t *)ben_str_val(n);
nodes6_size = ben_str_len(n);
searches = job->response->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash;
ks_hash_read_lock(searches);
- search = ks_hash_search(searches, job->response->transaction->target.id, KS_UNLOCKED);
+ search = ks_hash_search(searches, job->query_target.id, KS_UNLOCKED);
if (search) {
ks_dht_search_pending_t *pending = NULL;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
- ks_dhtrt_release_node(node);
+ job->response_nodes[job->response_nodes_count++] = node;
+ // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6
if (search && job->response->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
ks_dht_nodeid_t distance;
int32_t results_index = -1;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
- ks_dhtrt_release_node(node);
+ job->response_nodes6[job->response_nodes6_count++] = node;
+ // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6
if (search && job->response->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
ks_dht_nodeid_t distance;
int32_t results_index = -1;
ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
+ job->state = KS_DHT_JOB_STATE_COMPLETING;
+
done:
if(search) ks_mutex_unlock(search->mutex);
return ret;
}
-// @todo ks_dht_get
+
+KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
+ const ks_sockaddr_t *raddr,
+ ks_dht_job_callback_t callback,
+ ks_dht_nodeid_t *target,
+ uint8_t *salt,
+ ks_size_t salt_length)
+{
+ ks_dht_job_t *job = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(dht);
+ ks_assert(raddr);
+ ks_assert(target);
+
+ if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
+ ks_dht_job_build_get(job, ks_dht_query_get, callback, target, salt, salt_length);
+ ks_dht_jobs_add(dht, job);
+
+ done:
+ return ret;
+}
KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
{
&a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
// @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
- ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE));
+ ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query get\n");
ks_q_push(dht->send_q, (void *)message);
ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
- item = ks_hash_search(dht->storageitems_hash, target->id, KS_READLOCKED);
+ ks_hash_read_lock(dht->storageitems_hash);
+ item = ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED);
ks_hash_read_unlock(dht->storageitems_hash);
sequence_snuffed = item && sequence >= 0 && item->seq <= sequence;
KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t *job)
{
- ks_dht_token_t *token;
+ ks_dht_storageitem_t *item = NULL;
+ ks_dht_token_t *token = NULL;
+ ks_dht_storageitem_key_t *k = NULL;
+ ks_dht_storageitem_signature_t *sig = NULL;
+ struct bencode *seq;
+ int64_t sequence = -1;
+ struct bencode *v = NULL;
+ //ks_size_t v_len = 0;
+ struct bencode *n;
+ ks_dht_node_t *node = NULL;
+ const uint8_t *nodes = NULL;
+ const uint8_t *nodes6 = NULL;
+ size_t nodes_size = 0;
+ size_t nodes6_size = 0;
+ size_t nodes_len = 0;
+ size_t nodes6_len = 0;
+ char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_bool_t storageitems_locked = KS_FALSE;
+ ks_dht_storageitem_t *olditem = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(job);
- // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
+ job->response_token = *token;
+
+ if ((ret = ks_dht_utility_extract_storageitem_key(job->response->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_utility_extract_storageitem_signature(job->response->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
+
+ seq = ben_dict_get_by_str(job->response->args, "seq");
+ if (seq) sequence = ben_int_val(seq);
+
+ if (seq && ((k && !sig) || (!k && sig))) {
+ ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data");
+ ret = KS_STATUS_ARG_INVALID;
+ goto done;
+ }
- // @todo add extract function for mutable ks_dht_storageitem_key_t
- // @todo add extract function for mutable ks_dht_storageitem_signature_t
+ v = ben_dict_get_by_str(job->response->args, "v");
+ //if (v) v_len = ben_str_len(v);
- // @todo add/touch bucket entries for other nodes/nodes6 returned
+ n = ben_dict_get_by_str(job->response->args, "nodes");
+ if (n && dht->rt_ipv4) {
+ nodes = (const uint8_t *)ben_str_val(n);
+ nodes_size = ben_str_len(n);
+ }
+ n = ben_dict_get_by_str(job->response->args, "nodes6");
+ if (n && dht->rt_ipv6) {
+ nodes6 = (const uint8_t *)ben_str_val(n);
+ nodes6_size = ben_str_len(n);
+ }
ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
+ while (nodes_len < nodes_size) {
+ ks_dht_nodeid_t nid;
+ ks_sockaddr_t addr;
+
+ addr.family = AF_INET;
+ if ((ret = ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done;
+
+ ks_log(KS_LOG_DEBUG,
+ "Expanded ipv4 nodeinfo for %s (%s %d)\n",
+ ks_dht_hexid(&nid, id_buf),
+ addr.host,
+ addr.port);
+
+ ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+ ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
+ job->response_nodes[job->response_nodes_count++] = node;
+ }
+ while (nodes6_len < nodes6_size) {
+ ks_dht_nodeid_t nid;
+ ks_sockaddr_t addr;
+
+ addr.family = AF_INET6;
+ if ((ret = ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done;
+
+ ks_log(KS_LOG_DEBUG,
+ "Expanded ipv6 nodeinfo for %s (%s %d)\n",
+ ks_dht_hexid(&nid, id_buf),
+ addr.host,
+ addr.port);
+
+ ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+ ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
+ job->response_nodes6[job->response_nodes6_count++] = node;
+ }
+
+ ks_hash_write_lock(dht->storageitems_hash);
+ storageitems_locked = KS_TRUE;
+ olditem = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED);
+
+ if (v) {
+ ks_dht_nodeid_t tmptarget;
+
+ if (!seq) {
+ // immutable
+ if ((ret = ks_dht_storageitem_target_immutable(v, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
+ if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) {
+ ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n");
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
+ if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+ else if ((ret = ks_dht_storageitem_create_immutable(&item,
+ dht->pool,
+ &tmptarget,
+ v)) != KS_STATUS_SUCCESS) goto done;
+ } else {
+ // mutable
+ struct bencode *tmp = NULL;
+ uint8_t *tmpsig = NULL;
+ size_t tmpsig_len = 0;
+ int32_t res = 0;
+
+ if ((ret = ks_dht_storageitem_target_mutable(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
+ if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) {
+ ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n");
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
+
+ tmp = ben_dict();
+ if (job->query_salt) ben_dict_set(tmp, ben_blob("salt", 4), ben_clone(job->query_salt));
+ ben_dict_set(tmp, ben_blob("seq", 3), ben_clone(seq));
+ ben_dict_set(tmp, ben_blob("v", 1), ben_clone(v));
+ tmpsig = ben_encode(&tmpsig_len, tmp);
+ ben_free(tmp);
+
+ res = crypto_sign_verify_detached(sig->sig, tmpsig, tmpsig_len, k->key);
+
+ free(tmpsig);
+
+ if (res) {
+ ks_log(KS_LOG_DEBUG, "Immutable data signature failed to verify\n");
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
+
+ if (olditem) {
+ if (olditem->seq >= sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+ else {
+ ks_hash_remove(dht->storageitems_hash, olditem->id.id);
+ olditem = NULL;
+ }
+ }
+ if (!olditem && (ret = ks_dht_storageitem_create_mutable(&item,
+ dht->pool,
+ &tmptarget,
+ v,
+ k,
+ job->query_salt,
+ sequence,
+ sig)) != KS_STATUS_SUCCESS) goto done;
+ }
+ if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
+ } else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+
+ if (item) job->response_storageitem = item;
+ else if (olditem) job->response_storageitem = olditem;
+
+ job->state = KS_DHT_JOB_STATE_COMPLETING;
+
+ done:
+ if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
+ if (ret != KS_STATUS_SUCCESS) {
+ if (item) ks_dht_storageitem_destroy(&item);
+ }
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
+ const ks_sockaddr_t *raddr,
+ ks_dht_job_callback_t callback,
+ ks_dht_nodeid_t *target,
+ uint8_t *salt,
+ ks_size_t salt_length)
+{
+ ks_dht_job_t *job = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(dht);
+ ks_assert(raddr);
+ ks_assert(target);
+
+ if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
+ ks_dht_job_build_put(job, ks_dht_query_put, callback, target, salt, salt_length);
+ ks_dht_jobs_add(dht, job);
+
done:
return ret;
}
-// @todo ks_dht_put
-// @todo ks_dht_query_put
+KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job)
+{
+ ks_dht_message_t *message = NULL;
+ struct bencode *a = NULL;
+
+ ks_assert(dht);
+ ks_assert(job);
+
+ if (ks_dht_query_setup(dht,
+ job,
+ "put",
+ ks_dht_process_response_put,
+ NULL,
+ &message,
+ &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+
+
+ ks_log(KS_LOG_DEBUG, "Sending message query put\n");
+ ks_q_push(dht->send_q, (void *)message);
+
+ return KS_STATUS_SUCCESS;
+}
KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message)
{
ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
+ job->state = KS_DHT_JOB_STATE_COMPLETING;
+
// done:
return ret;
}