]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Started working on "put", ran into a bug in job states which is fixed now...
authorShane Bryldt <astaelan@gmail.com>
Tue, 20 Dec 2016 22:07:11 +0000 (22:07 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:37 +0000 (14:59 -0600)
libs/libks/Makefile.am
libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_bucket.c
libs/libks/src/dht/ks_dht_job.c
libs/libks/src/dht/ks_dht_message.c
libs/libks/src/dht/ks_dht_search.c
libs/libks/src/dht/ks_dht_storageitem.c
libs/libks/src/dht/ks_dht_transaction.c
libs/libks/test/testdht2.c

index ce21d3d898028e4a08729a4d26e229333b122c46..6d8c5c466fe4a56bb761b4a4e18d54e488435be4 100644 (file)
@@ -3,7 +3,7 @@ EXTRA_DIST =
 SUBDIRS = . test
 AUTOMAKE_OPTIONS = subdir-objects
 
-AM_CFLAGS    += -I$(top_srcdir)/src -I$(top_srcdir)/src/include -I$(top_srcdir)/crypt
+AM_CFLAGS    += -I$(top_srcdir)/src -I$(top_srcdir)/src/include -I$(top_srcdir)/crypt -O0
 AM_CPPFLAGS  = $(AM_CFLAGS)
 
 lib_LTLIBRARIES          = libks.la
index 166af867c1357c285369199204049cd22f309feb..0c475a16ffff0a035b486effbf56a7f485f04201 100644 (file)
@@ -218,6 +218,7 @@ KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht);
 KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job);
 KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job);
 KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job);
+KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job);
 
 KS_DECLARE(void *)ks_dht_process(ks_thread_t *thread, void *data);
 
@@ -260,6 +261,18 @@ KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
                                                                                   ks_dht_job_callback_t query_callback,
                                                                                   ks_dht_job_callback_t finish_callback,
                                                                                   ks_dht_nodeid_t *target);
+KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
+                                                                         ks_dht_job_callback_t query_callback,
+                                                                         ks_dht_job_callback_t finish_callback,
+                                                                         ks_dht_nodeid_t *target,
+                                                                         uint8_t *salt,
+                                                                         ks_size_t salt_length);
+KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
+                                                                         ks_dht_job_callback_t query_callback,
+                                                                         ks_dht_job_callback_t finish_callback,
+                                                                         ks_dht_nodeid_t *target,
+                                                                         uint8_t *salt,
+                                                                         ks_size_t salt_length);
 KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job);
 
 
@@ -290,13 +303,13 @@ KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v);
+KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, struct bencode *v);
 KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item,
                                                                                                                  ks_pool_t *pool,
+                                                                                                                 ks_dht_nodeid_t *target,
                                                                                                                  struct bencode *v,
                                                                                                                  ks_dht_storageitem_key_t *k,
-                                                                                                                 uint8_t *salt,
-                                                                                                                 ks_size_t salt_length,
+                                                                                                                 struct bencode *salt,
                                                                                                                  int64_t sequence,
                                                                                                                  ks_dht_storageitem_signature_t *signature);
 KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item);
index ff457c4078fa4433b7ef36df8721a0965f548fac..b14a151237dd312cb170727a45a5336f48ff2d6e 100644 (file)
@@ -102,7 +102,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        /**
         * Default expirations to not be checked for one pulse.
         */
-       d->pulse_expirations = ks_time_now() + (KS_DHT_PULSE_EXPIRATIONS * 1000);
+       d->pulse_expirations = ks_time_now() + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC);
 
        /**
         * Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full.
@@ -171,12 +171,12 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
         * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
         */
        d->token_secret_current = d->token_secret_previous = rand();
-       d->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
+       d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
 
        /**
         * Create the hash to store arbitrary data for BEP44.
         */
-       ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+       ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
        ks_assert(d->storageitems_hash);
 
        /**
@@ -678,7 +678,7 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
        ks_assert(dht);
 
        if (dht->pulse_expirations > now) return;
-       dht->pulse_expirations = now + (KS_DHT_PULSE_EXPIRATIONS * 1000);
+       dht->pulse_expirations = now + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC);
 
        ks_hash_write_lock(dht->transactions_hash);
        for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
@@ -689,9 +689,9 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
                ks_hash_this(it, &key, NULL, (void **)&value);
                if (value->finished) remove = KS_TRUE;
                else if (value->expiration <= now) {
-                       // if the transaction expires, so does the attached job, it may try again with a new transaction
+                       // if the transaction expires, so does the attached job, but the job may try again with a new transaction
                        value->job->state = KS_DHT_JOB_STATE_EXPIRING;
-                       ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
+                       ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d, %d %d\n", value->transactionid, now, value->expiration);
                        remove = KS_TRUE;
                }
                if (remove) {
@@ -704,8 +704,10 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
        if (dht->rt_ipv4) ks_dht_pulse_expirations_searches(dht, dht->searches4_hash);
        if (dht->rt_ipv6) ks_dht_pulse_expirations_searches(dht, dht->searches6_hash);
 
+       // @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?)
+
        if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
-               dht->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
+               dht->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
                dht->token_secret_previous = dht->token_secret_current;
                dht->token_secret_current = rand();
        }
@@ -931,6 +933,82 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const
        return KS_STATUS_SUCCESS;
 }
 
+KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *args,
+                                                                                                                          ks_bool_t optional,
+                                                                                                                          const char *key,
+                                                                                                                          ks_dht_storageitem_key_t **sikey)
+{
+       struct bencode *k;
+       const char *kv;
+       ks_size_t kv_len;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(args);
+       ks_assert(key);
+       ks_assert(sikey);
+
+       *sikey = NULL;
+
+       k = ben_dict_get_by_str(args, key);
+       if (!k) {
+               if (!optional) {
+                       ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
+                       ret = KS_STATUS_ARG_INVALID;
+               }
+               goto done;
+       }
+
+    kv = ben_str_val(k);
+       kv_len = ben_str_len(k);
+    if (kv_len != KS_DHT_STORAGEITEM_KEY_SIZE) {
+               ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, kv_len);
+               return KS_STATUS_ARG_INVALID;
+       }
+
+       *sikey = (ks_dht_storageitem_key_t *)kv;
+
+ done:
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_signature(struct bencode *args,
+                                                                                                                                        ks_bool_t optional,
+                                                                                                                                        const char *key,
+                                                                                                                                        ks_dht_storageitem_signature_t **signature)
+{
+       struct bencode *sig;
+       const char *sigv;
+       ks_size_t sigv_len;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(args);
+       ks_assert(key);
+       ks_assert(signature);
+
+       *signature = NULL;
+
+       sig = ben_dict_get_by_str(args, key);
+       if (!sig) {
+               if (!optional) {
+                       ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
+                       ret = KS_STATUS_ARG_INVALID;
+               }
+               goto done;
+       }
+
+    sigv = ben_str_val(sig);
+       sigv_len = ben_str_len(sig);
+    if (sigv_len != KS_DHT_STORAGEITEM_SIGNATURE_SIZE) {
+               ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, sigv_len);
+               return KS_STATUS_ARG_INVALID;
+       }
+
+       *signature = (ks_dht_storageitem_signature_t *)sigv;
+
+ done:
+       return ret;
+}
+
 
 KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
 {
@@ -968,6 +1046,44 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, const ks_sockaddr_t *ra
        return memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0;
 }
 
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(struct bencode *value, ks_dht_nodeid_t *target)
+{
+       SHA_CTX sha;
+       const uint8_t *v;
+       size_t v_len;
+
+       ks_assert(value);
+       ks_assert(target);
+
+       v = (const uint8_t *)ben_str_val(value);
+       v_len = ben_str_len(value);
+       
+       if (!SHA1_Init(&sha) ||
+               !SHA1_Update(&sha, v, v_len) ||
+               !SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL;
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_key_t *k, struct bencode *salt, ks_dht_nodeid_t *target)
+{
+       SHA_CTX sha;
+
+       ks_assert(k);
+       ks_assert(target);
+
+       
+       if (!SHA1_Init(&sha) ||
+               !SHA1_Update(&sha, k->key, KS_DHT_STORAGEITEM_KEY_SIZE)) return KS_STATUS_FAIL;
+       if (salt) {
+               const uint8_t *s = (const uint8_t *)ben_str_val(salt);
+               size_t s_len = ben_str_len(salt);
+               if (s_len > 0 && !SHA1_Update(&sha, s, s_len)) return KS_STATUS_FAIL;
+       }
+       if (!SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL;
+
+       return KS_STATUS_SUCCESS;
+}
 
 KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
 {
@@ -1026,7 +1142,6 @@ KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht,
        ks_mutex_unlock(dht->tid_mutex);
 
        if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done;
-
        if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
 
        //      if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done;
@@ -1248,6 +1363,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
        tid = (uint32_t *)message->transactionid;
        transactionid = ntohl(*tid);
 
+       ks_log(KS_LOG_DEBUG, "Message response transaction id %d\n", transactionid);
        transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
        ks_hash_read_unlock(dht->transactions_hash);
 
@@ -1264,6 +1380,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
                transaction->job->state = KS_DHT_JOB_STATE_PROCESSING;
                message->transaction = transaction;
                if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING;
+               transaction->job->response = NULL; // message is destroyed after we return, stop using it
                transaction->finished = KS_TRUE;
        }
 
@@ -1523,6 +1640,7 @@ KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
                jobn = job->next;
                switch (job->state) {
                case KS_DHT_JOB_STATE_QUERYING:
+                       job->state = KS_DHT_JOB_STATE_RESPONDING;
                        if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
                        break;
                case KS_DHT_JOB_STATE_RESPONDING:
@@ -1675,12 +1793,12 @@ KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job)
                                                                  &message,
                                                                  &a)) != KS_STATUS_SUCCESS) goto done;
 
-       //memcpy(transaction->target.id, job->target.id, KS_DHT_NODEID_SIZE);
-       transaction->target = job->target;
+       //memcpy(transaction->target.id, job->query_target.id, KS_DHT_NODEID_SIZE);
+       //transaction->target = job->query_target;
 
-       ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE));
+       ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
        // Only request both v4 and v6 if we have both interfaces bound and are looking for our own node id, aka bootstrapping
-       if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, job->target.id, KS_DHT_NODEID_SIZE)) {
+       if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, job->query_target.id, KS_DHT_NODEID_SIZE)) {
                struct bencode *want = ben_list();
                ben_list_append_str(want, "n4");
                ben_list_append_str(want, "n6");
@@ -1812,13 +1930,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
        ks_assert(job);
 
        n = ben_dict_get_by_str(job->response->args, "nodes");
-       if (n) {
+       if (n && dht->rt_ipv4) {
                //n4 = KS_TRUE;
                nodes = (const uint8_t *)ben_str_val(n);
                nodes_size = ben_str_len(n);
        }
        n = ben_dict_get_by_str(job->response->args, "nodes6");
-       if (n) {
+       if (n && dht->rt_ipv6) {
                //n6 = KS_TRUE;
                nodes6 = (const uint8_t *)ben_str_val(n);
                nodes6_size = ben_str_len(n);
@@ -1827,7 +1945,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
        searches = job->response->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash;
 
        ks_hash_read_lock(searches);
-       search = ks_hash_search(searches, job->response->transaction->target.id, KS_UNLOCKED);
+       search = ks_hash_search(searches, job->query_target.id, KS_UNLOCKED);
        if (search) {
                ks_dht_search_pending_t *pending = NULL;
 
@@ -1852,8 +1970,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
 
                ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
                ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
-               ks_dhtrt_release_node(node);
+               job->response_nodes[job->response_nodes_count++] = node;
 
+               // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6
                if (search && job->response->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
                        ks_dht_nodeid_t distance;
                        int32_t results_index = -1;
@@ -1913,8 +2032,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
 
                ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
                ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
-               ks_dhtrt_release_node(node);
+               job->response_nodes6[job->response_nodes6_count++] = node;
 
+               // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6
                if (search && job->response->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
                        ks_dht_nodeid_t distance;
                        int32_t results_index = -1;
@@ -1961,12 +2081,35 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
 
        ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
 
+       job->state = KS_DHT_JOB_STATE_COMPLETING;
+
  done:
        if(search) ks_mutex_unlock(search->mutex);
        return ret;
 }
 
-// @todo ks_dht_get
+
+KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
+                                                                  const ks_sockaddr_t *raddr,
+                                                                  ks_dht_job_callback_t callback,
+                                                                  ks_dht_nodeid_t *target,
+                                                                  uint8_t *salt,
+                                                                  ks_size_t salt_length)
+{
+       ks_dht_job_t *job = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(target);
+
+       if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
+       ks_dht_job_build_get(job, ks_dht_query_get, callback, target, salt, salt_length);
+       ks_dht_jobs_add(dht, job);
+
+ done:
+       return ret;
+}
 
 KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
 {
@@ -1985,7 +2128,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
                                                   &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
        // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
-       ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE));
+       ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
 
        ks_log(KS_LOG_DEBUG, "Sending message query get\n");
        ks_q_push(dht->send_q, (void *)message);
@@ -2024,7 +2167,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
 
        ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
 
-       item = ks_hash_search(dht->storageitems_hash, target->id, KS_READLOCKED);
+       ks_hash_read_lock(dht->storageitems_hash);
+       item = ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED);
        ks_hash_read_unlock(dht->storageitems_hash);
 
        sequence_snuffed = item && sequence >= 0 && item->seq <= sequence;
@@ -2100,28 +2244,224 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
 
 KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t *job)
 {
-       ks_dht_token_t *token;
+       ks_dht_storageitem_t *item = NULL;
+       ks_dht_token_t *token = NULL;
+       ks_dht_storageitem_key_t *k = NULL;
+       ks_dht_storageitem_signature_t *sig = NULL;
+       struct bencode *seq;
+       int64_t sequence = -1;
+       struct bencode *v = NULL;
+       //ks_size_t v_len = 0;
+       struct bencode *n;
+       ks_dht_node_t *node = NULL;
+       const uint8_t *nodes = NULL;
+       const uint8_t *nodes6 = NULL;
+       size_t nodes_size = 0;
+       size_t nodes6_size = 0;
+       size_t nodes_len = 0;
+       size_t nodes6_len = 0;
+       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+       ks_bool_t storageitems_locked = KS_FALSE;
+       ks_dht_storageitem_t *olditem = NULL;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(job);
 
-       // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
        if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
+       job->response_token = *token;
+
+       if ((ret = ks_dht_utility_extract_storageitem_key(job->response->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dht_utility_extract_storageitem_signature(job->response->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
+
+       seq = ben_dict_get_by_str(job->response->args, "seq");
+       if (seq) sequence = ben_int_val(seq);
+
+       if (seq && ((k && !sig) || (!k && sig))) {
+               ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data");
+               ret = KS_STATUS_ARG_INVALID;
+               goto done;
+       }
 
-       // @todo add extract function for mutable ks_dht_storageitem_key_t
-       // @todo add extract function for mutable ks_dht_storageitem_signature_t
+       v = ben_dict_get_by_str(job->response->args, "v");
+       //if (v) v_len = ben_str_len(v);
 
-       // @todo add/touch bucket entries for other nodes/nodes6 returned
+       n = ben_dict_get_by_str(job->response->args, "nodes");
+       if (n && dht->rt_ipv4) {
+               nodes = (const uint8_t *)ben_str_val(n);
+               nodes_size = ben_str_len(n);
+       }
+       n = ben_dict_get_by_str(job->response->args, "nodes6");
+       if (n && dht->rt_ipv6) {
+               nodes6 = (const uint8_t *)ben_str_val(n);
+               nodes6_size = ben_str_len(n);
+       }
 
        ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
 
+       while (nodes_len < nodes_size) {
+               ks_dht_nodeid_t nid;
+               ks_sockaddr_t addr;
+
+               addr.family = AF_INET;
+               if ((ret = ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done;
+
+               ks_log(KS_LOG_DEBUG,
+                          "Expanded ipv4 nodeinfo for %s (%s %d)\n",
+                          ks_dht_hexid(&nid, id_buf),
+                          addr.host,
+                          addr.port);
+
+               ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+               ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
+               job->response_nodes[job->response_nodes_count++] = node;
+       }
+       while (nodes6_len < nodes6_size) {
+               ks_dht_nodeid_t nid;
+               ks_sockaddr_t addr;
+
+               addr.family = AF_INET6;
+               if ((ret = ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done;
+
+               ks_log(KS_LOG_DEBUG,
+                          "Expanded ipv6 nodeinfo for %s (%s %d)\n",
+                          ks_dht_hexid(&nid, id_buf),
+                          addr.host,
+                          addr.port);
+
+               ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+               ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
+               job->response_nodes6[job->response_nodes6_count++] = node;
+       }
+       
+       ks_hash_write_lock(dht->storageitems_hash);
+       storageitems_locked = KS_TRUE;
+       olditem = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED);
+
+       if (v) {
+               ks_dht_nodeid_t tmptarget;
+
+               if (!seq) {
+                       // immutable
+                       if ((ret = ks_dht_storageitem_target_immutable(v, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
+                       if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) {
+                               ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n");
+                               ret = KS_STATUS_FAIL;
+                               goto done;
+                       }
+                       if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+                       else if ((ret = ks_dht_storageitem_create_immutable(&item,
+                                                                                                                                 dht->pool,
+                                                                                                                                 &tmptarget,
+                                                                                                                                 v)) != KS_STATUS_SUCCESS) goto done;
+               } else {
+                       // mutable
+                       struct bencode *tmp = NULL;
+                       uint8_t *tmpsig = NULL;
+                       size_t tmpsig_len = 0;
+                       int32_t res = 0;
+
+                       if ((ret = ks_dht_storageitem_target_mutable(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
+                       if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) {
+                               ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n");
+                               ret = KS_STATUS_FAIL;
+                               goto done;
+                       }
+
+                       tmp = ben_dict();
+                       if (job->query_salt) ben_dict_set(tmp, ben_blob("salt", 4), ben_clone(job->query_salt));
+                       ben_dict_set(tmp, ben_blob("seq", 3), ben_clone(seq));
+                       ben_dict_set(tmp, ben_blob("v", 1), ben_clone(v));
+                       tmpsig = ben_encode(&tmpsig_len, tmp);
+                       ben_free(tmp);
+
+                       res = crypto_sign_verify_detached(sig->sig, tmpsig, tmpsig_len, k->key);
+
+                       free(tmpsig);
+
+                       if (res) {
+                               ks_log(KS_LOG_DEBUG, "Immutable data signature failed to verify\n");
+                               ret = KS_STATUS_FAIL;
+                               goto done;
+                       }
+                       
+                       if (olditem) {
+                               if (olditem->seq >= sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+                               else {
+                                       ks_hash_remove(dht->storageitems_hash, olditem->id.id);
+                                       olditem = NULL;
+                               }
+                       }
+                       if (!olditem && (ret = ks_dht_storageitem_create_mutable(&item,
+                                                                                                                                        dht->pool,
+                                                                                                                                        &tmptarget,
+                                                                                                                                        v,
+                                                                                                                                        k,
+                                                                                                                                        job->query_salt,
+                                                                                                                                        sequence,
+                                                                                                                                        sig)) != KS_STATUS_SUCCESS) goto done;
+               }
+               if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
+       } else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+
+       if (item) job->response_storageitem = item;
+       else if (olditem) job->response_storageitem = olditem;
+
+       job->state = KS_DHT_JOB_STATE_COMPLETING;
+
+ done:
+       if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
+       if (ret != KS_STATUS_SUCCESS) {
+               if (item) ks_dht_storageitem_destroy(&item);
+       }
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
+                                                                  const ks_sockaddr_t *raddr,
+                                                                  ks_dht_job_callback_t callback,
+                                                                  ks_dht_nodeid_t *target,
+                                                                  uint8_t *salt,
+                                                                  ks_size_t salt_length)
+{
+       ks_dht_job_t *job = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(target);
+
+       if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
+       ks_dht_job_build_put(job, ks_dht_query_put, callback, target, salt, salt_length);
+       ks_dht_jobs_add(dht, job);
+
  done:
        return ret;
 }
 
-// @todo ks_dht_put
-// @todo ks_dht_query_put
+KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job)
+{
+       ks_dht_message_t *message = NULL;
+       struct bencode *a = NULL;
+
+       ks_assert(dht);
+       ks_assert(job);
+
+       if (ks_dht_query_setup(dht,
+                                                  job,
+                                                  "put",
+                                                  ks_dht_process_response_put,
+                                                  NULL,
+                                                  &message,
+                                                  &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+
+
+       ks_log(KS_LOG_DEBUG, "Sending message query put\n");
+       ks_q_push(dht->send_q, (void *)message);
+
+       return KS_STATUS_SUCCESS;
+}
 
 KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message)
 {
@@ -2159,6 +2499,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t
 
        ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
 
+       job->state = KS_DHT_JOB_STATE_COMPLETING;
+
        // done:
        return ret;
 }
index 4e4f07456b345d1d7b2fccef90429e6daa40649b..64fd8937af0eeb957a1de89e7de5e7b9313efd64 100644 (file)
@@ -22,6 +22,8 @@ KS_BEGIN_EXTERN_C
 
 #define KS_DHT_NODEID_SIZE 20
 
+#define KS_DHT_RESPONSE_NODES_MAX_SIZE 8
+
 #define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20
 #define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20
 #define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
@@ -34,6 +36,7 @@ KS_BEGIN_EXTERN_C
 #define KS_DHT_STORAGEITEM_KEY_SIZE crypto_sign_PUBLICKEYBYTES
 #define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64
 #define KS_DHT_STORAGEITEM_SIGNATURE_SIZE crypto_sign_BYTES
+#define KS_DHT_STORAGEITEM_EXPIRATION 7200
 
 #define KS_DHT_TOKEN_SIZE SHA_DIGEST_LENGTH
 #define KS_DHT_TOKENSECRET_EXPIRATION 300
@@ -92,6 +95,10 @@ struct ks_dht_node_s {
     ks_rwl_t        *reflock;          
 };
 
+struct ks_dht_token_s {
+       uint8_t token[KS_DHT_TOKEN_SIZE];
+};
+
 enum ks_dht_job_state_t {
        KS_DHT_JOB_STATE_QUERYING,
        KS_DHT_JOB_STATE_RESPONDING,
@@ -124,7 +131,16 @@ struct ks_dht_job_s {
        //ks_dht_nodeid_t response_id;
 
        // job specific query parameters
-       ks_dht_nodeid_t target;
+       ks_dht_nodeid_t query_target;
+       struct bencode *query_salt;
+
+       // job specific response parameters
+       ks_dht_node_t *response_nodes[KS_DHT_RESPONSE_NODES_MAX_SIZE];
+       ks_size_t response_nodes_count;
+       ks_dht_node_t *response_nodes6[KS_DHT_RESPONSE_NODES_MAX_SIZE];
+       ks_size_t response_nodes6_count;
+       ks_dht_token_t response_token;
+       ks_dht_storageitem_t *response_storageitem;
 };
 
 struct ks_dhtrt_routetable_s {
@@ -142,10 +158,6 @@ struct ks_dhtrt_querynodes_s {
     ks_dht_node_t* nodes[ KS_DHTRT_MAXQUERYSIZE ]; /* out: array of peers (ks_dht_node_t* nodes[incount]) */
 };
 
-struct ks_dht_token_s {
-       uint8_t token[KS_DHT_TOKEN_SIZE];
-};
-
 struct ks_dht_storageitem_key_s {
        uint8_t key[KS_DHT_STORAGEITEM_KEY_SIZE];
 };
@@ -181,7 +193,7 @@ struct ks_dht_transaction_s {
        ks_pool_t *pool;
        ks_dht_job_t *job;
        uint32_t transactionid;
-       ks_dht_nodeid_t target; // @todo look at moving this into job now
+       //ks_dht_nodeid_t target; // @todo look at moving this into job now
        ks_dht_job_callback_t callback;
        ks_time_t expiration;
        ks_bool_t finished;
@@ -209,14 +221,13 @@ struct ks_dht_search_pending_s {
 struct ks_dht_storageitem_s {
        ks_pool_t *pool;
        ks_dht_nodeid_t id;
-       // @todo ks_time_t expiration;
+       ks_time_t expiration;
        struct bencode *v;
        
        ks_bool_t mutable;
        ks_dht_storageitem_key_t pk;
        ks_dht_storageitem_key_t sk;
-       uint8_t salt[KS_DHT_STORAGEITEM_SALT_MAX_SIZE];
-       ks_size_t salt_length;
+       struct bencode *salt;
        int64_t seq;
        ks_dht_storageitem_signature_t sig;
 };
@@ -352,6 +363,12 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout);
 
 KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback);
 KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target);
+KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
+                                                                  const ks_sockaddr_t *raddr,
+                                                                  ks_dht_job_callback_t callback,
+                                                                  ks_dht_nodeid_t *target,
+                                                                  uint8_t *salt,
+                                                                  ks_size_t salt_length);
                                                
 /**
  * Create a network search of the closest nodes to a target.
index a04717ada8d347ac0901c107b522a4189b44ba25..f0d2bbadd0336dd1853d1738609b0887a2ebd883 100644 (file)
@@ -1469,6 +1469,7 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry
                   ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings);
 #endif
     ks_dht_node_t* node = entry->gptr;
+       ks_log(KS_LOG_DEBUG, "Node addr %s %d\n", node->addr.host, node->addr.port);
     ks_dht_ping(internal->dht, &node->addr, NULL);
 
        return;
index c99556aa39a46637c63b4fa6d79d69a5e1860324..d92b5a62a730c771fc57770bdb2362d5a92897d8 100644 (file)
@@ -38,6 +38,7 @@ KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job,
 KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback)
 {
        ks_assert(job);
+       ks_assert(query_callback);
 
        job->query_callback = query_callback;
        job->finish_callback = finish_callback;
@@ -49,11 +50,46 @@ KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
                                                                                   ks_dht_nodeid_t *target)
 {
        ks_assert(job);
+       ks_assert(query_callback);
        ks_assert(target);
 
        job->query_callback = query_callback;
        job->finish_callback = finish_callback;
-       job->target = *target;
+       job->query_target = *target;
+}
+
+KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
+                                                                         ks_dht_job_callback_t query_callback,
+                                                                         ks_dht_job_callback_t finish_callback,
+                                                                         ks_dht_nodeid_t *target,
+                                                                         uint8_t *salt,
+                                                                         ks_size_t salt_length)
+{
+       ks_assert(job);
+       ks_assert(query_callback);
+       ks_assert(target);
+
+       job->query_callback = query_callback;
+       job->finish_callback = finish_callback;
+       job->query_target = *target;
+       if (salt && salt_length > 0) job->query_salt = ben_blob(salt, salt_length);
+}
+
+KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
+                                                                         ks_dht_job_callback_t query_callback,
+                                                                         ks_dht_job_callback_t finish_callback,
+                                                                         ks_dht_nodeid_t *target,
+                                                                         uint8_t *salt,
+                                                                         ks_size_t salt_length)
+{
+       ks_assert(job);
+       ks_assert(query_callback);
+       ks_assert(target);
+
+       job->query_callback = query_callback;
+       job->finish_callback = finish_callback;
+       job->query_target = *target;
+       if (salt && salt_length > 0) job->query_salt = ben_blob(salt, salt_length);
 }
 
 KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job)
@@ -65,6 +101,10 @@ KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job)
 
        j = *job;
 
+       if (j->query_salt) ben_free(j->query_salt);
+       for (int32_t i = 0; i < j->response_nodes_count; ++i) ks_dhtrt_release_node(j->response_nodes[i]);
+       for (int32_t i = 0; i < j->response_nodes6_count; ++i) ks_dhtrt_release_node(j->response_nodes6[i]);
+
        ks_pool_free(j->pool, job);
 }
 
index 7997b41c0480ca5946ca30c79cf53f9eeff83061..e9385cb74be0e8cdb4f098871e0a0fb37f6fffa0 100644 (file)
@@ -88,7 +88,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
        memcpy(message->transactionid, tv, tv_len);
        message->transactionid_length = tv_len;
        // @todo hex output of transactionid
-       //ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", *transactionid);
+       //ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", message->transactionid);
 
     y = ben_dict_get_by_str(message->data, "y");
        if (!y) {
index e15d726f3ba296769c8fd66cbca1f76e49333f74..3dc838545c66c2cd3e38e3a0590c5c24e7eb6709 100644 (file)
@@ -92,7 +92,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **p
 
        p->pool = pool;
        p->nodeid = *nodeid;
-       p->expiration = ks_time_now() + (KS_DHT_SEARCH_EXPIRATION * 1000);
+       p->expiration = ks_time_now() + ((ks_time_t)KS_DHT_SEARCH_EXPIRATION * KS_USEC_PER_SEC);
        p->finished = KS_FALSE;
 
        // done:
index ce34bde9f9220587f86b503151dbf72da5f7dd6e..f34783c28fe70d5b73c4f4161d256dc08e1d4e7e 100644 (file)
@@ -2,12 +2,9 @@
 #include "ks_dht-int.h"
 #include "sodium.h"
 
-KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v)
+KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, struct bencode *v)
 {
        ks_dht_storageitem_t *si;
-       SHA_CTX sha;
-       size_t enc_len = 0;
-       uint8_t *enc = NULL;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(item);
@@ -19,16 +16,18 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t
        ks_assert(si);
 
        si->pool = pool;
+       si->id = *target;
        si->mutable = KS_FALSE;
+       si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
        si->v = ben_clone(v);
        ks_assert(si->v);
-       
-       enc = ben_encode(&enc_len, si->v);
-       ks_assert(enc);
-       SHA1_Init(&sha);
-       SHA1_Update(&sha, enc, enc_len);
-       SHA1_Final(si->id.id, &sha);
-       free(enc);
+
+       //enc = ben_encode(&enc_len, si->v);
+       //ks_assert(enc);
+       //SHA1_Init(&sha);
+       //SHA1_Update(&sha, enc, enc_len);
+       //SHA1_Final(si->id.id, &sha);
+       //free(enc);
 
        // done:
        if (ret != KS_STATUS_SUCCESS) {
@@ -39,15 +38,14 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t
 
 KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item,
                                                                                                                  ks_pool_t *pool,
+                                                                                                                 ks_dht_nodeid_t *target,
                                                                                                                  struct bencode *v,
                                                                                                                  ks_dht_storageitem_key_t *k,
-                                                                                                                 uint8_t *salt,
-                                                                                                                 ks_size_t salt_length,
+                                                                                                                 struct bencode *salt,
                                                                                                                  int64_t sequence,
                                                                                                                  ks_dht_storageitem_signature_t *signature)
 {
        ks_dht_storageitem_t *si;
-       SHA_CTX sha;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(item);
@@ -55,30 +53,25 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t *
        ks_assert(v);
        ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
        ks_assert(k);
-       ks_assert(!(!salt && salt_length > 0));
-       ks_assert(!(salt_length > KS_DHT_STORAGEITEM_SIGNATURE_SIZE));
        ks_assert(signature);
 
        *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t));
        ks_assert(si);
 
        si->pool = pool;
+       si->id = *target;
        si->mutable = KS_TRUE;
+       si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
        si->v = ben_clone(v);
        ks_assert(si->v);
 
-       memcpy(si->pk.key, k->key, KS_DHT_STORAGEITEM_KEY_SIZE);
-       if (salt && salt_length > 0) {
-               memcpy(si->salt, salt, salt_length);
-               si->salt_length = salt_length;
+       si->pk = *k;
+       if (salt) {
+               si->salt = ben_clone(salt);
+               ks_assert(si->salt);
        }
        si->seq = sequence;
-       memcpy(si->sig.sig, signature->sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE);
-
-       SHA1_Init(&sha);
-       SHA1_Update(&sha, si->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE);
-       if (si->salt && si->salt_length > 0) SHA1_Update(&sha, si->salt, si->salt_length);
-       SHA1_Final(si->id.id, &sha);
+       si->sig = *signature;
 
        // done:
        if (ret != KS_STATUS_SUCCESS) {
@@ -103,6 +96,10 @@ KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item)
                ben_free(si->v);
                si->v = NULL;
        }
+       if (si->salt) {
+               ben_free(si->salt);
+               si->salt = NULL;
+       }
 
        ks_pool_free(si->pool, item);
 }
index 0f0458329cec1a3c52026932e930bac891a81177..39dcd6a4d1b60b218594f1f12e9b1de215e85a7f 100644 (file)
@@ -21,7 +21,7 @@ KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transac
        t->job = job;
        t->transactionid = transactionid;
        t->callback = callback;
-       t->expiration = ks_time_now() + (KS_DHT_TRANSACTION_EXPIRATION * 1000);
+       t->expiration = ks_time_now() + ((ks_time_t)KS_DHT_TRANSACTION_EXPIRATION * KS_USEC_PER_SEC);
 
        // done:
        if (ret != KS_STATUS_SUCCESS) {
index d160a0cc45bb13d49f9a58e3de8039fcd482e4c7..7472c41ce7916bfe12f14882bbb551458bc32dbe 100644 (file)
@@ -132,7 +132,6 @@ int main() {
   //err = ks_dht_process(dht1, &raddr);
   //ok(err == KS_STATUS_SUCCESS);
 
-  
   diag("Ping test\n");
   
   //ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1
@@ -163,7 +162,6 @@ int main() {
 
   diag("Find_Node test\n");
 
-  //ks_dht_send_findnode(dht3, ep3, &raddr1, &ep2->nodeid); // Queue findnode from dht3 to dht1
   ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid);
 
   ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1