]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Bug fixes and exposed interface changes while implementing tests for get...
authorShane Bryldt <astaelan@gmail.com>
Tue, 27 Dec 2016 04:27:35 +0000 (04:27 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:38 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_job.c
libs/libks/src/dht/ks_dht_message.c
libs/libks/src/dht/ks_dht_search.c
libs/libks/src/dht/ks_dht_storageitem.c
libs/libks/test/testdht2.c

index 0c475a16ffff0a035b486effbf56a7f485f04201..89e3643ee53bebbc818a6f8171a85dc2dbace9c4 100644 (file)
@@ -41,7 +41,7 @@ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht);
  * @param buffer pointer to the buffer able to contain at least (KS_DHT_NODEID_SIZE * 2) + 1 characters
  * @return The pointer to the front of the populated string buffer
  */
-KS_DECLARE(char *) ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer);
+KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len);
 
 /**
  * Compacts address information as per the DHT specifications.
@@ -270,9 +270,9 @@ KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
 KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
                                                                          ks_dht_job_callback_t query_callback,
                                                                          ks_dht_job_callback_t finish_callback,
-                                                                         ks_dht_nodeid_t *target,
-                                                                         uint8_t *salt,
-                                                                         ks_size_t salt_length);
+                                                                         ks_dht_token_t *token,
+                                                                         int64_t cas,
+                                                                         ks_dht_storageitem_t *item);
 KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job);
 
 
@@ -286,7 +286,33 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint,
                                                                                           ks_socket_t sock);
 KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint);
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
+                                                                                         ks_pool_t *pool,
+                                                                                         ks_dht_endpoint_t *endpoint,
+                                                                                         const ks_sockaddr_t *raddr,
+                                                                                         ks_bool_t alloc_data);
+/**
+ *
+ */
+KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message);
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length);
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
+                                                                                               uint8_t *transactionid,
+                                                                                               ks_size_t transactionid_length,
+                                                                                               struct bencode **args);
+
+                                                                                                                                                                                               
 /**
  *
  */
@@ -299,19 +325,50 @@ KS_DECLARE(void) ks_dht_search_expire(ks_dht_search_t *search, ks_hash_t *pendin
 KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid);
 KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending);
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable_internal(struct bencode *value, ks_dht_nodeid_t *target);
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable_internal(ks_dht_storageitem_pkey_t *pk, struct bencode *salt, ks_dht_nodeid_t *target);
 
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, struct bencode *v);
+KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable_internal(ks_dht_storageitem_t **item,
+                                                                                                                                        ks_pool_t *pool,
+                                                                                                                                        ks_dht_nodeid_t *target,
+                                                                                                                                        struct bencode *v,
+                                                                                                                                        ks_bool_t clone_v);
+KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item,
+                                                                                                                       ks_pool_t *pool,
+                                                                                                                       ks_dht_nodeid_t *target,
+                                                                                                                       const uint8_t *value,
+                                                                                                                       ks_size_t value_length);
+KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable_internal(ks_dht_storageitem_t **item,
+                                                                                                                                  ks_pool_t *pool,
+                                                                                                                                  ks_dht_nodeid_t *target,
+                                                                                                                                  struct bencode *v,
+                                                                                                                                  ks_bool_t clone_v,
+                                                                                                                                  ks_dht_storageitem_pkey_t *pk,
+                                                                                                                                  struct bencode *salt,
+                                                                                                                                  ks_bool_t clone_salt,
+                                                                                                                                  int64_t sequence,
+                                                                                                                                  ks_dht_storageitem_signature_t *signature);
 KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item,
                                                                                                                  ks_pool_t *pool,
                                                                                                                  ks_dht_nodeid_t *target,
-                                                                                                                 struct bencode *v,
-                                                                                                                 ks_dht_storageitem_key_t *k,
-                                                                                                                 struct bencode *salt,
+                                                                                                                 const uint8_t *value,
+                                                                                                                 ks_size_t value_length,
+                                                                                                                 ks_dht_storageitem_pkey_t *pk,
+                                                                                                                 const uint8_t *salt,
+                                                                                                                 ks_size_t salt_length,
                                                                                                                  int64_t sequence,
                                                                                                                  ks_dht_storageitem_signature_t *signature);
+KS_DECLARE(void) ks_dht_storageitem_update_mutable(ks_dht_storageitem_t *item, struct bencode *v, int64_t sequence, ks_dht_storageitem_signature_t *signature);
 KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item);
 
 /**
index 50ed2c6d770f146080158a96f8a0c7ac8be7e98e..b54e2ad2c449bdf5d75e15ec0d4172153e5182b8 100644 (file)
@@ -211,10 +211,9 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
         */
        if (d->storageitems_hash) {
                for (it = ks_hash_first(d->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-                       ks_dht_storageitem_t *val;
-
-                       ks_hash_this_val(it, (void **)&val);
-
+                       const void *key = NULL;
+                       ks_dht_storageitem_t *val = NULL;
+                       ks_hash_this(it, &key, NULL, (void **)&val);
                        ks_dht_storageitem_destroy(&val);
                }
                ks_hash_destroy(&d->storageitems_hash);
@@ -232,18 +231,18 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
         */
        if (d->searches6_hash) {
                for (it = ks_hash_first(d->searches6_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-                       ks_dht_search_t *val;
-
-                       ks_hash_this_val(it, (void **)&val);
+                       const void *key = NULL;
+                       ks_dht_search_t *val = NULL;
+                       ks_hash_this(it, &key, NULL, (void **)&val);
                        ks_dht_search_destroy(&val);
                }
                ks_hash_destroy(&d->searches6_hash);
        }
        if (d->searches4_hash) {
                for (it = ks_hash_first(d->searches4_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-                       ks_dht_search_t *val;
-
-                       ks_hash_this_val(it, (void **)&val);
+                       const void *key = NULL;
+                       ks_dht_search_t *val = NULL;
+                       ks_hash_this(it, &key, NULL, (void **)&val);
                        ks_dht_search_destroy(&val);
                }
                ks_hash_destroy(&d->searches4_hash);
@@ -652,8 +651,8 @@ KS_DECLARE(void) ks_dht_pulse_expirations_searches(ks_dht_t *dht, ks_hash_t *sea
                                char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
                                ks_log(KS_LOG_DEBUG,
                                           "Search for %s pending find_node to %s has expired without response\n",
-                                          ks_dht_hexid(&value->target, id_buf),
-                                          ks_dht_hexid(&v->nodeid, id2_buf));
+                                          ks_dht_hex(value->target.id, id_buf, KS_DHT_NODEID_SIZE),
+                                          ks_dht_hex(v->nodeid.id, id2_buf, KS_DHT_NODEID_SIZE));
                                v->finished = KS_TRUE;
                                continue;
                        }
@@ -691,7 +690,7 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
                else if (value->expiration <= now) {
                        // if the transaction expires, so does the attached job, but the job may try again with a new transaction
                        value->job->state = KS_DHT_JOB_STATE_EXPIRING;
-                       ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d, %d %d\n", value->transactionid, now, value->expiration);
+                       ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
                        remove = KS_TRUE;
                }
                if (remove) {
@@ -736,16 +735,16 @@ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
        }
 }
 
-KS_DECLARE(char *) ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer)
+KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len)
 {
        char *t = buffer;
 
-       ks_assert(id);
+       ks_assert(data);
        ks_assert(buffer);
 
-       memset(buffer, 0, KS_DHT_NODEID_SIZE * 2 + 1);
+       memset(buffer, 0, len * 2 + 1);
 
-       for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i, t += 2) sprintf(t, "%02X", id->id[i]);
+       for (int i = 0; i < len; ++i, t += 2) sprintf(t, "%02X", data[i]);
 
        return buffer;
 }
@@ -933,10 +932,10 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *args,
-                                                                                                                          ks_bool_t optional,
-                                                                                                                          const char *key,
-                                                                                                                          ks_dht_storageitem_key_t **sikey)
+KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_pkey(struct bencode *args,
+                                                                                                                               ks_bool_t optional,
+                                                                                                                               const char *key,
+                                                                                                                               ks_dht_storageitem_pkey_t **pkey)
 {
        struct bencode *k;
        const char *kv;
@@ -945,9 +944,9 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *a
 
        ks_assert(args);
        ks_assert(key);
-       ks_assert(sikey);
+       ks_assert(pkey);
 
-       *sikey = NULL;
+       *pkey = NULL;
 
        k = ben_dict_get_by_str(args, key);
        if (!k) {
@@ -960,12 +959,12 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *a
 
     kv = ben_str_val(k);
        kv_len = ben_str_len(k);
-    if (kv_len != KS_DHT_STORAGEITEM_KEY_SIZE) {
+    if (kv_len != KS_DHT_STORAGEITEM_PKEY_SIZE) {
                ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, kv_len);
                return KS_STATUS_ARG_INVALID;
        }
 
-       *sikey = (ks_dht_storageitem_key_t *)kv;
+       *pkey = (ks_dht_storageitem_pkey_t *)kv;
 
  done:
        return ret;
@@ -1046,35 +1045,54 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, const ks_sockaddr_t *ra
        return memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(struct bencode *value, ks_dht_nodeid_t *target)
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable_internal(struct bencode *value, ks_dht_nodeid_t *target)
 {
        SHA_CTX sha;
-       const uint8_t *v;
+       uint8_t *v = NULL;
        size_t v_len;
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(value);
        ks_assert(target);
 
-       v = (const uint8_t *)ben_str_val(value);
-       v_len = ben_str_len(value);
-       
+       v = ben_encode(&v_len, value);
        if (!SHA1_Init(&sha) ||
                !SHA1_Update(&sha, v, v_len) ||
-               !SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL;
+               !SHA1_Final(target->id, &sha)) {
+               ret = KS_STATUS_FAIL;
+       }
+       free(v);
 
-       return KS_STATUS_SUCCESS;
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(const uint8_t *value, ks_size_t value_length, ks_dht_nodeid_t *target)
+{
+       struct bencode *v = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(value);
+       ks_assert(value_length > 0);
+       ks_assert(target);
+
+       v = ben_blob(value, value_length);
+       ret = ks_dht_storageitem_target_immutable_internal(v, target);
+       ben_free(v);
+
+       return ret;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_key_t *k, struct bencode *salt, ks_dht_nodeid_t *target)
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable_internal(ks_dht_storageitem_pkey_t *pk, struct bencode *salt, ks_dht_nodeid_t *target)
 {
        SHA_CTX sha;
+       //char buf1[KS_DHT_NODEID_SIZE * 2 + 1];
 
-       ks_assert(k);
+       ks_assert(pk);
        ks_assert(target);
 
        
        if (!SHA1_Init(&sha) ||
-               !SHA1_Update(&sha, k->key, KS_DHT_STORAGEITEM_KEY_SIZE)) return KS_STATUS_FAIL;
+               !SHA1_Update(&sha, pk->key, KS_DHT_STORAGEITEM_PKEY_SIZE)) return KS_STATUS_FAIL;
        if (salt) {
                const uint8_t *s = (const uint8_t *)ben_str_val(salt);
                size_t s_len = ben_str_len(salt);
@@ -1082,9 +1100,156 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_key
        }
        if (!SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL;
 
+       //ks_log(KS_LOG_DEBUG, "Mutable ID: %s\n", ks_dht_hex(target->id, buf1, KS_DHT_NODEID_SIZE));
        return KS_STATUS_SUCCESS;
 }
 
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_pkey_t *pk, const uint8_t *salt, ks_size_t salt_length, ks_dht_nodeid_t *target)
+{
+       struct bencode *s = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       
+       ks_assert(pk);
+       ks_assert(target);
+
+       if (salt && salt_length > 0) s = ben_blob(salt, salt_length);
+       ret = ks_dht_storageitem_target_mutable_internal(pk, s, target);
+       if (s) ben_free(s);
+
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_encode(uint8_t **encoded,
+                                                                                                                       ks_size_t *encoded_length,
+                                                                                                                       struct bencode *salt,
+                                                                                                                       struct bencode *seq,
+                                                                                                                       struct bencode *v)
+{
+       char *enc = NULL;
+       char *salt_enc = NULL;
+       size_t salt_enc_length = 0;
+       char *seq_enc = NULL;
+       size_t seq_enc_length = 0;
+       char *v_enc = NULL;
+       size_t v_enc_length = 0;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(encoded);
+       ks_assert(encoded_length);
+       ks_assert(seq);
+       ks_assert(v);
+
+       if (salt) salt_enc = ben_encode(&salt_enc_length, salt);
+       seq_enc = ben_encode(&seq_enc_length, seq);
+       v_enc = ben_encode(&v_enc_length, v);
+
+       *encoded_length = (salt ? 6 : 0) + // 4:salt
+               salt_enc_length +
+               5 + // 3:seq
+               seq_enc_length +
+               3 + // 1:v
+               v_enc_length;
+       enc = malloc((*encoded_length) + 1);
+       *encoded = (uint8_t *)enc;
+       enc[0] = '\0';
+
+       if (salt) {
+               strncat(enc, "4:salt", 6);
+               strncat(enc, salt_enc, salt_enc_length);
+       }
+       strncat(enc, "3:seq", 5);
+       strncat(enc, seq_enc, seq_enc_length);
+       strncat(enc, "1:v", 3);
+       strncat(enc, v_enc, v_enc_length);
+
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate_internal(ks_dht_storageitem_signature_t *sig,
+                                                                                                                                          ks_dht_storageitem_skey_t *sk,
+                                                                                                                                          struct bencode *salt,
+                                                                                                                                          struct bencode *seq,
+                                                                                                                                          struct bencode *v)
+{
+       uint8_t *tmpsig = NULL;
+       size_t tmpsig_len = 0;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       //char buf1[KS_DHT_STORAGEITEM_SIGNATURE_SIZE * 2 + 1];
+
+       ks_assert(sig);
+       ks_assert(sk);
+       ks_assert(seq);
+       ks_assert(v);
+
+       if ((ret = ks_dht_storageitem_signature_encode(&tmpsig, &tmpsig_len, salt, seq, v)) != KS_STATUS_SUCCESS) goto done;
+
+       if (crypto_sign_detached(sig->sig, NULL, tmpsig, tmpsig_len, sk->key) != 0) {
+               ret = KS_STATUS_FAIL;
+               goto done;
+       }
+       //ks_log(KS_LOG_DEBUG, "Signed: %s\n", ks_dht_hex(sig->sig, buf1, KS_DHT_STORAGEITEM_SIGNATURE_SIZE));
+
+ done:
+       if (tmpsig) free(tmpsig);
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate(ks_dht_storageitem_signature_t *sig,
+                                                                                                                         ks_dht_storageitem_skey_t *sk,
+                                                                                                                         const uint8_t *salt,
+                                                                                                                         ks_size_t salt_length,
+                                                                                                                         int64_t sequence,
+                                                                                                                         const uint8_t *value,
+                                                                                                                         ks_size_t value_length)
+{
+       struct bencode *s = NULL;
+       struct bencode *seq = NULL;
+       struct bencode *v = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(sig);
+       ks_assert(sk);
+       ks_assert(sequence > 0);
+       ks_assert(value);
+       ks_assert(value_length > 0);
+
+       if (salt && salt_length > 0) s = ben_blob(salt, salt_length);
+       seq = ben_int(sequence);
+       v = ben_blob(value, value_length);
+
+       ret = ks_dht_storageitem_signature_generate_internal(sig, sk, s, seq, v);
+
+       if (s) ben_free(s);
+       ben_free(seq);
+       ben_free(v);
+
+       return ret;
+}
+
+KS_DECLARE(ks_bool_t) ks_dht_storageitem_signature_verify(ks_dht_storageitem_signature_t *sig,
+                                                                                                                 ks_dht_storageitem_pkey_t *pk,
+                                                                                                                 struct bencode *salt,
+                                                                                                                 struct bencode *seq,
+                                                                                                                 struct bencode *v)
+{
+       uint8_t *tmpsig = NULL;
+       size_t tmpsig_len = 0;
+       int32_t res = 0;
+
+       ks_assert(sig);
+       ks_assert(pk);
+       ks_assert(seq);
+       ks_assert(v);
+
+       if (ks_dht_storageitem_signature_encode(&tmpsig, &tmpsig_len, salt, seq, v) != KS_STATUS_SUCCESS) return KS_FALSE;
+       
+       res = crypto_sign_verify_detached(sig->sig, tmpsig, tmpsig_len, pk->key);
+
+       if (tmpsig) free(tmpsig);
+
+       return res == 0;
+}
+
 KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
 {
        char buf[KS_DHT_DATAGRAM_BUFFER_SIZE + 1];
@@ -1288,7 +1453,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
 
        memcpy(query, qv, qv_len);
        query[qv_len] = '\0';
-       ks_log(KS_LOG_DEBUG, "Message query is '%s'\n", query);
+       //ks_log(KS_LOG_DEBUG, "Message query is '%s'\n", query);
 
        a = ben_dict_get_by_str(message->data, "a");
        if (!a) {
@@ -1302,7 +1467,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
     if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
        message->args_id = *id;
 
-       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
        if ((ret = ks_dhtrt_create_node(message->endpoint->node->table,
                                                                        *id,
                                                                        KS_DHT_REMOTE,
@@ -1347,7 +1512,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
        if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
        message->args_id = *id;
 
-       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
        if ((ret = ks_dhtrt_create_node(message->endpoint->node->table,
                                                                        *id,
                                                                        KS_DHT_REMOTE,
@@ -1356,7 +1521,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
                                                                        &node)) != KS_STATUS_SUCCESS) goto done;
        if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
        
-       ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
+       ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
        if ((ret = ks_dhtrt_touch_node(message->endpoint->node->table, *id)) != KS_STATUS_SUCCESS) goto done;
 
        
@@ -1377,9 +1542,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
                           transaction->job->raddr.port);
        } else {
                transaction->job->response = message;
-               transaction->job->state = KS_DHT_JOB_STATE_PROCESSING;
                message->transaction = transaction;
                if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING;
+               else transaction->job->state = KS_DHT_JOB_STATE_COMPLETING;
+
                transaction->job->response = NULL; // message is destroyed after we return, stop using it
                transaction->finished = KS_TRUE;
        }
@@ -1502,6 +1668,46 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
        return ret;
 }
 
+KS_DECLARE(void) ks_dht_storageitems_read_lock(ks_dht_t *dht)
+{
+       ks_assert(dht);
+       ks_hash_read_lock(dht->storageitems_hash);
+}
+
+KS_DECLARE(void) ks_dht_storageitems_read_unlock(ks_dht_t *dht)
+{
+       ks_assert(dht);
+       ks_hash_read_unlock(dht->storageitems_hash);
+}
+
+KS_DECLARE(void) ks_dht_storageitems_write_lock(ks_dht_t *dht)
+{
+       ks_assert(dht);
+       ks_hash_write_lock(dht->storageitems_hash);
+}
+
+KS_DECLARE(void) ks_dht_storageitems_write_unlock(ks_dht_t *dht)
+{
+       ks_assert(dht);
+       ks_hash_write_lock(dht->storageitems_hash);
+}
+
+KS_DECLARE(ks_dht_storageitem_t *) ks_dht_storageitems_find(ks_dht_t *dht, ks_dht_nodeid_t *target)
+{
+       ks_assert(dht);
+       ks_assert(target);
+
+       return ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED);
+}
+
+KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storageitem_t *item)
+{
+       ks_assert(dht);
+       ks_assert(item);
+
+       return ks_hash_insert(dht->storageitems_hash, item->id.id, item);
+}
+
 
 KS_DECLARE(ks_status_t) ks_dht_error(ks_dht_t *dht,
                                                                         ks_dht_endpoint_t *ep,
@@ -1632,45 +1838,45 @@ KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job)
 
 KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
 {
+       ks_dht_job_t *first = NULL;
+       ks_dht_job_t *last = NULL;
+       
        ks_assert(dht);
 
        ks_mutex_lock(dht->jobs_mutex);
        for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
-               ks_bool_t remove = KS_FALSE;
+               ks_bool_t done = KS_FALSE;
                jobn = job->next;
-               switch (job->state) {
-               case KS_DHT_JOB_STATE_QUERYING:
+               if (job->state == KS_DHT_JOB_STATE_QUERYING) {
                        job->state = KS_DHT_JOB_STATE_RESPONDING;
                        if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
-                       break;
-               case KS_DHT_JOB_STATE_RESPONDING:
-                       break;
-               case KS_DHT_JOB_STATE_EXPIRING:
+               }
+               if (job->state == KS_DHT_JOB_STATE_EXPIRING) {
                        job->attempts--;
                        if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING;
-                       else {
-                               if (job->finish_callback) job->finish_callback(dht, job);
-                               remove = KS_TRUE;
-                       }
-                       break;
-               case KS_DHT_JOB_STATE_PROCESSING:
-                       break;
-               case KS_DHT_JOB_STATE_COMPLETING:
-                       if (job->finish_callback) job->finish_callback(dht, job);
-                       remove = KS_TRUE;
-                       break;
-               default: break;
+                       else done = KS_TRUE;
                }
+               if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE;
 
-               if (remove) {
+               if (done) {
                        if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
                        else if (!jobp) dht->jobs_first = jobn;
                        else if (!jobn) dht->jobs_last = jobp;
                        else jobp->next = jobn;
-                       ks_dht_job_destroy(&job);
+
+                       job->next = NULL;
+                       if (last) last = last->next = job;
+                       else first = last = job;
                } else jobp = job;
        }
        ks_mutex_unlock(dht->jobs_mutex);
+
+       for (ks_dht_job_t *job = first, *jobn = NULL; job; job = jobn) {
+               jobn = job->next;
+               // this cannot occur inside of the main loop, may add new jobs invalidating list pointers
+               if (job->finish_callback) job->finish_callback(dht, job);
+               ks_dht_job_destroy(&job);
+       }
 }
 
 KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback)
@@ -1681,6 +1887,8 @@ KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, k
        ks_assert(dht);
        ks_assert(raddr);
 
+       //ks_log(KS_LOG_DEBUG, "Starting ping!\n");
+
        if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
        ks_dht_job_build_ping(job, ks_dht_query_ping, callback);
        ks_dht_jobs_add(dht, job);
@@ -1707,7 +1915,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job)
                                                                  &message,
                                                                  NULL)) != KS_STATUS_SUCCESS) goto done;
 
-       ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
+       //ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
        ks_q_push(dht->send_q, (void *)message);
 
  done:
@@ -1723,7 +1931,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
        ks_assert(message);
        ks_assert(message->args);
 
-       ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
+       //ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
 
        if ((ret = ks_dht_response_setup(dht,
                                                                         message->endpoint,
@@ -1733,7 +1941,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
                                                                         &response,
                                                                         NULL)) != KS_STATUS_SUCCESS) goto done;
 
-       ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
+       //ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
        ks_q_push(dht->send_q, (void *)response);
 
  done:
@@ -1747,9 +1955,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t
        ks_assert(dht);
        ks_assert(job);
 
-       ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
-
-       job->state = KS_DHT_JOB_STATE_COMPLETING;
+       //ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
 
        // done:
        return ret;
@@ -1805,7 +2011,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job)
                ben_dict_set(a, ben_blob("want", 4), want);
        }
 
-       ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
+       //ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
        ks_q_push(dht->send_q, (void *)message);
 
  done:
@@ -1850,7 +2056,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
                want6 = message->raddr.family == AF_INET6;
        }
 
-       ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
+       //ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
 
 
        query.nodeid = *target;
@@ -1869,7 +2075,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
                                                                                                           &buffer4_length,
                                                                                                           sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
 
-                       ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
+                       ks_log(KS_LOG_DEBUG,
+                                  "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
                }
                ks_dhtrt_release_querynodes(&query);
        }
@@ -1886,7 +2093,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
                                                                                                           &buffer6_length,
                                                                                                           sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
 
-                       ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
+                       ks_log(KS_LOG_DEBUG,
+                                  "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
                }
                ks_dhtrt_release_querynodes(&query);
        }
@@ -1902,7 +2110,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
        if (want6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
 
-       ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
+       //ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
        ks_q_push(dht->send_q, (void *)response);
 
  done:
@@ -1964,11 +2172,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
 
                ks_log(KS_LOG_DEBUG,
                           "Expanded ipv4 nodeinfo for %s (%s %d)\n",
-                          ks_dht_hexid(&nid, id_buf),
+                          ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
                           addr.host,
                           addr.port);
 
-               ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+               ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
                ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
                job->response_nodes[job->response_nodes_count++] = node;
 
@@ -2000,9 +2208,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
 
                                ks_log(KS_LOG_DEBUG,
                                           "Set closer node id %s (%s) in search of target id %s at results index %d\n",
-                                          ks_dht_hexid(&nid, id_buf),
-                                          ks_dht_hexid(&distance, id2_buf),
-                                          ks_dht_hexid(&search->target, id3_buf),
+                                          ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
+                                          ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
+                                          ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
                                           results_index);
                                search->results[results_index] = nid;
                                search->distances[results_index] = distance;
@@ -2026,11 +2234,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
 
                ks_log(KS_LOG_DEBUG,
                           "Expanded ipv6 nodeinfo for %s (%s %d)\n",
-                          ks_dht_hexid(&nid, id_buf),
+                          ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
                           addr.host,
                           addr.port);
 
-               ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+               ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
                ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
                job->response_nodes6[job->response_nodes6_count++] = node;
 
@@ -2062,9 +2270,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
 
                                ks_log(KS_LOG_DEBUG,
                                           "Set closer node id %s (%s) in search of target id %s at results index %d\n",
-                                          ks_dht_hexid(&nid, id_buf),
-                                          ks_dht_hexid(&distance, id2_buf),
-                                          ks_dht_hexid(&search->target, id3_buf),
+                                          ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
+                                          ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
+                                          ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
                                           results_index);
                                search->results[results_index] = nid;
                                search->distances[results_index] = distance;
@@ -2079,9 +2287,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
                }
        }
 
-       ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
-
-       job->state = KS_DHT_JOB_STATE_COMPLETING;
+       //ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
 
  done:
        if(search) ks_mutex_unlock(search->mutex);
@@ -2130,7 +2336,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
        // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
        ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
 
-       ks_log(KS_LOG_DEBUG, "Sending message query get\n");
+       //ks_log(KS_LOG_DEBUG, "Sending message query get\n");
        ks_q_push(dht->send_q, (void *)message);
 
        return KS_STATUS_SUCCESS;
@@ -2163,7 +2369,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        seq = ben_dict_get_by_str(message->args, "seq");
        if (seq) sequence = ben_int_val(seq);
 
-       ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
+       //ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
 
        ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
 
@@ -2190,7 +2396,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
                                                                                                           &buffer4_length,
                                                                                                           sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
 
-                       ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
+                       ks_log(KS_LOG_DEBUG,
+                                  "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
                }
                ks_dhtrt_release_querynodes(&query);
        }
@@ -2207,7 +2414,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
                                                                                                           &buffer6_length,
                                                                                                           sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
 
-                       ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
+                       ks_log(KS_LOG_DEBUG,
+                                  "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
                }
                ks_dhtrt_release_querynodes(&query);
        }
@@ -2225,7 +2433,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        if (item) {
                if (item->mutable) {
                        if (!sequence_snuffed) {
-                               ben_dict_set(r, ben_blob("k", 1), ben_blob(item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE));
+                               ben_dict_set(r, ben_blob("k", 1), ben_blob(item->pk.key, KS_DHT_STORAGEITEM_PKEY_SIZE));
                                ben_dict_set(r, ben_blob("sig", 3), ben_blob(item->sig.sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE));
                        }
                        ben_dict_set(r, ben_blob("seq", 3), ben_int(item->seq));
@@ -2235,7 +2443,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        if (dht->rt_ipv4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
        if (dht->rt_ipv6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
 
-       ks_log(KS_LOG_DEBUG, "Sending message response get\n");
+       //ks_log(KS_LOG_DEBUG, "Sending message response get\n");
        ks_q_push(dht->send_q, (void *)response);
 
  done:
@@ -2246,7 +2454,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
 {
        ks_dht_storageitem_t *item = NULL;
        ks_dht_token_t *token = NULL;
-       ks_dht_storageitem_key_t *k = NULL;
+       ks_dht_storageitem_pkey_t *k = NULL;
        ks_dht_storageitem_signature_t *sig = NULL;
        struct bencode *seq;
        int64_t sequence = -1;
@@ -2271,7 +2479,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
        if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
        job->response_token = *token;
 
-       if ((ret = ks_dht_utility_extract_storageitem_key(job->response->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dht_utility_extract_storageitem_pkey(job->response->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
        if ((ret = ks_dht_utility_extract_storageitem_signature(job->response->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
 
        seq = ben_dict_get_by_str(job->response->args, "seq");
@@ -2297,7 +2505,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
                nodes6_size = ben_str_len(n);
        }
 
-       ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
+       //ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
 
        while (nodes_len < nodes_size) {
                ks_dht_nodeid_t nid;
@@ -2308,11 +2516,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
 
                ks_log(KS_LOG_DEBUG,
                           "Expanded ipv4 nodeinfo for %s (%s %d)\n",
-                          ks_dht_hexid(&nid, id_buf),
+                          ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
                           addr.host,
                           addr.port);
 
-               ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+               ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
                ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
                job->response_nodes[job->response_nodes_count++] = node;
        }
@@ -2325,11 +2533,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
 
                ks_log(KS_LOG_DEBUG,
                           "Expanded ipv6 nodeinfo for %s (%s %d)\n",
-                          ks_dht_hexid(&nid, id_buf),
+                          ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
                           addr.host,
                           addr.port);
 
-               ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+               ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
                ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
                job->response_nodes6[job->response_nodes6_count++] = node;
        }
@@ -2343,72 +2551,62 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
 
                if (!seq) {
                        // immutable
-                       if ((ret = ks_dht_storageitem_target_immutable(v, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
+                       if ((ret = ks_dht_storageitem_target_immutable_internal(v, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
                        if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) {
                                ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n");
                                ret = KS_STATUS_FAIL;
                                goto done;
                        }
                        if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
-                       else if ((ret = ks_dht_storageitem_create_immutable(&item,
-                                                                                                                                 dht->pool,
-                                                                                                                                 &tmptarget,
-                                                                                                                                 v)) != KS_STATUS_SUCCESS) goto done;
+                       else if ((ret = ks_dht_storageitem_create_immutable_internal(&item,
+                                                                                                                                                dht->pool,
+                                                                                                                                                &tmptarget,
+                                                                                                                                                v,
+                                                                                                                                                KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
                } else {
                        // mutable
-                       struct bencode *tmp = NULL;
-                       uint8_t *tmpsig = NULL;
-                       size_t tmpsig_len = 0;
-                       int32_t res = 0;
-
-                       if ((ret = ks_dht_storageitem_target_mutable(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
+                       if ((ret = ks_dht_storageitem_target_mutable_internal(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
                        if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) {
-                               ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n");
+                               ks_log(KS_LOG_DEBUG, "Mutable data hash does not match requested target id\n");
                                ret = KS_STATUS_FAIL;
                                goto done;
                        }
 
-                       tmp = ben_dict();
-                       if (job->query_salt) ben_dict_set(tmp, ben_blob("salt", 4), ben_clone(job->query_salt));
-                       ben_dict_set(tmp, ben_blob("seq", 3), ben_clone(seq));
-                       ben_dict_set(tmp, ben_blob("v", 1), ben_clone(v));
-                       tmpsig = ben_encode(&tmpsig_len, tmp);
-                       ben_free(tmp);
-
-                       res = crypto_sign_verify_detached(sig->sig, tmpsig, tmpsig_len, k->key);
-
-                       free(tmpsig);
-
-                       if (res) {
-                               ks_log(KS_LOG_DEBUG, "Immutable data signature failed to verify\n");
+                       if (!ks_dht_storageitem_signature_verify(sig, k, job->query_salt, seq, v)) {
+                               ks_log(KS_LOG_DEBUG, "Mutable data signature failed to verify\n");
                                ret = KS_STATUS_FAIL;
                                goto done;
                        }
-                       
+                       ks_log(KS_LOG_DEBUG, "Signature verified for %s\n", ks_dht_hex(tmptarget.id, id_buf, KS_DHT_NODEID_SIZE));
                        if (olditem) {
-                               if (olditem->seq >= sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
-                               else {
-                                       ks_hash_remove(dht->storageitems_hash, olditem->id.id);
-                                       olditem = NULL;
+                               if (olditem->seq > sequence) {
+                                       goto done;
                                }
+                               if (olditem->seq == sequence) {
+                                       if (ben_cmp(olditem->v, v) != 0) {
+                                               goto done;
+                                       }
+                               } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
+                               olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
                        }
-                       if (!olditem && (ret = ks_dht_storageitem_create_mutable(&item,
-                                                                                                                                        dht->pool,
-                                                                                                                                        &tmptarget,
-                                                                                                                                        v,
-                                                                                                                                        k,
-                                                                                                                                        job->query_salt,
-                                                                                                                                        sequence,
-                                                                                                                                        sig)) != KS_STATUS_SUCCESS) goto done;
+                       else if ((ret = ks_dht_storageitem_create_mutable_internal(&item,
+                                                                                                                                          dht->pool,
+                                                                                                                                          &tmptarget,
+                                                                                                                                          v,
+                                                                                                                                          KS_TRUE,
+                                                                                                                                          k,
+                                                                                                                                          job->query_salt,
+                                                                                                                                          KS_TRUE,
+                                                                                                                                          sequence,
+                                                                                                                                          sig)) != KS_STATUS_SUCCESS) goto done;
                }
                if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
+               item = ks_hash_search(dht->storageitems_hash, item->id.id, KS_UNLOCKED);
        } else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
 
        if (item) job->response_storageitem = item;
        else if (olditem) job->response_storageitem = olditem;
 
-       job->state = KS_DHT_JOB_STATE_COMPLETING;
-
  done:
        if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
        if (ret != KS_STATUS_SUCCESS) {
@@ -2417,22 +2615,25 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
        return ret;
 }
 
+// @todo add a public function to add storageitem_t's to the store before calling this for authoring new data, reuse function in the "get" handlers
+// @todo add reference counting system to storageitem_t to know what to keep alive with reannouncements versus allowing to expire
 KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
                                                                   const ks_sockaddr_t *raddr,
                                                                   ks_dht_job_callback_t callback,
-                                                                  ks_dht_nodeid_t *target,
-                                                                  uint8_t *salt,
-                                                                  ks_size_t salt_length)
+                                                                  ks_dht_token_t *token,
+                                                                  int64_t cas,
+                                                                  ks_dht_storageitem_t *item)
 {
        ks_dht_job_t *job = NULL;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(raddr);
-       ks_assert(target);
+       ks_assert(token);
+       ks_assert(item);
 
        if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
-       ks_dht_job_build_put(job, ks_dht_query_put, callback, target, salt, salt_length);
+       ks_dht_job_build_put(job, ks_dht_query_put, callback, token, cas, item);
        ks_dht_jobs_add(dht, job);
 
  done:
@@ -2455,9 +2656,17 @@ KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job)
                                                   &message,
                                                   &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
+       if (job->query_storageitem->mutable) {
+               if (job->query_cas > 0) ben_dict_set(a, ben_blob("cas", 3), ben_int(job->query_cas));
+               ben_dict_set(a, ben_blob("k", 1), ben_blob(job->query_storageitem->pk.key, KS_DHT_STORAGEITEM_PKEY_SIZE));
+               if (job->query_storageitem->salt) ben_dict_set(a, ben_blob("salt", 4), ben_clone(job->query_storageitem->salt));
+               ben_dict_set(a, ben_blob("seq", 3), ben_int(job->query_storageitem->seq));
+               ben_dict_set(a, ben_blob("sig", 3), ben_blob(job->query_storageitem->sig.sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE));
+       }
+       ben_dict_set(a, ben_blob("token", 5), ben_blob(job->query_token.token, KS_DHT_TOKEN_SIZE));
+       ben_dict_set(a, ben_blob("v", 1), ben_clone(job->query_storageitem->v));
 
-
-       ks_log(KS_LOG_DEBUG, "Sending message query put\n");
+       //ks_log(KS_LOG_DEBUG, "Sending message query put\n");
        ks_q_push(dht->send_q, (void *)message);
 
        return KS_STATUS_SUCCESS;
@@ -2465,6 +2674,20 @@ KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job)
 
 KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message)
 {
+       ks_dht_token_t *token = NULL;
+       ks_dht_storageitem_pkey_t *k = NULL;
+       ks_dht_storageitem_signature_t *sig = NULL;
+       struct bencode *salt = NULL;
+       struct bencode *seq = NULL;
+       int64_t sequence = -1;
+       struct bencode *cas = NULL;
+       int64_t cas_seq = -1;
+       struct bencode *v = NULL;
+       //ks_size_t v_len = 0;
+       ks_bool_t storageitems_locked = KS_FALSE;
+       ks_dht_storageitem_t *item = NULL;
+       ks_dht_storageitem_t *olditem = NULL;
+       ks_dht_nodeid_t target;
        ks_dht_message_t *response = NULL;
        struct bencode *r = NULL;
        ks_status_t ret = KS_STATUS_SUCCESS;
@@ -2473,7 +2696,99 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
        ks_assert(message);
        ks_assert(message->args);
 
-       ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
+
+       if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
+
+       if ((ret = ks_dht_utility_extract_storageitem_pkey(message->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dht_utility_extract_storageitem_signature(message->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
+
+       salt = ben_dict_get_by_str(message->args, "salt");
+
+       seq = ben_dict_get_by_str(message->args, "seq");
+       if (seq) sequence = ben_int_val(seq);
+
+       cas = ben_dict_get_by_str(message->args, "cas");
+       if (cas) cas_seq = ben_int_val(cas);
+
+       if (seq && (!k || !sig)) {
+               ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data\n");
+               ret = KS_STATUS_ARG_INVALID;
+               goto done;
+       }
+
+       v = ben_dict_get_by_str(message->args, "v");
+       if (!v) {
+               ks_log(KS_LOG_DEBUG, "Must provide v\n");
+               ret = KS_STATUS_ARG_INVALID;
+               goto done;
+       }
+       //v_len = ben_str_len(v);
+
+       if (!seq) {
+               // immutable
+               if ((ret = ks_dht_storageitem_target_immutable_internal(v, &target)) != KS_STATUS_SUCCESS) goto done;
+       } else {
+               // mutable
+               if ((ret = ks_dht_storageitem_target_mutable_internal(k, salt, &target)) != KS_STATUS_SUCCESS) goto done;
+       }
+       olditem = ks_hash_search(dht->storageitems_hash, target.id, KS_UNLOCKED);
+
+       if (!ks_dht_token_verify(dht, &message->raddr, &target, token)) {
+               ks_log(KS_LOG_DEBUG, "Invalid token\n");
+               ret = KS_STATUS_FAIL;
+               goto done;
+       }
+
+       //ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
+
+       ks_hash_write_lock(dht->storageitems_hash);
+       storageitems_locked = KS_TRUE;
+
+       if (!seq) {
+               // immutable
+               if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+               else if ((ret = ks_dht_storageitem_create_immutable_internal(&item,
+                                                                                                                                        dht->pool,
+                                                                                                                                        &target,
+                                                                                                                                        v,
+                                                                                                                                        KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+       } else {
+               // mutable
+               if (!ks_dht_storageitem_signature_verify(sig, k, salt, seq, v)) {
+                       ks_log(KS_LOG_DEBUG, "Mutable data signature failed to verify\n");
+                       ret = KS_STATUS_FAIL;
+                       goto done;
+               }
+               
+               if (olditem) {
+                       if (cas && olditem->seq != cas_seq) {
+                               // @todo send 301 error instead of the response
+                               goto done;
+                       }
+                       if (olditem->seq > sequence) {
+                               // @todo send 302 error instead of the response
+                               goto done;
+                       }
+                       if (olditem->seq == sequence) {
+                               if (ben_cmp(olditem->v, v) != 0) {
+                                       // @todo send 201? error instead of the response
+                                       goto done;
+                               }
+                       } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
+                       olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+               }
+               else if ((ret = ks_dht_storageitem_create_mutable_internal(&item,
+                                                                                                                                  dht->pool,
+                                                                                                                                  &target,
+                                                                                                                                  v,
+                                                                                                                                  KS_TRUE,
+                                                                                                                                  k,
+                                                                                                                                  salt,
+                                                                                                                                  KS_TRUE,
+                                                                                                                                  sequence,
+                                                                                                                                  sig)) != KS_STATUS_SUCCESS) goto done;
+       }
+       if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
 
        if ((ret = ks_dht_response_setup(dht,
                                                                         message->endpoint,
@@ -2483,10 +2798,14 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
                                                                         &response,
                                                                         &r)) != KS_STATUS_SUCCESS) goto done;
 
-       ks_log(KS_LOG_DEBUG, "Sending message response put\n");
+       //ks_log(KS_LOG_DEBUG, "Sending message response put\n");
        ks_q_push(dht->send_q, (void *)response);
 
  done:
+       if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
+       if (ret != KS_STATUS_SUCCESS) {
+               if (item) ks_dht_storageitem_destroy(&item);
+       }
        return ret;
 }
 
@@ -2499,8 +2818,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t
 
        ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
 
-       job->state = KS_DHT_JOB_STATE_COMPLETING;
-
        // done:
        return ret;
 }
index aacd74b43cb2d2dfcd63fa055aa2a91ea7aa4884..7a2caca64336cdf82c53d6ff530dd0a7bb36b056 100644 (file)
@@ -29,11 +29,12 @@ KS_BEGIN_EXTERN_C
 #define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
 #define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256
 
-#define KS_DHT_TRANSACTION_EXPIRATION 30
+#define KS_DHT_TRANSACTION_EXPIRATION 10
 #define KS_DHT_SEARCH_EXPIRATION 10
 #define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE
 
-#define KS_DHT_STORAGEITEM_KEY_SIZE crypto_sign_PUBLICKEYBYTES
+#define KS_DHT_STORAGEITEM_PKEY_SIZE crypto_sign_PUBLICKEYBYTES
+#define KS_DHT_STORAGEITEM_SKEY_SIZE crypto_sign_SECRETKEYBYTES
 #define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64
 #define KS_DHT_STORAGEITEM_SIGNATURE_SIZE crypto_sign_BYTES
 #define KS_DHT_STORAGEITEM_EXPIRATION 7200
@@ -48,7 +49,8 @@ typedef struct ks_dht_datagram_s ks_dht_datagram_t;
 typedef struct ks_dht_job_s ks_dht_job_t;
 typedef struct ks_dht_nodeid_s ks_dht_nodeid_t;
 typedef struct ks_dht_token_s ks_dht_token_t;
-typedef struct ks_dht_storageitem_key_s ks_dht_storageitem_key_t;
+typedef struct ks_dht_storageitem_pkey_s ks_dht_storageitem_pkey_t;
+typedef struct ks_dht_storageitem_skey_s ks_dht_storageitem_skey_t;
 typedef struct ks_dht_storageitem_signature_s ks_dht_storageitem_signature_t;
 typedef struct ks_dht_message_s ks_dht_message_t;
 typedef struct ks_dht_endpoint_s ks_dht_endpoint_t;
@@ -103,7 +105,6 @@ enum ks_dht_job_state_t {
        KS_DHT_JOB_STATE_QUERYING,
        KS_DHT_JOB_STATE_RESPONDING,
        KS_DHT_JOB_STATE_EXPIRING,
-       KS_DHT_JOB_STATE_PROCESSING,
        KS_DHT_JOB_STATE_COMPLETING,
 };
 
@@ -133,6 +134,9 @@ struct ks_dht_job_s {
        // job specific query parameters
        ks_dht_nodeid_t query_target;
        struct bencode *query_salt;
+       int64_t query_cas;
+       ks_dht_token_t query_token;
+       ks_dht_storageitem_t *query_storageitem;
 
        // job specific response parameters
        ks_dht_node_t *response_nodes[KS_DHT_RESPONSE_NODES_MAX_SIZE];
@@ -158,8 +162,12 @@ struct ks_dhtrt_querynodes_s {
     ks_dht_node_t* nodes[ KS_DHTRT_MAXQUERYSIZE ]; /* out: array of peers (ks_dht_node_t* nodes[incount]) */
 };
 
-struct ks_dht_storageitem_key_s {
-       uint8_t key[KS_DHT_STORAGEITEM_KEY_SIZE];
+struct ks_dht_storageitem_pkey_s {
+       uint8_t key[KS_DHT_STORAGEITEM_PKEY_SIZE];
+};
+
+struct ks_dht_storageitem_skey_s {
+       uint8_t key[KS_DHT_STORAGEITEM_SKEY_SIZE];
 };
 
 struct ks_dht_storageitem_signature_s {
@@ -225,8 +233,9 @@ struct ks_dht_storageitem_s {
        struct bencode *v;
        
        ks_bool_t mutable;
-       ks_dht_storageitem_key_t pk;
-       ks_dht_storageitem_key_t sk;
+       ks_mutex_t *mutex;
+       ks_dht_storageitem_pkey_t pk;
+       ks_dht_storageitem_skey_t sk;
        struct bencode *salt;
        int64_t seq;
        ks_dht_storageitem_signature_t sig;
@@ -361,14 +370,88 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
  */
 KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout);
 
+
+KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len);
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(const uint8_t *value, ks_size_t value_length, ks_dht_nodeid_t *target);
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_pkey_t *pk, const uint8_t *salt, ks_size_t salt_length, ks_dht_nodeid_t *target);
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate(ks_dht_storageitem_signature_t *sig,
+                                                                                                                         ks_dht_storageitem_skey_t *sk,
+                                                                                                                         const uint8_t *salt,
+                                                                                                                         ks_size_t salt_length,
+                                                                                                                         int64_t sequence,
+                                                                                                                         const uint8_t *value,
+                                                                                                                         ks_size_t value_length);
+                                                                                               
+/**
+ *
+ */
+KS_DECLARE(void) ks_dht_storageitems_read_lock(ks_dht_t *dht);
+
+/**
+ *
+ */
+KS_DECLARE(void) ks_dht_storageitems_read_unlock(ks_dht_t *dht);
+
+/**
+ *
+ */
+KS_DECLARE(void) ks_dht_storageitems_write_lock(ks_dht_t *dht);
+
+/**
+ *
+ */
+KS_DECLARE(void) ks_dht_storageitems_write_unlock(ks_dht_t *dht);
+
+/**
+ *
+ */
+KS_DECLARE(ks_dht_storageitem_t *) ks_dht_storageitems_find(ks_dht_t *dht, ks_dht_nodeid_t *target);
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storageitem_t *item);
+                                               
+/**
+ *
+ */
 KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback);
+
+/**
+ *
+ */
 KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target);
+
+/**
+ *
+ */
 KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
                                                                   const ks_sockaddr_t *raddr,
                                                                   ks_dht_job_callback_t callback,
                                                                   ks_dht_nodeid_t *target,
                                                                   uint8_t *salt,
                                                                   ks_size_t salt_length);
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
+                                                                  const ks_sockaddr_t *raddr,
+                                                                  ks_dht_job_callback_t callback,
+                                                                  ks_dht_token_t *token,
+                                                                  int64_t cas,
+                                                                  ks_dht_storageitem_t *item);
                                                
 /**
  * Create a network search of the closest nodes to a target.
@@ -391,34 +474,6 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
                                                                          ks_dht_search_t **search);
 
 
-
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
-                                                                                         ks_pool_t *pool,
-                                                                                         ks_dht_endpoint_t *endpoint,
-                                                                                         const ks_sockaddr_t *raddr,
-                                                                                         ks_bool_t alloc_data);
-/**
- *
- */
-KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message);
-
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length);
-
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
-                                                                                               uint8_t *transactionid,
-                                                                                               ks_size_t transactionid_length,
-                                                                                               struct bencode **args);
-
-
 /**
  * route table methods
  *
index d92b5a62a730c771fc57770bdb2362d5a92897d8..97e30b460db0a794f67a69035380e877be9bb07b 100644 (file)
@@ -78,18 +78,20 @@ KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
 KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
                                                                          ks_dht_job_callback_t query_callback,
                                                                          ks_dht_job_callback_t finish_callback,
-                                                                         ks_dht_nodeid_t *target,
-                                                                         uint8_t *salt,
-                                                                         ks_size_t salt_length)
+                                                                         ks_dht_token_t *token,
+                                                                         int64_t cas,
+                                                                         ks_dht_storageitem_t *item)
 {
        ks_assert(job);
        ks_assert(query_callback);
-       ks_assert(target);
+       ks_assert(token);
+       ks_assert(item);
 
        job->query_callback = query_callback;
        job->finish_callback = finish_callback;
-       job->query_target = *target;
-       if (salt && salt_length > 0) job->query_salt = ben_blob(salt, salt_length);
+       job->query_token = *token;
+       job->query_cas = cas;
+       job->query_storageitem = item;
 }
 
 KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job)
index e9385cb74be0e8cdb4f098871e0a0fb37f6fffa0..a88240f652318cc6f5edfab9f2e5b24464b08598 100644 (file)
@@ -105,7 +105,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
 
        memcpy(message->type, yv, yv_len);
        message->type[yv_len] = '\0';
-       ks_log(KS_LOG_DEBUG, "Message type is '%s'\n", message->type);
+       //ks_log(KS_LOG_DEBUG, "Message type is '%s'\n", message->type);
 
        return KS_STATUS_SUCCESS;
 }
index 3dc838545c66c2cd3e38e3a0590c5c24e7eb6709..31e62062b0cd9db19c7631ba5a6f1fe7fdb91fbb 100644 (file)
@@ -44,9 +44,9 @@ KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search)
 
        if (s->pending) {
                for (it = ks_hash_first(s->pending, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
-                       ks_dht_search_pending_t *val;
-                       
-                       ks_hash_this_val(it, (void **)&val);
+                       const void *key = NULL;
+                       ks_dht_search_pending_t *val = NULL;
+                       ks_hash_this(it, &key, NULL, (void **)&val);
                        ks_dht_search_pending_destroy(&val);
                }
                ks_hash_destroy(&s->pending);
index f34783c28fe70d5b73c4f4161d256dc08e1d4e7e..42bffa031928b1d8bdcd699c6de7b5b71746d06a 100644 (file)
@@ -2,7 +2,11 @@
 #include "ks_dht-int.h"
 #include "sodium.h"
 
-KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, struct bencode *v)
+KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable_internal(ks_dht_storageitem_t **item,
+                                                                                                                                        ks_pool_t *pool,
+                                                                                                                                        ks_dht_nodeid_t *target,
+                                                                                                                                        struct bencode *v,
+                                                                                                                                        ks_bool_t clone_v)
 {
        ks_dht_storageitem_t *si;
        ks_status_t ret = KS_STATUS_SUCCESS;
@@ -19,16 +23,9 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t
        si->id = *target;
        si->mutable = KS_FALSE;
        si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
-       si->v = ben_clone(v);
+       si->v = clone_v ? ben_clone(v) : v;
        ks_assert(si->v);
 
-       //enc = ben_encode(&enc_len, si->v);
-       //ks_assert(enc);
-       //SHA1_Init(&sha);
-       //SHA1_Update(&sha, enc, enc_len);
-       //SHA1_Final(si->id.id, &sha);
-       //free(enc);
-
        // done:
        if (ret != KS_STATUS_SUCCESS) {
                if (si) ks_dht_storageitem_destroy(item);
@@ -36,14 +33,36 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t
        return ret;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item,
-                                                                                                                 ks_pool_t *pool,
-                                                                                                                 ks_dht_nodeid_t *target,
-                                                                                                                 struct bencode *v,
-                                                                                                                 ks_dht_storageitem_key_t *k,
-                                                                                                                 struct bencode *salt,
-                                                                                                                 int64_t sequence,
-                                                                                                                 ks_dht_storageitem_signature_t *signature)
+KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item,
+                                                                                                                       ks_pool_t *pool,
+                                                                                                                       ks_dht_nodeid_t *target,
+                                                                                                                       const uint8_t *value,
+                                                                                                                       ks_size_t value_length)
+{
+       struct bencode *v = NULL;
+
+       ks_assert(item);
+       ks_assert(pool);
+       ks_assert(value);
+       ks_assert(value_length > 0);
+       ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
+
+       v = ben_blob(value, value_length);
+       ks_assert(v);
+       
+       return ks_dht_storageitem_create_immutable_internal(item, pool, target, v, KS_FALSE);
+}
+
+KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable_internal(ks_dht_storageitem_t **item,
+                                                                                                                                  ks_pool_t *pool,
+                                                                                                                                  ks_dht_nodeid_t *target,
+                                                                                                                                  struct bencode *v,
+                                                                                                                                  ks_bool_t clone_v,
+                                                                                                                                  ks_dht_storageitem_pkey_t *pk,
+                                                                                                                                  struct bencode *salt,
+                                                                                                                                  ks_bool_t clone_salt,
+                                                                                                                                  int64_t sequence,
+                                                                                                                                  ks_dht_storageitem_signature_t *signature)
 {
        ks_dht_storageitem_t *si;
        ks_status_t ret = KS_STATUS_SUCCESS;
@@ -52,7 +71,7 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t *
        ks_assert(pool);
        ks_assert(v);
        ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
-       ks_assert(k);
+       ks_assert(pk);
        ks_assert(signature);
 
        *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t));
@@ -62,12 +81,15 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t *
        si->id = *target;
        si->mutable = KS_TRUE;
        si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
-       si->v = ben_clone(v);
+       si->v = clone_v ? ben_clone(v) : v;
        ks_assert(si->v);
 
-       si->pk = *k;
+       ks_mutex_create(&si->mutex, KS_MUTEX_FLAG_DEFAULT, si->pool);
+       ks_assert(si->mutex);
+       
+       si->pk = *pk;
        if (salt) {
-               si->salt = ben_clone(salt);
+               si->salt = clone_salt ? ben_clone(salt) : salt;
                ks_assert(si->salt);
        }
        si->seq = sequence;
@@ -80,6 +102,48 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t *
        return ret;
 }
 
+KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item,
+                                                                                                                 ks_pool_t *pool,
+                                                                                                                 ks_dht_nodeid_t *target,
+                                                                                                                 const uint8_t *value,
+                                                                                                                 ks_size_t value_length,
+                                                                                                                 ks_dht_storageitem_pkey_t *pk,
+                                                                                                                 const uint8_t *salt,
+                                                                                                                 ks_size_t salt_length,
+                                                                                                                 int64_t sequence,
+                                                                                                                 ks_dht_storageitem_signature_t *signature)
+{
+       struct bencode *v = NULL;
+       struct bencode *s = NULL;
+
+       ks_assert(item);
+       ks_assert(pool);
+       ks_assert(value);
+       ks_assert(value_length > 0);
+       ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
+       ks_assert(pk);
+       ks_assert(signature);
+
+       v = ben_blob(value, value_length);
+       if (salt && salt_length > 0) s = ben_blob(salt, salt_length);
+       return ks_dht_storageitem_create_mutable_internal(item, pool, target, v, KS_FALSE, pk, s, KS_FALSE, sequence, signature);
+}
+
+KS_DECLARE(void) ks_dht_storageitem_update_mutable(ks_dht_storageitem_t *item, struct bencode *v, int64_t sequence, ks_dht_storageitem_signature_t *signature)
+{
+       ks_assert(item);
+       ks_assert(v);
+       ks_assert(sequence);
+       ks_assert(signature);
+
+       ks_mutex_lock(item->mutex);
+       ben_free(item->v);
+       item->v = ben_clone(v);
+       item->seq = sequence;
+       item->sig = *signature;
+       ks_mutex_unlock(item->mutex);
+}
+
 /**
  *
  */
@@ -96,6 +160,7 @@ KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item)
                ben_free(si->v);
                si->v = NULL;
        }
+       if (si->mutex) ks_mutex_destroy(&si->mutex);
        if (si->salt) {
                ben_free(si->salt);
                si->salt = NULL;
index 7472c41ce7916bfe12f14882bbb551458bc32dbe..60d0d71de095de88f2fcc5dca372f49a131b211f 100644 (file)
@@ -3,14 +3,32 @@
 #include <../dht/ks_dht-int.h>
 #include <tap.h>
 
-#define TEST_DHT1_REGISTER_TYPE_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze"
-#define TEST_DHT1_PROCESS_QUERY_PING_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe"
+ks_dht_storageitem_skey_t sk;
+ks_dht_storageitem_pkey_t pk;
 
-ks_status_t dht_z_callback(ks_dht_t *dht, ks_dht_message_t *message)
+ks_status_t dht2_put_callback(ks_dht_t *dht, ks_dht_job_t *job)
 {
-       diag("dht_z_callback\n");
-       ok(message->transactionid[0] == '4' && message->transactionid[1] == '2');
-       ks_dht_error(dht, message->endpoint, &message->raddr, message->transactionid, message->transactionid_length, 201, "Generic test error");
+       diag("dht2_put_callback\n");
+       return KS_STATUS_SUCCESS;
+}
+
+ks_status_t dht2_get_token_callback(ks_dht_t *dht, ks_dht_job_t *job)
+{
+       char buf[KS_DHT_TOKEN_SIZE * 2 + 1];
+       const char *v = "Hello World!";
+       size_t v_len = strlen(v);
+       ks_dht_storageitem_signature_t sig;
+       ks_dht_storageitem_t *mutable = NULL;
+       
+       diag("dht2_get_token_callback %s\n", ks_dht_hex(job->response_token.token, buf, KS_DHT_TOKEN_SIZE));
+
+       ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len);
+       // @todo check if exists
+       ks_dht_storageitem_create_mutable(&mutable, dht->pool, &job->query_target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig);
+       mutable->sk = sk;
+       ks_dht_storageitems_insert(dht, mutable);
+       
+       ks_dht_put(dht, &job->raddr, dht2_put_callback, &job->response_token, 0, mutable);
        return KS_STATUS_SUCCESS;
 }
 
@@ -30,6 +48,27 @@ int main() {
   ks_sockaddr_t raddr1;
   //ks_sockaddr_t raddr2;
   //ks_sockaddr_t raddr3;
+  ks_dht_nodeid_t target;
+  //ks_dht_storageitem_t *immutable = NULL;
+  //ks_dht_storageitem_t *mutable = NULL;
+  //const char *v = "Hello World!";
+  //size_t v_len = strlen(v);
+  //ks_dht_storageitem_skey_t sk; //= { { 0xe0, 0x6d, 0x31, 0x83, 0xd1, 0x41, 0x59, 0x22, 0x84, 0x33, 0xed, 0x59, 0x92, 0x21, 0xb8, 0x0b,
+  //0xd0, 0xa5, 0xce, 0x83, 0x52, 0xe4, 0xbd, 0xf0, 0x26, 0x2f, 0x76, 0x78, 0x6e, 0xf1, 0xc7, 0x4d,
+  //0xb7, 0xe7, 0xa9, 0xfe, 0xa2, 0xc0, 0xeb, 0x26, 0x9d, 0x61, 0xe3, 0xb3, 0x8e, 0x45, 0x0a, 0x22,
+  //0xe7, 0x54, 0x94, 0x1a, 0xc7, 0x84, 0x79, 0xd6, 0xc5, 0x4e, 0x1f, 0xaf, 0x60, 0x37, 0x88, 0x1d } };
+  //ks_dht_storageitem_pkey_t pk; //= { { 0x77, 0xff, 0x84, 0x90, 0x5a, 0x91, 0x93, 0x63, 0x67, 0xc0, 0x13, 0x60, 0x80, 0x31, 0x04, 0xf9,
+  //0x24, 0x32, 0xfc, 0xd9, 0x04, 0xa4, 0x35, 0x11, 0x87, 0x6d, 0xf5, 0xcd, 0xf3, 0xe7, 0xe5, 0x48 } };
+  //uint8_t sk1[KS_DHT_STORAGEITEM_SKEY_SIZE];
+  //uint8_t pk1[KS_DHT_STORAGEITEM_PKEY_SIZE];
+  //ks_dht_storageitem_signature_t sig;
+  //char sk_buf[KS_DHT_STORAGEITEM_SKEY_SIZE * 2 + 1];
+  //char pk_buf[KS_DHT_STORAGEITEM_PKEY_SIZE * 2 + 1];
+  //const char *test1vector = "3:seqi1e1:v12:Hello World!";
+  //const char *test1vector = "4:salt6:foobar3:seqi1e1:v12:Hello World!";
+  //size_t test1vector_len = strlen(test1vector);
+  //uint8_t test1vector_sig[KS_DHT_STORAGEITEM_SIGNATURE_SIZE];
+  //char test1vector_buf[KS_DHT_STORAGEITEM_SIGNATURE_SIZE * 2 + 1];
 
   err = ks_init();
   ok(!err);
@@ -62,9 +101,6 @@ int main() {
   err = ks_dht_create(&dht3, NULL, NULL);
   ok(err == KS_STATUS_SUCCESS);
   
-  
-  ks_dht_register_type(dht1, "z", dht_z_callback);
-  
   if (have_v4) {
     err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT, AF_INET);
        ok(err == KS_STATUS_SUCCESS);
@@ -111,30 +147,9 @@ int main() {
        ok(err == KS_STATUS_SUCCESS);
   }
 
-  //diag("Custom type tests\n");
-  
-  //buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER);
-  //memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen);
-  //dht1->recv_buffer_length = buflen;
-
-  //err = ks_dht_process(dht1, ep1, &raddr);
-  //ok(err == KS_STATUS_SUCCESS);
-
-  //ks_dht_pulse(dht1, 100);
-
-  //ks_dht_pulse(&dht2, 100);
-
-  
-  //buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER);
-  //memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_QUERY_PING_BUFFER, buflen);
-  //dht1->recv_buffer_length = buflen;
-
-  //err = ks_dht_process(dht1, &raddr);
-  //ok(err == KS_STATUS_SUCCESS);
-
+  /*
   diag("Ping test\n");
   
-  //ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1
   ks_dht_ping(dht2, &raddr1, NULL); // (QUERYING)
 
   ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 (RESPONDING)
@@ -147,7 +162,7 @@ int main() {
 
   ok(ks_dhtrt_find_node(dht2->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good
 
-  ks_dht_pulse(dht2, 100); // (COMPLETING)
+  ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING)
 
   diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
   for (int i = 0; i < 10; ++i) {
@@ -157,31 +172,92 @@ int main() {
          ks_dht_pulse(dht2, 100);
   }
   ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+  */
   
-  // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
+  //diag("Get test\n");
+  
+
+  /*
+  ks_dht_storageitem_target_immutable((uint8_t *)v, v_len, &target);
+  ks_dht_storageitem_create_immutable(&immutable, dht1->pool, &target, (uint8_t *)v, v_len);
+  ks_dht_storageitems_insert(dht1, immutable);
+  */
+  
+  /*
+  crypto_sign_keypair(pk.key, sk.key);
 
-  diag("Find_Node test\n");
+  ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len);
+  ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
+  ks_dht_storageitem_create_mutable(&mutable, dht1->pool, &target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig);
+  mutable->sk = sk;
+  ks_dht_storageitems_insert(dht1, mutable);
 
-  ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid);
+  ks_dht_get(dht2, &raddr1, dht2_get_callback, &target, NULL, 0);
+  ks_dht_pulse(dht2, 100); // send get query
 
-  ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
+  ks_dht_pulse(dht1, 100); // receive get query and send get response
 
-  ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
+  ks_dht_pulse(dht2, 100); // receive get response
 
-  ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+  ok(ks_dht_storageitems_find(dht2, &target) != NULL); // item should be verified and stored
 
-  ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
+  ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING)
+  */
 
-  ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+  diag("Put test\n");
+
+  crypto_sign_keypair(pk.key, sk.key);
+
+  ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
+
+  ks_dht_get(dht2, &raddr1, dht2_get_token_callback, &target, NULL, 0); // create job
   
-  diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+  ks_dht_pulse(dht2, 100); // send get query
+
+  ks_dht_pulse(dht1, 100); // receive get query and send get response
+
+  ks_dht_pulse(dht2, 100); // receive get response
+
+  ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING), send put query
+
+  ks_dht_pulse(dht1, 100); // receive put query and send put response
+
+  ks_dht_pulse(dht2, 100); // receive put response
+
+  ks_dht_pulse(dht2, 100); // Call finish callback and purse the job (COMPLETING)
+
   for (int i = 0; i < 10; ++i) {
          //diag("DHT 1\n");
          ks_dht_pulse(dht1, 100);
          //diag("DHT 2\n");
          ks_dht_pulse(dht2, 100);
   }
-  ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+
+  // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
+
+  //diag("Find_Node test\n");
+
+  //ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid);
+
+  //ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
+
+  //ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
+
+  //ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+
+  //ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
+
+  //ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+  
+  //diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+  //for (int i = 0; i < 10; ++i) {
+         //diag("DHT 1\n");
+         //ks_dht_pulse(dht1, 100);
+         //diag("DHT 2\n");
+         //ks_dht_pulse(dht2, 100);
+  //}
+  //ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
 
 
   /* Cleanup and shutdown */