]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: A bunch of stuff related to chaining multiple jobs, bug fixes, few other...
authorShane Bryldt <astaelan@gmail.com>
Tue, 3 Jan 2017 07:09:02 +0000 (07:09 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:38 +0000 (14:59 -0600)
12 files changed:
libs/libks/Makefile.am
libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_bucket.c
libs/libks/src/dht/ks_dht_distribute.c [new file with mode: 0644]
libs/libks/src/dht/ks_dht_job.c
libs/libks/src/dht/ks_dht_publish.c [new file with mode: 0644]
libs/libks/src/dht/ks_dht_search.c
libs/libks/src/dht/ks_dht_storageitem.c
libs/libks/src/ks_thread_pool.c
libs/libks/test/testdht2.c

index 6d8c5c466fe4a56bb761b4a4e18d54e488435be4..ae74a81dbd00315a6350dcc7fd5ab2783924b5ee 100644 (file)
@@ -14,7 +14,8 @@ libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c
 libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp
 libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c
 libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_datagram.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c
-libks_la_SOURCES += src/dht/ks_dht_job.c src/dht/ks_dht_search.c src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c
+libks_la_SOURCES += src/dht/ks_dht_job.c src/dht/ks_dht_search.c src/dht/ks_dht_publish.c src/dht/ks_dht_distribute.c src/dht/ks_dht_storageitem.c
+libks_la_SOURCES += src/dht/ks_dht_bucket.c
 libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c 
 #aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h
 
index 76d797ef1c8f8d75bd7e5caee1d461cfb7bd8f69..2e8ce749e37d3dc0672459d2f2b755149fd9b97e 100644 (file)
@@ -22,11 +22,11 @@ KS_BEGIN_EXTERN_C
 KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint);
 
 /**
- * Called internally to expire search data.
- * Handles completing and purging of finished searches.
+ * Called internally to expire or reannounce storage item data.
+ * Handles reannouncing and purging of expiring storage items.
  * @param dht pointer to the dht instance
  */
-KS_DECLARE(void) ks_dht_pulse_searches(ks_dht_t *dht);
+KS_DECLARE(void) ks_dht_pulse_storageitems(ks_dht_t *dht);
 
 /**
  * Called internally to process job state machine.
@@ -276,19 +276,18 @@ KS_DECLARE(void) ks_dht_datagram_destroy(ks_dht_datagram_t **datagram);
 KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job,
                                                                                  ks_pool_t *pool,
                                                                                  const ks_sockaddr_t *raddr,
-                                                                                 int32_t attempts);
+                                                                                 int32_t attempts,
+                                                                                 void *data);
 KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback);
 KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
-                                                                                  ks_dht_search_t *search,
                                                                                   ks_dht_job_callback_t query_callback,
                                                                                   ks_dht_job_callback_t finish_callback,
                                                                                   ks_dht_nodeid_t *target);
 KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
-                                                                         ks_dht_search_t *search,
                                                                          ks_dht_job_callback_t query_callback,
                                                                          ks_dht_job_callback_t finish_callback,
                                                                          ks_dht_nodeid_t *target,
-                                                                         uint8_t *salt,
+                                                                         const uint8_t *salt,
                                                                          ks_size_t salt_length);
 KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
                                                                          ks_dht_job_callback_t query_callback,
@@ -296,6 +295,9 @@ KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
                                                                          ks_dht_token_t *token,
                                                                          int64_t cas,
                                                                          ks_dht_storageitem_t *item);
+KS_DECLARE(void) ks_dht_job_build_search(ks_dht_job_t *job,
+                                                                                ks_dht_job_callback_t query_callback,
+                                                                                ks_dht_job_callback_t finish_callback);
 KS_DECLARE(void) ks_dht_job_build_search_findnode(ks_dht_job_t *job,
                                                                        ks_dht_nodeid_t *target,
                                                                        uint32_t family,
@@ -344,9 +346,35 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target, ks_dht_search_callback_t callback);
+KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search,
+                                                                                        ks_pool_t *pool,
+                                                                                        ks_dhtrt_routetable_t *table,
+                                                                                        const ks_dht_nodeid_t *target,
+                                                                                        ks_dht_job_callback_t callback,
+                                                                                        void *data);
 KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search);
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_publish_create(ks_dht_publish_t **publish,
+                                                                                         ks_pool_t *pool,
+                                                                                         ks_dht_job_callback_t callback,
+                                                                                         void *data,
+                                                                                         int64_t cas,
+                                                                                         ks_dht_storageitem_t *item);
+KS_DECLARE(void) ks_dht_publish_destroy(ks_dht_publish_t **publish);
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_distribute_create(ks_dht_distribute_t **distribute,
+                                                                                                ks_pool_t *pool,
+                                                                                                ks_dht_storageitem_callback_t callback,
+                                                                                                void *data,
+                                                                                                int64_t cas,
+                                                                                                ks_dht_storageitem_t *item);
+KS_DECLARE(void) ks_dht_distribute_destroy(ks_dht_distribute_t **distribute);
 
 /**
  *
index a40c1cf8c5791699bb08e25d2b9fdb16b3a32a70..7007ddb8eb14e0e818bda761140e3b074bc65926 100644 (file)
@@ -175,17 +175,21 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        d->rt_ipv6 = NULL;
 
        /**
-        * Create the mutex to handle searches list.
+        * Default tokens expirations to not be checked for one pulse.
         */
-       ks_mutex_create(&d->searches_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
-       ks_assert(d->searches_mutex);
-
+       d->tokens_pulse = ks_time_now() + ((ks_time_t)KS_DHT_TOKENS_PULSE * KS_USEC_PER_SEC);
+       
        /**
         * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
         */
        d->token_secret_current = d->token_secret_previous = rand();
        d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKEN_EXPIRATION * KS_USEC_PER_SEC);
 
+       /**
+        * Default storageitems expirations to not be checked for one pulse.
+        */
+       d->storageitems_pulse = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEMS_PULSE * KS_USEC_PER_SEC);
+       
        /**
         * Create the hash to store arbitrary data for BEP44.
         */
@@ -229,6 +233,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
         * Cleanup the storageitems hash and it's contents if it is allocated.
         */
        if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash);
+       d->storageitems_pulse = 0;
 
        /**
         * Zero out the opaque write token variables.
@@ -236,15 +241,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
        d->token_secret_current = 0;
        d->token_secret_previous = 0;
        d->token_secret_expiration = 0;
-
-       /**
-        * Cleanup the search mutex and searches if they are allocated.
-        */
-       if (d->searches_mutex) ks_mutex_destroy(&d->searches_mutex);
-       for (ks_dht_search_t *search = d->searches_first, *searchn = NULL; search; search = searchn) {
-               searchn = search->next;
-               ks_dht_search_destroy(&search);
-       }
+       d->tokens_pulse = 0;
 
        /**
         * Cleanup the route tables if they are allocated.
@@ -258,6 +255,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
        d->transactionid_next = 0;
        if (d->transactionid_mutex) ks_mutex_destroy(&d->transactionid_mutex);
        if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash);
+       d->transactions_pulse = 0;
 
        /**
         * Cleanup the jobs mutex and jobs if they are allocated.
@@ -290,8 +288,6 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
        /**
         * Probably don't need this
         */
-       d->transactions_pulse = 0;
-
        d->endpoints_length = 0;
        d->endpoints_size = 0;
 
@@ -624,10 +620,7 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
        if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
        if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
 
-       ks_dht_pulse_searches(dht);
-
-       // @todo pulse_storageitems for keepalive and expiration
-       // hold keepalive counter on items to determine what to reannounce vs expire
+       ks_dht_pulse_storageitems(dht);
 
        ks_dht_pulse_jobs(dht);
 
@@ -638,43 +631,58 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
        ks_dht_pulse_tokens(dht);
 }
 
-KS_DECLARE(void) ks_dht_pulse_searches(ks_dht_t *dht)
+KS_DECLARE(void) ks_dht_pulse_storageitems(ks_dht_t *dht)
 {
-       ks_dht_search_t *searches_first = NULL;
-       ks_dht_search_t *searches_last = NULL;
-       
+       ks_hash_iterator_t *it = NULL;
+       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+       ks_time_t now = ks_time_now();
+
        ks_assert(dht);
 
-       ks_mutex_lock(dht->searches_mutex);
-       for (ks_dht_search_t *search = dht->searches_first, *searchn = NULL, *searchp = NULL; search; search = searchn) {
-               ks_bool_t done = KS_FALSE;
-               searchn = search->next;
+       if (dht->storageitems_pulse > now) return;
+       dht->storageitems_pulse = now + ((ks_time_t)KS_DHT_STORAGEITEMS_PULSE * KS_USEC_PER_SEC);
 
-               ks_mutex_lock(search->mutex);
-               done = ks_hash_count(search->searching) == 0;
-               
-               if (done) {
-                       if (!searchp && !searchn) dht->searches_first = dht->searches_last = NULL;
-                       else if (!searchp) dht->searches_first = searchn;
-                       else if (!searchn) {
-                               dht->searches_last = searchp;
-                               dht->searches_last->next = NULL;
+       ks_hash_write_lock(dht->storageitems_hash);
+       for (it = ks_hash_first(dht->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+               const void *key = NULL;
+               ks_dht_storageitem_t *value = NULL;
+               ks_bool_t remove = KS_FALSE;
+
+               ks_hash_this(it, &key, NULL, (void **)&value);
+
+               ks_mutex_lock(value->mutex);
+
+               if (value->keepalive <= now) {
+                       value->keepalive = now + ((ks_time_t)KS_DHT_STORAGEITEM_KEEPALIVE * KS_USEC_PER_SEC);
+                       if (value->refc > 0) {
+                               value->expiration = now + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+                               ks_log(KS_LOG_DEBUG, "Item keepalive %s\n", ks_dht_hex(value->id.id, id_buf, KS_DHT_NODEID_SIZE));
+                               if (dht->rt_ipv4) ks_dht_distribute(dht, NULL, NULL, dht->rt_ipv4, 0, value);
+                               if (dht->rt_ipv6) ks_dht_distribute(dht, NULL, NULL, dht->rt_ipv6, 0, value);
                        }
-                       else searchp->next = searchn;
+               }
+                       
+               remove = value->refc == 0 && value->expiration <= now;
+               if (remove) ks_hash_remove(dht->storageitems_hash, (void *)key);
 
-                       search->next = NULL;
-                       if (searches_last) searches_last = searches_last->next = search;
-                       else searches_first = searches_last = search;
-               } else searchp = search;
-               ks_mutex_unlock(search->mutex);
-       }
-       ks_mutex_unlock(dht->searches_mutex);
+               ks_mutex_unlock(value->mutex);
 
-       for (ks_dht_search_t *search = searches_first, *searchn = NULL; search; search = searchn) {
-               searchn = search->next;
-               if (search->callback) search->callback(dht, search);
-               ks_dht_search_destroy(&search);
+               if (remove) {
+                       ks_log(KS_LOG_DEBUG, "Item expired %s\n", ks_dht_hex(value->id.id, id_buf, KS_DHT_NODEID_SIZE));
+                       ks_dht_storageitem_destroy(&value);
+               }
        }
+       ks_hash_write_unlock(dht->storageitems_hash);
+}
+
+KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job)
+{
+       ks_assert(dht);
+       ks_assert(job);
+       ks_mutex_lock(dht->jobs_mutex);
+    if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = job;
+       else dht->jobs_first = dht->jobs_last = job;
+       ks_mutex_unlock(dht->jobs_mutex);
 }
 
 KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
@@ -686,21 +694,24 @@ KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
 
        ks_mutex_lock(dht->jobs_mutex);
        for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
-               ks_bool_t done = KS_FALSE;
                jobn = job->next;
 
                if (job->state == KS_DHT_JOB_STATE_QUERYING) {
                        job->state = KS_DHT_JOB_STATE_RESPONDING;
-                       if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
+                       if (job->query_callback(dht, job) != KS_STATUS_SUCCESS) {
+                               job->result = KS_DHT_JOB_RESULT_FAILURE;
+                               job->state = KS_DHT_JOB_STATE_COMPLETING;
+                       }
                }
                if (job->state == KS_DHT_JOB_STATE_EXPIRING) {
                        job->attempts--;
                        if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING;
-                       else done = KS_TRUE;
+                       else {
+                               job->result = KS_DHT_JOB_RESULT_EXPIRED;
+                               job->state = KS_DHT_JOB_STATE_COMPLETING;
+                       }
                }
-               if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE;
-
-               if (done) {
+               if (job->state == KS_DHT_JOB_STATE_COMPLETING) {
                        if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
                        else if (!jobp) dht->jobs_first = jobn;
                        else if (!jobn) {
@@ -1359,8 +1370,11 @@ KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht,
        transactionid = dht->transactionid_next++;
        ks_mutex_unlock(dht->transactionid_mutex);
 
-       if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done;
-       if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+       ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback);
+       ks_assert(trans);
+       
+       ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE);
+       ks_assert(msg);
 
        //      if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done;
     transactionid = htonl(transactionid);
@@ -1416,16 +1430,8 @@ KS_DECLARE(ks_status_t) ks_dht_response_setup(ks_dht_t *dht,
 
        if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
 
-       if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) {
-               ks_dht_error(dht,
-                                        ep,
-                                        raddr,
-                                        transactionid,
-                                        transactionid_length,
-                                        202,
-                                        "Internal message create error");
-               goto done;
-       }
+       ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE);
+       ks_assert(msg);
 
     ben_dict_set(msg->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
        ben_dict_set(msg->data, ben_blob("y", 1), ben_blob("r", 1));
@@ -1472,7 +1478,8 @@ KS_DECLARE(void *) ks_dht_process(ks_thread_t *thread, void *data)
 
        // @todo blacklist check for bad actor nodes
 
-       if (ks_dht_message_create(&message, datagram->dht->pool, datagram->endpoint, &datagram->raddr, KS_FALSE) != KS_STATUS_SUCCESS) goto done;
+       ks_dht_message_create(&message, datagram->dht->pool, datagram->endpoint, &datagram->raddr, KS_FALSE);
+       ks_assert(message);
 
        if (ks_dht_message_parse(message, datagram->buffer, datagram->buffer_length) != KS_STATUS_SUCCESS) goto done;
 
@@ -1649,7 +1656,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
                                                                        message->raddr.port,
                                                                        KS_DHTRT_CREATE_TOUCH,
                                                                        &node)) != KS_STATUS_SUCCESS) goto done;
-       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
        
        ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
        if ((ret = ks_dhtrt_touch_node(message->endpoint->node->table, *id)) != KS_STATUS_SUCCESS) goto done;
@@ -1673,185 +1679,329 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
                           transaction->job->raddr.host,
                           transaction->job->raddr.port);
        } else {
+               ks_dhtrt_sharelock_node(node);
                transaction->job->response = message;
-               transaction->job->response_id = message->args_id;
+               transaction->job->response_id = node;
                message->transaction = transaction;
-               if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING;
-               else transaction->job->state = KS_DHT_JOB_STATE_COMPLETING;
+               if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->result = KS_DHT_JOB_RESULT_FAILURE;
+               transaction->job->state = KS_DHT_JOB_STATE_COMPLETING;
 
                transaction->job->response = NULL; // message is destroyed after we return, stop using it
                transaction->finished = KS_TRUE;
        }
 
  done:
+       if (node) ks_dhtrt_release_node(node);
        return ret;
 }
 
 KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_job_t *job)
 {
+       ks_dht_search_t *search = NULL;
        ks_dht_node_t **nodes = NULL;
        ks_size_t nodes_count = 0;
-       ks_dht_node_t *node = NULL;
+       ks_dht_nodeid_t distance;
+       int32_t results_index = -1;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(job);
-       ks_assert(job->search);
+       ks_assert(job->data);
 
-       ks_mutex_lock(job->search->mutex);
-       ks_hash_remove(job->search->searching, job->response_id.id);
+       search = (ks_dht_search_t *)job->data;
+       
+       ks_mutex_lock(search->mutex);
+       search->searching--;
+
+       if (job->result != KS_DHT_JOB_RESULT_SUCCESS) goto done;
 
+       ks_dht_utility_nodeid_xor(&distance, &job->response_id->nodeid, &search->target);
+       if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
+               results_index = search->results_length;
+               search->results_length++;
+       } else {
+               for (int32_t index = 0; index < search->results_length; ++index) {
+                       // Check if responding node is closer than the current result
+                       if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
+                               // Only existing results which are further from the target than the responding node are considered for replacement
+                               
+                               // If this is the first node that is further then keep it and keep looking for existing results which are further than this result
+                               // If additional results are further, and the current result is further than a previous result, use the current result as furthest to replace
+                               if (results_index < 0) results_index = index;
+                               else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
+                       }
+               }
+       }
+
+       if (results_index >= 0) {
+               // The results are either not full yet, or this responding node is closer than the furthest existing result
+               char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+               char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+               char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+
+               ks_log(KS_LOG_DEBUG,
+                          "Set closer node id %s (%s) in search of target id %s at results index %d\n",
+                          ks_dht_hex(job->response_id->nodeid.id, id_buf, KS_DHT_NODEID_SIZE),
+                          ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
+                          ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
+                          results_index);
+
+               if (search->results[results_index]) ks_dhtrt_release_node(search->results[results_index]);
+               ks_dhtrt_sharelock_node(job->response_id);
+
+               search->results[results_index] = job->response_id;
+               search->distances[results_index] = distance;
+       }
+       
        nodes = job->raddr.family == AF_INET ? job->response_nodes : job->response_nodes6;
        nodes_count = job->raddr.family == AF_INET ? job->response_nodes_count : job->response_nodes6_count;
        
        for (int32_t i = 0; i < nodes_count; ++i) {
-               ks_dht_nodeid_t distance;
-               int32_t results_index = -1;
-               
-               node = nodes[i];
+               ks_bool_t closer = KS_FALSE;
+               ks_dht_node_t *node = nodes[i];
 
-               if (ks_hash_search(job->search->searched, node->nodeid.id, KS_UNLOCKED) != 0) continue;
+               // skip duplicates already searched
+               if (ks_hash_search(search->searched, node->nodeid.id, KS_UNLOCKED) != 0) continue;
 
-               ks_dht_utility_nodeid_xor(&distance, &node->nodeid, &job->search->target);
-               if (job->search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
-                       results_index = job->search->results_length;
-                       job->search->results_length++;
-               } else {
-                       for (int32_t index = 0; index < job->search->results_length; ++index) {
-                               // Check if new node is closer than this previous result
-                               if (memcmp(distance.id, job->search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
-                                       // If this is the first node that is further then keep it
-                                       // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result
-                                       if (results_index < 0) results_index = index;
-                                       else if (memcmp(job->search->distances[index].id, job->search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
-                               }
-                       }
-               }
+               // calculate distance of new node from target
+               ks_dht_utility_nodeid_xor(&distance, &node->nodeid, &search->target);
 
-               if (results_index >= 0) {
-                       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
-                       char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
-                       char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+               // if the results are not full, or the new node is closer than any result then the new node should be checked
+               if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) closer = KS_TRUE;
+               for (int32_t index = 0; !closer && index < search->results_length; ++index) {
+                       // Check if new node is closer than this current result
+                       closer = memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0;
+               }
 
-                       ks_log(KS_LOG_DEBUG,
-                                  "Set closer node id %s (%s) in search of target id %s at results index %d\n",
-                                  ks_dht_hex(node->nodeid.id, id_buf, KS_DHT_NODEID_SIZE),
-                                  ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
-                                  ks_dht_hex(job->search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
-                                  results_index);
-
-                       if (job->search->results[results_index]) ks_dhtrt_release_node(job->search->results[results_index]);
-                       job->search->results[results_index] = node;
-                       job->search->distances[results_index] = distance;
-
-                       ks_hash_insert(job->search->searched, node->nodeid.id, (void *)KS_TRUE);
-                       ks_hash_insert(job->search->searching, node->nodeid.id, (void *)KS_TRUE);
+               if (closer) {
+                       // track new node as searched and searching then send off a findnode query to validate it as a result and end up back here for new closer nodes
+                       ks_hash_insert(search->searched, node->nodeid.id, (void *)KS_TRUE);
+                       search->searching++;
                        
-                       ks_dhtrt_sharelock_node(node);
-
-                       if ((ret = ks_dht_findnode(dht, job->search, &node->addr, ks_dht_search_findnode_callback, &job->search->target)) != KS_STATUS_SUCCESS) goto done;
+                       ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target);
                }
        }
-       
+
  done:
-       ks_mutex_unlock(job->search->mutex);
+       ks_mutex_unlock(search->mutex);
+       
+       if (search->searching == 0) {
+               if (search->callback) search->callback(dht, job);
+               ks_dht_search_destroy(&search);
+       }
 
        return ret;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht,
-                                                                                          int32_t family,
-                                                                                          ks_dht_nodeid_t *target,
-                                                                                          ks_dht_search_callback_t callback,
-                                                                                          ks_dht_search_t **search)
+KS_DECLARE(ks_status_t) ks_dht_query_search(ks_dht_t *dht, ks_dht_job_t *job)
 {
-       ks_bool_t locked_searches = KS_FALSE;
        ks_bool_t locked_search = KS_FALSE;
-       ks_dhtrt_routetable_t *rt = NULL;
-       ks_dht_search_t *s = NULL;
+       ks_dht_search_t *search = NULL;
     ks_dhtrt_querynodes_t query;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
-       ks_assert(family == AF_INET || family == AF_INET6);
-       ks_assert(target);
-
-       if (search) *search = NULL;
-
-       if (family == AF_INET) {
-               if (!dht->rt_ipv4) {
-                       ret = KS_STATUS_FAIL;
-                       goto done;
-               }
-               rt = dht->rt_ipv4;
-       } else {
-               if (!dht->rt_ipv6) {
-                       ret = KS_STATUS_FAIL;
-                       goto done;
-               }
-               rt = dht->rt_ipv6;
-       }
-
-       ks_mutex_lock(dht->searches_mutex);
-       locked_searches = KS_TRUE;
+       ks_assert(job);
+       ks_assert(job->data);
 
-       if ((ret = ks_dht_search_create(&s, dht->pool, target, callback)) != KS_STATUS_SUCCESS) goto done;
+       search = (ks_dht_search_t *)job->data;
 
-    if (dht->searches_last) dht->searches_last = dht->searches_last->next = s;
-       else dht->searches_first = dht->searches_last = s;
-       
-       ks_mutex_lock(s->mutex);
+       ks_mutex_lock(search->mutex);
        locked_search = KS_TRUE;
 
-       // release searches lock now, but search is still locked
-       ks_mutex_unlock(dht->searches_mutex);
-       locked_searches = KS_FALSE;
-
        // find closest good nodes to target locally and store as the closest results
-       query.nodeid = *target;
+       query.nodeid = search->target;
        query.type = KS_DHT_REMOTE;
        query.max = KS_DHT_SEARCH_RESULTS_MAX_SIZE;
-       query.family = family;
+       query.family = search->table == dht->rt_ipv4 ? AF_INET : AF_INET6;
        query.count = 0;
-       ks_dhtrt_findclosest_nodes(rt, &query);
+       ks_dhtrt_findclosest_nodes(search->table, &query);
        for (int32_t i = 0; i < query.count; ++i) {
-               ks_dht_node_t *n = query.nodes[i];
-               ks_bool_t searched = KS_FALSE;
+               ks_dht_node_t *node = query.nodes[i];
 
-               // always take the initial local closest good nodes as results, they are already good nodes that are closest with no results yet
-               s->results[s->results_length] = n;
-               ks_dht_utility_nodeid_xor(&s->distances[s->results_length], &n->nodeid, &s->target);
-               s->results_length++;
+               // skip duplicates already searched, this really shouldn't happen on a new search but we sanity check
+               if (ks_hash_search(search->searched, node->nodeid.id, KS_UNLOCKED) != 0) continue;
+               
+               ks_hash_insert(search->searched, node->nodeid.id, (void *)KS_TRUE);
+               search->searching++;
 
-               searched = ks_hash_search(s->searched, n->nodeid.id, KS_UNLOCKED) != 0;
-               if (searched) continue; // skip duplicates, this really shouldn't happen on a new search but we sanity check
+               ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target);
+       }
+       ks_dhtrt_release_querynodes(&query);
 
-               ks_hash_insert(s->searched, n->nodeid.id, (void *)KS_TRUE);
-               ks_hash_insert(s->searching, n->nodeid.id, (void *)KS_TRUE);
+       // done:
+       if (locked_search) ks_mutex_unlock(search->mutex);
 
-               ks_dhtrt_sharelock_node(n);
-               
-               if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) {
-                       ks_dhtrt_release_querynodes(&query);
-                       goto done;
-               }
+       if (search->searching == 0) {
+               if (search->callback) search->callback(dht, job);
+               ks_dht_search_destroy(&search);
        }
-       ks_dhtrt_release_querynodes(&query);
-       ks_mutex_unlock(s->mutex);
-       locked_search = KS_FALSE;
+       
+       return ret;
+}
+
+KS_DECLARE(void) ks_dht_search(ks_dht_t *dht,
+                                                          ks_dht_job_callback_t callback,
+                                                          void *data,
+                                                          ks_dhtrt_routetable_t *table,
+                                                          ks_dht_nodeid_t *target)
+{
+       ks_dht_search_t *search = NULL;
+       ks_dht_job_t *job = NULL;
+
+       ks_assert(dht);
+       ks_assert(table);
+       ks_assert(target);
+
+       ks_dht_search_create(&search, dht->pool, table, target, callback, data);
+       ks_assert(search);
+       
+       ks_dht_job_create(&job, dht->pool, NULL, 3, search);
+       ks_assert(job);
+       
+       ks_dht_job_build_search(job, ks_dht_query_search, NULL);
+       ks_dht_jobs_add(dht, job);
+}
+
+KS_DECLARE(ks_status_t) ks_dht_publish_get_callback(ks_dht_t *dht, ks_dht_job_t *job)
+{
+       ks_dht_publish_t *publish = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(dht);
+       ks_assert(job);
+       ks_assert(job->data);
+
+       publish = (ks_dht_publish_t *)job->data;
 
-       if (search) *search = s;
+       // @todo callbacks need job to contain cascaded publish->data before calling
+       if (job->result != KS_DHT_JOB_RESULT_SUCCESS) {
+               job->data = publish->data;
+               if (publish->callback) publish->callback(dht, job);
+               goto done;
+       }
+
+       if (!job->response_hasitem || (publish->item->mutable && job->response_seq < publish->item->seq)) {
+               ks_dht_put(dht, &job->raddr, publish->callback, publish->data, &job->response_token, publish->cas, publish->item);
+       } else if (publish->callback) {
+               job->data = publish->data;
+               publish->callback(dht, job);
+       }
 
  done:
-       if (locked_searches) ks_mutex_unlock(dht->searches_mutex);
-       if (locked_search) ks_mutex_unlock(s->mutex);
-       if (ret != KS_STATUS_SUCCESS) {
-               //if (s) ks_dht_search_destroy(&s);
-               *search = NULL;
+       
+       ks_dht_publish_destroy(&publish);
+       return ret;
+}
+
+KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht,
+                                                               const ks_sockaddr_t *raddr,
+                                                               ks_dht_job_callback_t callback,
+                                                               void *data,
+                                                               int64_t cas,
+                                                               ks_dht_storageitem_t *item)
+{
+       ks_dht_publish_t *publish = NULL;
+       const uint8_t *salt = NULL;
+       size_t salt_length = 0;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(cas >= 0);
+       ks_assert(item);
+
+       if (item->salt) {
+               salt = (const uint8_t *)ben_str_val(item->salt);
+               salt_length = ben_str_len(item->salt);
+       }
+
+       ks_dht_publish_create(&publish, dht->pool, callback, data, cas, item);
+       ks_assert(publish);
+
+       ks_dht_get(dht, raddr, ks_dht_publish_get_callback, publish, &item->id, salt, salt_length);
+}
+
+KS_DECLARE(ks_status_t) ks_dht_distribute_publish_callback(ks_dht_t *dht, ks_dht_job_t *job)
+{
+       ks_dht_distribute_t *distribute = NULL;
+       ks_bool_t finished = KS_FALSE;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(dht);
+       ks_assert(job);
+       ks_assert(job->data);
+
+       distribute = (ks_dht_distribute_t *)job->data;
+       ks_mutex_lock(distribute->mutex);
+       distribute->publishing--;
+       finished = distribute->publishing == 0;
+       ks_mutex_unlock(distribute->mutex);
+       
+       if (finished) {
+               if (distribute->callback) distribute->callback(dht, distribute->item);
+               ks_dht_distribute_destroy(&distribute);
        }
+
        return ret;
 }
 
+KS_DECLARE(ks_status_t) ks_dht_distribute_search_callback(ks_dht_t *dht, ks_dht_job_t *job)
+{
+       ks_dht_search_t *search = NULL;
+       ks_dht_distribute_t *distribute = NULL;
+       ks_bool_t finished = KS_FALSE;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(dht);
+       ks_assert(job);
+       ks_assert(job->data);
+
+       search = (ks_dht_search_t *)job->data;
+       ks_assert(search->data);
+       
+       distribute = (ks_dht_distribute_t *)search->data;
+       
+       ks_mutex_lock(distribute->mutex);
+       for (int32_t index = 0; index < search->results_length; ++index) {
+               ks_dht_node_t *node = search->results[index];
+               if (node->type == KS_DHT_LOCAL) continue;
+               
+               distribute->publishing++;
+               ks_dht_publish(dht, &node->addr, ks_dht_distribute_publish_callback, distribute, distribute->cas, distribute->item);
+       }
+       finished = distribute->publishing == 0;
+       ks_mutex_unlock(distribute->mutex);
+       
+       if (finished) {
+               if (distribute->callback) distribute->callback(dht, distribute->item);
+               ks_dht_distribute_destroy(&distribute);
+       }
+
+       ks_dht_search_destroy(&search);
+
+       return ret;
+}
+
+KS_DECLARE(void) ks_dht_distribute(ks_dht_t *dht,
+                                                                  ks_dht_storageitem_callback_t callback,
+                                                                  void *data,
+                                                                  ks_dhtrt_routetable_t *table,
+                                                                  int64_t cas,
+                                                                  ks_dht_storageitem_t *item)
+{
+       ks_dht_distribute_t *distribute = NULL;
+
+       ks_assert(dht);
+       ks_assert(table);
+       ks_assert(cas >= 0);
+       ks_assert(item);
+
+       ks_dht_distribute_create(&distribute, dht->pool, callback, data, cas, item);
+       ks_assert(distribute);
+
+       ks_dht_search(dht, ks_dht_distribute_search_callback, distribute, table, &item->id);
+}
+
 KS_DECLARE(void) ks_dht_storageitems_read_lock(ks_dht_t *dht)
 {
        ks_assert(dht);
@@ -1878,10 +2028,15 @@ KS_DECLARE(void) ks_dht_storageitems_write_unlock(ks_dht_t *dht)
 
 KS_DECLARE(ks_dht_storageitem_t *) ks_dht_storageitems_find(ks_dht_t *dht, ks_dht_nodeid_t *target)
 {
+       ks_dht_storageitem_t *item = NULL;
+
        ks_assert(dht);
        ks_assert(target);
 
-       return ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED);
+       item = ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED);
+       if (item) ks_dht_storageitem_reference(item);
+
+       return item;
 }
 
 KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storageitem_t *item)
@@ -1912,13 +2067,12 @@ KS_DECLARE(ks_status_t) ks_dht_error(ks_dht_t *dht,
 
        if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
 
-       if ((ret = ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+       ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE);
+       ks_assert(error);
 
-       //if ((ret = ks_dht_message_error(error, transactionid, transactionid_length, &e)) != KS_STATUS_SUCCESS) goto done;
     ben_dict_set(error->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
        ben_dict_set(error->data, ben_blob("y", 1), ben_blob("e", 1));
 
-       // @note e joins error->data and will be freed with it
        e = ben_list();
        ks_assert(e);
        ben_dict_set(error->data, ben_blob("e", 1), e);
@@ -1997,7 +2151,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
                ret = KS_STATUS_FAIL;
                goto done;
        }
-
+       transaction->job->result = KS_DHT_JOB_RESULT_ERROR;
+       transaction->job->error_code = errorcode;
+       transaction->job->error_description = ben_clone(es);
+       transaction->job->state = KS_DHT_JOB_STATE_COMPLETING;
        transaction->finished = KS_TRUE;
 
        ks_hash_read_lock(dht->registry_error);
@@ -2011,35 +2168,21 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
        return ret;
 }
 
-KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job)
-{
-       ks_assert(dht);
-       ks_assert(job);
-       ks_mutex_lock(dht->jobs_mutex);
-    if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = job;
-       else dht->jobs_first = dht->jobs_last = job;
-       ks_mutex_unlock(dht->jobs_mutex);
-}
-
 
-KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback)
+KS_DECLARE(void) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, void *data)
 {
        ks_dht_job_t *job = NULL;
-       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(raddr);
 
        //ks_log(KS_LOG_DEBUG, "Starting ping!\n");
 
-       if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
+       ks_dht_job_create(&job, dht->pool, raddr, 3, data);
+       ks_assert(job);
+       
        ks_dht_job_build_ping(job, ks_dht_query_ping, callback);
        ks_dht_jobs_add(dht, job);
-
-       // next step in ks_dht_pulse_jobs with QUERYING state
-
- done:
-       return ret;
 }
 
 KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job)
@@ -2105,27 +2248,23 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t
 }
 
 
-KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht,
-                                                                               ks_dht_search_t *search,
-                                                                               const ks_sockaddr_t *raddr,
-                                                                               ks_dht_job_callback_t callback,
-                                                                               ks_dht_nodeid_t *target)
+KS_DECLARE(void) ks_dht_findnode(ks_dht_t *dht,
+                                                                const ks_sockaddr_t *raddr,
+                                                                ks_dht_job_callback_t callback,
+                                                                void *data,
+                                                                ks_dht_nodeid_t *target)
 {
        ks_dht_job_t *job = NULL;
-       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(raddr);
        ks_assert(target);
 
-       if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
-       ks_dht_job_build_findnode(job, search, ks_dht_query_findnode, callback, target);
+       ks_dht_job_create(&job, dht->pool, raddr, 3, data);
+       ks_assert(job);
+       
+       ks_dht_job_build_findnode(job, ks_dht_query_findnode, callback, target);
        ks_dht_jobs_add(dht, job);
-
-       // next step in ks_dht_pulse_jobs with QUERYING state
-
- done:
-       return ret;
 }
 
 KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job)
@@ -2366,27 +2505,25 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
 }
 
 
-KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
-                                                                  ks_dht_search_t *search,
-                                                                  const ks_sockaddr_t *raddr,
-                                                                  ks_dht_job_callback_t callback,
-                                                                  ks_dht_nodeid_t *target,
-                                                                  uint8_t *salt,
-                                                                  ks_size_t salt_length)
+KS_DECLARE(void) ks_dht_get(ks_dht_t *dht,
+                                                       const ks_sockaddr_t *raddr,
+                                                       ks_dht_job_callback_t callback,
+                                                       void *data,
+                                                       ks_dht_nodeid_t *target,
+                                                       const uint8_t *salt,
+                                                       ks_size_t salt_length)
 {
        ks_dht_job_t *job = NULL;
-       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(raddr);
        ks_assert(target);
 
-       if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
-       ks_dht_job_build_get(job, search, ks_dht_query_get, callback, target, salt, salt_length);
+       ks_dht_job_create(&job, dht->pool, raddr, 3, data);
+       ks_assert(job);
+       
+       ks_dht_job_build_get(job, ks_dht_query_get, callback, target, salt, salt_length);
        ks_dht_jobs_add(dht, job);
-
- done:
-       return ret;
 }
 
 KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
@@ -2407,13 +2544,18 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
                                                   &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
        ks_hash_read_lock(dht->storageitems_hash);
-       item = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED);
+       item = ks_dht_storageitems_find(dht, &job->query_target);
+       if (item) ks_mutex_lock(item->mutex);
        ks_hash_read_unlock(dht->storageitems_hash);
 
        if (item && item->mutable && item->seq > 0) ben_dict_set(a, ben_blob("seq", 3), ben_int(item->seq));
        ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
 
        //ks_log(KS_LOG_DEBUG, "Sending message query get\n");
+       if (item) {
+               ks_dht_storageitem_dereference(item);
+               ks_mutex_unlock(item->mutex);
+       }
        ks_q_push(dht->send_q, (void *)message);
 
        return KS_STATUS_SUCCESS;
@@ -2460,11 +2602,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
 
        ks_hash_read_lock(dht->storageitems_hash);
-       item = ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED);
+       item = ks_dht_storageitems_find(dht, target);
+       if (item) {
+               ks_mutex_lock(item->mutex);
+               item->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+       }
        ks_hash_read_unlock(dht->storageitems_hash);
 
+       // If the item is mutable and available locally and a specific sequence was requested and the local item is not newer then do not send k, sig, or v back
        sequence_snuffed = item && sequence >= 0 && item->seq <= sequence;
-       // @todo if sequence is provided then requester has the data, so if the local sequence is lower maybe send a get to the requester to update local data?
+       // @todo if sequence is explicitly provided then requester has the data, so if the local sequence is lower
+       // maybe send a get query to the requester to update the local data
 
        query.nodeid = *target;
        query.type = KS_DHT_REMOTE;
@@ -2553,6 +2701,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        ks_q_push(dht->send_q, (void *)response);
 
  done:
+       if (item) {
+               ks_dht_storageitem_dereference(item);
+               ks_mutex_unlock(item->mutex);
+       }
        return ret;
 }
 
@@ -2589,7 +2741,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
        if ((ret = ks_dht_utility_extract_storageitem_signature(job->response->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
 
        seq = ben_dict_get_by_str(job->response->args, "seq");
-       if (seq) sequence = ben_int_val(seq);
+       if (seq) {
+               sequence = ben_int_val(seq);
+               job->response_hasitem = KS_TRUE;
+               job->response_seq = sequence;
+       }
 
        if (seq && ((k && !sig) || (!k && sig))) {
                ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data");
@@ -2598,6 +2754,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
        }
 
        v = ben_dict_get_by_str(job->response->args, "v");
+       if (v) job->response_hasitem = KS_TRUE;
        //if (v) v_len = ben_str_len(v);
 
        n = ben_dict_get_by_str(job->response->args, "nodes");
@@ -2650,7 +2807,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
        
        ks_hash_write_lock(dht->storageitems_hash);
        storageitems_locked = KS_TRUE;
-       olditem = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED);
+       olditem = ks_dht_storageitems_find(dht, &job->query_target);
+       if (olditem) ks_mutex_lock(olditem->mutex);
 
        if (v) {
                ks_dht_nodeid_t tmptarget;
@@ -2664,11 +2822,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
                                goto done;
                        }
                        if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
-                       else if ((ret = ks_dht_storageitem_create_immutable_internal(&item,
-                                                                                                                                                dht->pool,
-                                                                                                                                                &tmptarget,
-                                                                                                                                                v,
-                                                                                                                                                KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+                       else {
+                               ks_dht_storageitem_create_immutable_internal(&item, dht->pool, &tmptarget, v, KS_TRUE);
+                               ks_assert(item);
+                       }
                } else {
                        // mutable
                        if ((ret = ks_dht_storageitem_target_mutable_internal(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
@@ -2692,19 +2849,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
                                        if (ben_cmp(olditem->v, v) != 0) {
                                                goto done;
                                        }
-                               } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
+                               } else {
+                                       ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
+                                       if (olditem->callback) olditem->callback(dht, olditem);
+                               }
                                olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
                        }
-                       else if ((ret = ks_dht_storageitem_create_mutable_internal(&item,
-                                                                                                                                          dht->pool,
-                                                                                                                                          &tmptarget,
-                                                                                                                                          v,
-                                                                                                                                          KS_TRUE,
-                                                                                                                                          k,
-                                                                                                                                          job->query_salt,
-                                                                                                                                          KS_TRUE,
-                                                                                                                                          sequence,
-                                                                                                                                          sig)) != KS_STATUS_SUCCESS) goto done;
+                       else {
+                               ks_dht_storageitem_create_mutable_internal(&item, dht->pool, &tmptarget, v, KS_TRUE, k, job->query_salt, KS_TRUE, sequence, sig);
+                               ks_assert(item);
+                       }
                }
                if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item);
        } else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
@@ -2712,35 +2866,40 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
        if (item) job->response_storageitem = item;
        else if (olditem) job->response_storageitem = olditem;
 
+       if (job->response_storageitem) ks_dht_storageitem_reference(job->response_storageitem);
+
  done:
+       if (olditem) {
+               ks_dht_storageitem_dereference(olditem);
+               ks_mutex_unlock(olditem->mutex);
+       }
+       if (item) ks_dht_storageitem_dereference(item);
        if (ret != KS_STATUS_SUCCESS) {
        }
        if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
        return ret;
 }
 
-// @todo add reference counting system to storageitem_t to know what to keep alive with reannouncements versus allowing to expire
-KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
-                                                                  const ks_sockaddr_t *raddr,
-                                                                  ks_dht_job_callback_t callback,
-                                                                  ks_dht_token_t *token,
-                                                                  int64_t cas,
-                                                                  ks_dht_storageitem_t *item)
+KS_DECLARE(void) ks_dht_put(ks_dht_t *dht,
+                                                       const ks_sockaddr_t *raddr,
+                                                       ks_dht_job_callback_t callback,
+                                                       void *data,
+                                                       ks_dht_token_t *token,
+                                                       int64_t cas,
+                                                       ks_dht_storageitem_t *item)
 {
        ks_dht_job_t *job = NULL;
-       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(raddr);
        ks_assert(token);
        ks_assert(item);
 
-       if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
+       ks_dht_job_create(&job, dht->pool, raddr, 3, data);
+       ks_assert(job);
+       
        ks_dht_job_build_put(job, ks_dht_query_put, callback, token, cas, item);
        ks_dht_jobs_add(dht, job);
-
- done:
-       return ret;
 }
 
 KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job)
@@ -2907,7 +3066,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
        ks_hash_write_lock(dht->storageitems_hash);
        storageitems_locked = KS_TRUE;
        
-       olditem = ks_hash_search(dht->storageitems_hash, target.id, KS_UNLOCKED);
+       olditem = ks_dht_storageitems_find(dht, &target);
+       if (olditem) ks_mutex_lock(olditem->mutex);
 
        if (!ks_dht_token_verify(dht, &message->raddr, &target, token)) {
                ks_log(KS_LOG_DEBUG, "Invalid token\n");
@@ -2928,19 +3088,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
        if (!seq) {
                // immutable
                if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
-               else if ((ret = ks_dht_storageitem_create_immutable_internal(&item,
-                                                                                                                                        dht->pool,
-                                                                                                                                        &target,
-                                                                                                                                        v,
-                                                                                                                                        KS_TRUE)) != KS_STATUS_SUCCESS) {
-                       ks_dht_error(dht,
-                                                message->endpoint,
-                                                &message->raddr,
-                                                message->transactionid,
-                                                message->transactionid_length,
-                                                202,
-                                                "Internal storage item create immutable error");
-                       goto done;
+               else {
+                       ks_dht_storageitem_create_immutable_internal(&item, dht->pool, &target, v, KS_TRUE);
+                       ks_assert(item);
                }
        } else {
                // mutable
@@ -2989,27 +3139,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
                                                                 "Message query put sequence is equal to current but values are different");
                                        goto done;
                                }
-                       } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
+                       } else {
+                               ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
+                               if (olditem->callback) olditem->callback(dht, olditem);
+                       }
                        olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
                }
-               else if ((ret = ks_dht_storageitem_create_mutable_internal(&item,
-                                                                                                                                  dht->pool,
-                                                                                                                                  &target,
-                                                                                                                                  v,
-                                                                                                                                  KS_TRUE,
-                                                                                                                                  k,
-                                                                                                                                  salt,
-                                                                                                                                  KS_TRUE,
-                                                                                                                                  sequence,
-                                                                                                                                  sig)) != KS_STATUS_SUCCESS) {
-                       ks_dht_error(dht,
-                                                message->endpoint,
-                                                &message->raddr,
-                                                message->transactionid,
-                                                message->transactionid_length,
-                                                202,
-                                                "Internal storage item create mutable error");
-                       goto done;
+               else {
+                       ks_dht_storageitem_create_mutable_internal(&item, dht->pool, &target, v, KS_TRUE, k, salt, KS_TRUE, sequence, sig);
+                       ks_assert(item);
                }
        }
        if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item);
@@ -3024,8 +3162,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
 
        //ks_log(KS_LOG_DEBUG, "Sending message response put\n");
        ks_q_push(dht->send_q, (void *)response);
+       
+       //if (dht->rt_ipv4) ks_dht_distribute(dht, AF_INET, NULL, NULL, 0, olditem ? olditem : item);
+       //if (dht->rt_ipv6) ks_dht_distribute(dht, AF_INET6, NULL, NULL, 0, olditem ? olditem : item);
 
  done:
+       if (olditem) {
+               ks_dht_storageitem_dereference(olditem);
+               ks_mutex_unlock(olditem->mutex);
+       }
+       if (item) ks_dht_storageitem_dereference(item);
        if (ret != KS_STATUS_SUCCESS) {
                if (item) ks_hash_remove(dht->storageitems_hash, item->id.id);
        }
@@ -3046,43 +3192,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t
        return ret;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_exec_search_findnode(ks_dht_t *dht, ks_dht_job_t *job) 
-{
-        return ks_dht_search_findnode(dht,
-                                                                       job->query_family,
-                                                                       &job->query_target,
-                                                                       NULL, 
-                                                                       NULL);
-}
-
-KS_DECLARE(ks_status_t) ks_dht_queue_search_findnode(ks_dht_t* dht,
-                                                                               ks_dhtrt_routetable_t *rt, 
-                                                                               ks_dht_nodeid_t *target, 
-                                                                               ks_dht_job_callback_t callback)
-{
-       ks_dht_job_t *job = NULL;
-       ks_status_t ret = KS_STATUS_SUCCESS;
-
-       ks_assert(dht);
-       ks_assert(rt);
-       ks_assert(target);
-
-       ks_sockaddr_t taddr;   /* just to satisfy the api */
-
-       if ((ret = ks_dht_job_create(&job, dht->pool, &taddr, 3)) == KS_STATUS_SUCCESS) {
-
-               int32_t family = AF_INET;
-       
-               if (rt == dht->rt_ipv6) {
-                       family = AF_INET6;
-               }
-
-               ks_dht_job_build_search_findnode(job, target, family, ks_dht_exec_search_findnode, callback);
-       }
-
-       return ret;
-}
-
 /* For Emacs:
  * Local Variables:
  * mode:c
index 019688682ddc28c533a18944df5e3bd488dcaa2f..a4ca006c58a1b57b07def9831dfe2cbad9bb1692 100644 (file)
@@ -31,7 +31,6 @@ KS_BEGIN_EXTERN_C
 #define KS_DHT_TRANSACTION_EXPIRATION 10
 #define KS_DHT_TRANSACTIONS_PULSE 1
 
-#define KS_DHT_SEARCH_EXPIRATION 10
 #define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE
 
 #define KS_DHT_STORAGEITEM_PKEY_SIZE crypto_sign_PUBLICKEYBYTES
@@ -39,6 +38,8 @@ KS_BEGIN_EXTERN_C
 #define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64
 #define KS_DHT_STORAGEITEM_SIGNATURE_SIZE crypto_sign_BYTES
 #define KS_DHT_STORAGEITEM_EXPIRATION 7200
+#define KS_DHT_STORAGEITEM_KEEPALIVE 300
+#define KS_DHT_STORAGEITEMS_PULSE 10
 
 #define KS_DHT_TOKEN_SIZE SHA_DIGEST_LENGTH
 #define KS_DHT_TOKEN_EXPIRATION 300
@@ -58,7 +59,8 @@ typedef struct ks_dht_message_s ks_dht_message_t;
 typedef struct ks_dht_endpoint_s ks_dht_endpoint_t;
 typedef struct ks_dht_transaction_s ks_dht_transaction_t;
 typedef struct ks_dht_search_s ks_dht_search_t;
-typedef struct ks_dht_search_pending_s ks_dht_search_pending_t;
+typedef struct ks_dht_publish_s ks_dht_publish_t;
+typedef struct ks_dht_distribute_s ks_dht_distribute_t;
 typedef struct ks_dht_node_s ks_dht_node_t;
 typedef struct ks_dhtrt_routetable_s ks_dhtrt_routetable_t;
 typedef struct ks_dhtrt_querynodes_s ks_dhtrt_querynodes_t;
@@ -67,7 +69,9 @@ typedef struct ks_dht_storageitem_s ks_dht_storageitem_t;
 
 typedef ks_status_t (*ks_dht_job_callback_t)(ks_dht_t *dht, ks_dht_job_t *job);
 typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message);
-typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search);
+//typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search);
+typedef ks_status_t (*ks_dht_storageitem_callback_t)(ks_dht_t *dht, ks_dht_storageitem_t *item);
+
 
 struct ks_dht_datagram_s {
        ks_pool_t *pool;
@@ -115,11 +119,12 @@ enum ks_dht_job_state_t {
        KS_DHT_JOB_STATE_COMPLETING,
 };
 
-//enum ks_dht_job_type_t {
-//     KS_DHT_JOB_TYPE_NONE = 0,
-//     KS_DHT_JOB_TYPE_PING,
-//     KS_DHT_JOB_TYPE_FINDNODE,
-//};
+enum ks_dht_job_result_t {
+       KS_DHT_JOB_RESULT_SUCCESS = 0,
+       KS_DHT_JOB_RESULT_EXPIRED,
+       KS_DHT_JOB_RESULT_ERROR,
+       KS_DHT_JOB_RESULT_FAILURE,
+};
 
 struct ks_dht_job_s {
        ks_pool_t *pool;
@@ -127,8 +132,7 @@ struct ks_dht_job_s {
        ks_dht_job_t *next;
 
        enum ks_dht_job_state_t state;
-
-       ks_dht_search_t *search;
+       enum ks_dht_job_result_t result;
 
        ks_sockaddr_t raddr; // will obtain local endpoint node id when creating message using raddr
        int32_t attempts;
@@ -137,6 +141,7 @@ struct ks_dht_job_s {
        ks_dht_job_callback_t query_callback;
        ks_dht_job_callback_t finish_callback;
 
+       void *data;
        ks_dht_message_t *response;
 
        // job specific query parameters
@@ -145,15 +150,22 @@ struct ks_dht_job_s {
        int64_t query_cas;
        ks_dht_token_t query_token;
        ks_dht_storageitem_t *query_storageitem;
-    uint32_t query_family;
+    int32_t query_family;
 
+       // error response parameters
+       int64_t error_code;
+       struct bencode *error_description;
+       
        // job specific response parameters
-       ks_dht_nodeid_t response_id;
+       ks_dht_node_t *response_id;
        ks_dht_node_t *response_nodes[KS_DHT_RESPONSE_NODES_MAX_SIZE];
        ks_size_t response_nodes_count;
        ks_dht_node_t *response_nodes6[KS_DHT_RESPONSE_NODES_MAX_SIZE];
        ks_size_t response_nodes6_count;
+       
        ks_dht_token_t response_token;
+       int64_t response_seq;
+       ks_bool_t response_hasitem;
        ks_dht_storageitem_t *response_storageitem;
 };
 
@@ -219,25 +231,48 @@ struct ks_dht_transaction_s {
 
 struct ks_dht_search_s {
        ks_pool_t *pool;
-       ks_dht_search_t *next;
+       ks_dhtrt_routetable_t *table;
        ks_dht_nodeid_t target;
-       ks_dht_search_callback_t callback;
+       ks_dht_job_callback_t callback;
+       void *data;
        ks_mutex_t *mutex;
        ks_hash_t *searched;
-       ks_hash_t *searching;
+       int32_t searching;
        ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
        ks_dht_nodeid_t distances[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
        ks_size_t results_length;
 };
 
+struct ks_dht_publish_s {
+       ks_pool_t *pool;
+       ks_dht_job_callback_t callback;
+       void *data;
+       int64_t cas;
+       ks_dht_storageitem_t *item;
+};
+
+struct ks_dht_distribute_s {
+       ks_pool_t *pool;
+       ks_dht_storageitem_callback_t callback;
+       void *data;
+       ks_mutex_t *mutex;
+       int32_t publishing;
+       int64_t cas;
+       ks_dht_storageitem_t *item;
+};
+
 struct ks_dht_storageitem_s {
        ks_pool_t *pool;
        ks_dht_nodeid_t id;
        ks_time_t expiration;
+       ks_time_t keepalive;
        struct bencode *v;
-       
-       ks_bool_t mutable;
+
        ks_mutex_t *mutex;
+       volatile int32_t refc;
+       ks_dht_storageitem_callback_t callback;
+
+       ks_bool_t mutable;
        ks_dht_storageitem_pkey_t pk;
        ks_dht_storageitem_skey_t sk;
        struct bencode *salt;
@@ -282,15 +317,12 @@ struct ks_dht_s {
        ks_dhtrt_routetable_t *rt_ipv4;
        ks_dhtrt_routetable_t *rt_ipv6;
 
-       ks_mutex_t *searches_mutex;
-       ks_dht_search_t *searches_first;
-       ks_dht_search_t *searches_last;
-
        ks_time_t tokens_pulse;
        volatile uint32_t token_secret_current;
        volatile uint32_t token_secret_previous;
        ks_time_t token_secret_expiration;
 
+       ks_time_t storageitems_pulse;
        ks_hash_t *storageitems_hash;
 };
 
@@ -395,7 +427,22 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate(ks_dht_storageitem
                                                                                                                          int64_t sequence,
                                                                                                                          const uint8_t *value,
                                                                                                                          ks_size_t value_length);
-                                                                                               
+
+/**
+ *
+ */
+KS_DECLARE(void) ks_dht_storageitem_reference(ks_dht_storageitem_t *item);
+
+/**
+ *
+ */
+KS_DECLARE(void) ks_dht_storageitem_dereference(ks_dht_storageitem_t *item);
+
+/**
+ *
+ */
+KS_DECLARE(void) ks_dht_storageitem_callback(ks_dht_storageitem_t *item, ks_dht_storageitem_callback_t callback);
+                                               
 /**
  *
  */
@@ -429,37 +476,38 @@ KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storage
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback);
+KS_DECLARE(void) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, void *data);
 
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht,
-                                                                               ks_dht_search_t *search,
-                                                                               const ks_sockaddr_t *raddr,
-                                                                               ks_dht_job_callback_t callback,
-                                                                               ks_dht_nodeid_t *target);
+KS_DECLARE(void) ks_dht_findnode(ks_dht_t *dht,
+                                                                const ks_sockaddr_t *raddr,
+                                                                ks_dht_job_callback_t callback,
+                                                                void *data,
+                                                                ks_dht_nodeid_t *target);
 
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
-                                                                  ks_dht_search_t *search,
-                                                                  const ks_sockaddr_t *raddr,
-                                                                  ks_dht_job_callback_t callback,
-                                                                  ks_dht_nodeid_t *target,
-                                                                  uint8_t *salt,
-                                                                  ks_size_t salt_length);
+KS_DECLARE(void) ks_dht_get(ks_dht_t *dht,
+                                                       const ks_sockaddr_t *raddr,
+                                                       ks_dht_job_callback_t callback,
+                                                       void *data,
+                                                       ks_dht_nodeid_t *target,
+                                                       const uint8_t *salt,
+                                                       ks_size_t salt_length);
 
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
-                                                                  const ks_sockaddr_t *raddr,
-                                                                  ks_dht_job_callback_t callback,
-                                                                  ks_dht_token_t *token,
-                                                                  int64_t cas,
-                                                                  ks_dht_storageitem_t *item);
+KS_DECLARE(void) ks_dht_put(ks_dht_t *dht,
+                                                       const ks_sockaddr_t *raddr,
+                                                       ks_dht_job_callback_t callback,
+                                                       void *data,
+                                                       ks_dht_token_t *token,
+                                                       int64_t cas,
+                                                       ks_dht_storageitem_t *item);
                                                
 /**
  * Create a network search of the closest nodes to a target.
@@ -468,27 +516,29 @@ KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
  * @param target pointer to the nodeid for the target to be searched
  * @param callback an optional callback to add to the search when it is finished
  * @param search dereferenced out pointer to the allocated search, may be NULL to ignore search output
- * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
  * @see ks_dht_search_create
- * @see ks_dht_search_callback_add
  * @see ks_hash_insert
- * @see ks_dht_search_pending_create
- * @see ks_dht_send_findnode
+ * @see ks_dht_findnode
  */
-KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht,
-                                                                                          int32_t family,
-                                                                                          ks_dht_nodeid_t *target,
-                                                                                          ks_dht_search_callback_t callback,
-                                                                                          ks_dht_search_t **search);
-
-KS_DECLARE(ks_status_t) ks_dht_queue_search_findnode(ks_dht_t* dht,
-                                                                                       ks_dhtrt_routetable_t *rt,
-                                                                                       ks_dht_nodeid_t *target,
-                                                                                       ks_dht_job_callback_t callback);
-                                                                               
-KS_DECLARE(ks_status_t) ks_dht_exec_search_findnode(ks_dht_t *dht, ks_dht_job_t *job);
-
-
+KS_DECLARE(void) ks_dht_search(ks_dht_t *dht,
+                                                          ks_dht_job_callback_t callback,
+                                                          void *data,
+                                                          ks_dhtrt_routetable_t *table,
+                                                          ks_dht_nodeid_t *target);
+
+KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht,
+                                                               const ks_sockaddr_t *raddr,
+                                                               ks_dht_job_callback_t callback,
+                                                               void *data,
+                                                               int64_t cas,
+                                                               ks_dht_storageitem_t *item);
+
+KS_DECLARE(void) ks_dht_distribute(ks_dht_t *dht,
+                                                                  ks_dht_storageitem_callback_t callback,
+                                                                  void *data,
+                                                                  ks_dhtrt_routetable_t *table,
+                                                                  int64_t cas,
+                                                                  ks_dht_storageitem_t *item);
 
 /**
  * route table methods
index 382c8155e4819b60bef3b8a9a518da07e56733f3..72063c8630ced1822df9f9b066974f2966afa78a 100644 (file)
@@ -1673,7 +1673,7 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry
 #endif
        ks_dht_node_t* node = entry->gptr;
        ks_log(KS_LOG_DEBUG, "Node addr %s %d\n", node->addr.host, node->addr.port);
-       ks_dht_ping(internal->dht, &node->addr, NULL);
+       ks_dht_ping(internal->dht, &node->addr, NULL, NULL);
 
        return;
 }
@@ -1683,7 +1683,7 @@ void ks_dhtrt_find(ks_dhtrt_routetable_t *table, ks_dhtrt_internal_t *internal,
 
        char buf[100];
        ks_log(KS_LOG_DEBUG, "Find queued for target %s\n", ks_dhtrt_printableid(target->id, buf));
-       ks_dht_queue_search_findnode(internal->dht, table, target, NULL);
+       ks_dht_search(internal->dht, NULL, NULL, table, target);
     return;
 }
 
diff --git a/libs/libks/src/dht/ks_dht_distribute.c b/libs/libks/src/dht/ks_dht_distribute.c
new file mode 100644 (file)
index 0000000..24669eb
--- /dev/null
@@ -0,0 +1,65 @@
+#include "ks_dht.h"
+#include "ks_dht-int.h"
+#include "sodium.h"
+
+KS_DECLARE(ks_status_t) ks_dht_distribute_create(ks_dht_distribute_t **distribute,
+                                                                                                ks_pool_t *pool,
+                                                                                                ks_dht_storageitem_callback_t callback,
+                                                                                                void *data,
+                                                                                                int64_t cas,
+                                                                                                ks_dht_storageitem_t *item)
+{
+       ks_dht_distribute_t *d;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(distribute);
+       ks_assert(pool);
+       ks_assert(cas >= 0);
+       ks_assert(item);
+
+       *distribute = d = ks_pool_alloc(pool, sizeof(ks_dht_distribute_t));
+       ks_assert(d);
+
+       d->pool = pool;
+
+       d->callback = callback;
+       d->data = data;
+       ks_mutex_create(&d->mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
+       ks_assert(d->mutex);
+       d->cas = cas;
+       d->item = item;
+       
+       ks_dht_storageitem_reference(d->item);
+
+       // done:
+       if (ret != KS_STATUS_SUCCESS) {
+               if (d) ks_dht_distribute_destroy(distribute);
+       }
+       return ret;
+}
+
+KS_DECLARE(void) ks_dht_distribute_destroy(ks_dht_distribute_t **distribute)
+{
+       ks_dht_distribute_t *d;
+
+       ks_assert(distribute);
+       ks_assert(*distribute);
+
+       d = *distribute;
+
+       if (d->mutex) ks_mutex_destroy(&d->mutex);
+       ks_dht_storageitem_dereference(d->item);
+       
+       ks_pool_free(d->pool, distribute);
+}
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
index 7479a305fd9f9fe2b85ff33143cb1832593a01e8..5aeaf08ec923476f3dfec3f656c00398a4bc7c48 100644 (file)
@@ -4,7 +4,8 @@
 KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job,
                                                                                  ks_pool_t *pool,
                                                                                  const ks_sockaddr_t *raddr,
-                                                                                 int32_t attempts)
+                                                                                 int32_t attempts,
+                                                                                 void *data)
 {
        ks_dht_job_t *j;
        ks_status_t ret = KS_STATUS_SUCCESS;
@@ -12,7 +13,6 @@ KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job,
        ks_assert(job);
        ks_assert(pool);
        //ks_assert(dht);
-       ks_assert(raddr);
        ks_assert(attempts > 0 && attempts <= 10);
 
        *job = j = ks_pool_alloc(pool, sizeof(ks_dht_job_t));
@@ -20,13 +20,9 @@ KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job,
 
        j->pool = pool;
        j->state = KS_DHT_JOB_STATE_QUERYING;
-       j->raddr = *raddr;
+       if (raddr) j->raddr = *raddr;
        j->attempts = attempts;
-
-       //ks_mutex_lock(dht->jobs_mutex);
-       //if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = j;
-       //else dht->jobs_first = dht->jobs_last = j;
-       //ks_mutex_unlock(dht->jobs_mutex);
+       j->data = data;
 
        // done:
        if (ret != KS_STATUS_SUCCESS) {
@@ -45,7 +41,6 @@ KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t
 }
 
 KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
-                                                                                  ks_dht_search_t *search,
                                                                                   ks_dht_job_callback_t query_callback,
                                                                                   ks_dht_job_callback_t finish_callback,
                                                                                   ks_dht_nodeid_t *target)
@@ -54,25 +49,22 @@ KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
        ks_assert(query_callback);
        ks_assert(target);
 
-       job->search = search;
        job->query_callback = query_callback;
        job->finish_callback = finish_callback;
        job->query_target = *target;
 }
 
 KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
-                                                                         ks_dht_search_t *search,
                                                                          ks_dht_job_callback_t query_callback,
                                                                          ks_dht_job_callback_t finish_callback,
                                                                          ks_dht_nodeid_t *target,
-                                                                         uint8_t *salt,
+                                                                         const uint8_t *salt,
                                                                          ks_size_t salt_length)
 {
        ks_assert(job);
        ks_assert(query_callback);
        ks_assert(target);
 
-       job->search = search;
        job->query_callback = query_callback;
        job->finish_callback = finish_callback;
        job->query_target = *target;
@@ -96,23 +88,18 @@ KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
        job->query_token = *token;
        job->query_cas = cas;
        job->query_storageitem = item;
+       ks_dht_storageitem_reference(job->query_storageitem);
 }
 
-KS_DECLARE(void) ks_dht_job_build_search_findnode(ks_dht_job_t *job,
-                                                                                  ks_dht_nodeid_t *target,
-                                                                                  uint32_t family,
-                                           ks_dht_job_callback_t query_callback,
-                                           ks_dht_job_callback_t finish_callback)
+KS_DECLARE(void) ks_dht_job_build_search(ks_dht_job_t *job,
+                                                                                ks_dht_job_callback_t query_callback,
+                                                                                ks_dht_job_callback_t finish_callback)
 {
        ks_assert(job);
-       ks_assert(target);
-       ks_assert(family);
+       ks_assert(query_callback);
 
-       job->search = NULL;
        job->query_callback = query_callback;
        job->finish_callback = finish_callback;
-       job->query_target = *target;
-       job->query_family = family;
 }
 
 KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job)
@@ -125,9 +112,15 @@ KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job)
        j = *job;
 
        if (j->query_salt) ben_free(j->query_salt);
+       if (j->response_id) ks_dhtrt_release_node(j->response_id);
        for (int32_t i = 0; i < j->response_nodes_count; ++i) ks_dhtrt_release_node(j->response_nodes[i]);
        for (int32_t i = 0; i < j->response_nodes6_count; ++i) ks_dhtrt_release_node(j->response_nodes6[i]);
 
+       if (j->query_storageitem) ks_dht_storageitem_dereference(j->query_storageitem);
+       if (j->response_storageitem) ks_dht_storageitem_dereference(j->response_storageitem);
+
+       if (j->error_description) ben_free(j->error_description);
+
        ks_pool_free(j->pool, job);
 }
 
diff --git a/libs/libks/src/dht/ks_dht_publish.c b/libs/libks/src/dht/ks_dht_publish.c
new file mode 100644 (file)
index 0000000..8affdd5
--- /dev/null
@@ -0,0 +1,62 @@
+#include "ks_dht.h"
+#include "ks_dht-int.h"
+#include "sodium.h"
+
+KS_DECLARE(ks_status_t) ks_dht_publish_create(ks_dht_publish_t **publish,
+                                                                                         ks_pool_t *pool,
+                                                                                         ks_dht_job_callback_t callback,
+                                                                                         void *data,
+                                                                                         int64_t cas,
+                                                                                         ks_dht_storageitem_t *item)
+{
+       ks_dht_publish_t *p;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(publish);
+       ks_assert(pool);
+       ks_assert(cas >= 0);
+       ks_assert(item);
+
+       *publish = p = ks_pool_alloc(pool, sizeof(ks_dht_publish_t));
+       ks_assert(p);
+
+       p->pool = pool;
+
+       p->callback = callback;
+       p->data = data;
+       p->cas = cas;
+       p->item = item;
+       
+       ks_dht_storageitem_reference(p->item);
+
+       // done:
+       if (ret != KS_STATUS_SUCCESS) {
+               if (p) ks_dht_publish_destroy(publish);
+       }
+       return ret;
+}
+
+KS_DECLARE(void) ks_dht_publish_destroy(ks_dht_publish_t **publish)
+{
+       ks_dht_publish_t *p;
+
+       ks_assert(publish);
+       ks_assert(*publish);
+
+       p = *publish;
+
+       ks_dht_storageitem_dereference(p->item);
+       
+       ks_pool_free(p->pool, publish);
+}
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
index 313856d09ad3dbae04e1641ea7361459f546eeb5..5569d7842517571ed2a4da3f84776b67605eb794 100644 (file)
@@ -2,13 +2,19 @@
 #include "ks_dht-int.h"
 #include "sodium.h"
 
-KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target, ks_dht_search_callback_t callback)
+KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search,
+                                                                                        ks_pool_t *pool,
+                                                                                        ks_dhtrt_routetable_t *table,
+                                                                                        const ks_dht_nodeid_t *target,
+                                                                                        ks_dht_job_callback_t callback,
+                                                                                        void *data)
 {
        ks_dht_search_t *s;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(search);
        ks_assert(pool);
+       ks_assert(table);
        ks_assert(target);
 
        *search = s = ks_pool_alloc(pool, sizeof(ks_dht_search_t));
@@ -19,17 +25,17 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t
        ks_mutex_create(&s->mutex, KS_MUTEX_FLAG_DEFAULT, s->pool);
        ks_assert(s->mutex);
 
+       s->table = table;
        memcpy(s->target.id, target->id, KS_DHT_NODEID_SIZE);
 
        s->callback = callback;
+       s->data = data;
 
        ks_hash_create(&s->searched, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool);
        ks_assert(s->searched);
        ks_hash_set_keysize(s->searched, KS_DHT_NODEID_SIZE);
 
-       ks_hash_create(&s->searching, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool);
-       ks_assert(s->searching);
-       ks_hash_set_keysize(s->searching, KS_DHT_NODEID_SIZE);
+       s->searching = 0;
 
        // done:
        if (ret != KS_STATUS_SUCCESS) {
@@ -47,7 +53,6 @@ KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search)
 
        s = *search;
 
-       if (s->searching) ks_hash_destroy(&s->searching);
        if (s->searched) ks_hash_destroy(&s->searched);
        if (s->mutex) ks_mutex_destroy(&s->mutex);
 
index 42bffa031928b1d8bdcd699c6de7b5b71746d06a..175e000968c0321622c9dbfcf5aa7c052d436b8d 100644 (file)
@@ -23,9 +23,12 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable_internal(ks_dht_stor
        si->id = *target;
        si->mutable = KS_FALSE;
        si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+       si->keepalive = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_KEEPALIVE * KS_USEC_PER_SEC);
        si->v = clone_v ? ben_clone(v) : v;
        ks_assert(si->v);
 
+       si->refc = 1;
+
        // done:
        if (ret != KS_STATUS_SUCCESS) {
                if (si) ks_dht_storageitem_destroy(item);
@@ -81,9 +84,12 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable_internal(ks_dht_storag
        si->id = *target;
        si->mutable = KS_TRUE;
        si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+       si->keepalive = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_KEEPALIVE * KS_USEC_PER_SEC);
        si->v = clone_v ? ben_clone(v) : v;
        ks_assert(si->v);
 
+       si->refc = 1;
+
        ks_mutex_create(&si->mutex, KS_MUTEX_FLAG_DEFAULT, si->pool);
        ks_assert(si->mutex);
        
@@ -169,6 +175,33 @@ KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item)
        ks_pool_free(si->pool, item);
 }
 
+KS_DECLARE(void) ks_dht_storageitem_reference(ks_dht_storageitem_t *item)
+{
+       ks_assert(item);
+
+       ks_mutex_lock(item->mutex);
+       item->refc++;
+       ks_mutex_unlock(item->mutex);
+}
+
+KS_DECLARE(void) ks_dht_storageitem_dereference(ks_dht_storageitem_t *item)
+{
+       ks_assert(item);
+
+       ks_mutex_lock(item->mutex);
+       item->refc--;
+       ks_mutex_unlock(item->mutex);
+
+       ks_assert(item->refc >= 0);
+}
+
+KS_DECLARE(void) ks_dht_storageitem_callback(ks_dht_storageitem_t *item, ks_dht_storageitem_callback_t callback)
+{
+       ks_assert(item);
+
+       item->callback = callback;
+}
+
 /* For Emacs:
  * Local Variables:
  * mode:c
index 8368ebdd247c29edc93d2a04fd53d84104d144b3..0ce9fdff849e6f6669c0d24c44dc542ef87852ad 100644 (file)
@@ -102,10 +102,10 @@ static int check_queue(ks_thread_pool_t *tp, ks_bool_t adding)
                
                need--;
        }
-
+       /*
        ks_log(KS_LOG_DEBUG, "WORKER check: adding %d need %d running %d dying %d total %d max %d\n", 
                   adding, need, tp->running_thread_count, tp->dying_thread_count, tp->thread_count, tp->max);
-
+       */
        return need;
 }
 
@@ -129,10 +129,10 @@ static void *worker_thread(ks_thread_t *thread, void *data)
                ks_status_t status;
                
                status = ks_q_pop_timeout(tp->q, &pop, 1000);
-
+               /*
                ks_log(KS_LOG_DEBUG, "WORKER %d idle_sec %d running %d dying %d total %d max %d\n", 
                           my_id, idle_sec, tp->running_thread_count, tp->dying_thread_count, tp->thread_count, tp->max);               
-               
+               */              
                check_queue(tp, KS_FALSE);
                
                if (status == KS_STATUS_TIMEOUT) {
index 28b79126d4b330c77a8d57262fbbdbc228a2afcf..fd4ab7964c29454c4531a28834b1e483677344f6 100644 (file)
@@ -6,6 +6,18 @@
 ks_dht_storageitem_skey_t sk;
 ks_dht_storageitem_pkey_t pk;
 
+ks_status_t dht2_updated_callback(ks_dht_t *dht, ks_dht_storageitem_t *item)
+{
+       diag("dht2_updated_callback\n");
+       return KS_STATUS_SUCCESS;
+}
+
+ks_status_t dht2_distribute_callback(ks_dht_t *dht, ks_dht_storageitem_t *item)
+{
+       diag("dht2_distribute_callback\n");
+       return KS_STATUS_SUCCESS;
+}
+
 ks_status_t dht2_put_callback(ks_dht_t *dht, ks_dht_job_t *job)
 {
        diag("dht2_put_callback\n");
@@ -28,13 +40,14 @@ ks_status_t dht2_get_token_callback(ks_dht_t *dht, ks_dht_job_t *job)
        mutable->sk = sk;
        ks_dht_storageitems_insert(dht, mutable);
        
-       ks_dht_put(dht, &job->raddr, dht2_put_callback, &job->response_token, 0, mutable);
+       ks_dht_put(dht, &job->raddr, dht2_put_callback, NULL, &job->response_token, 0, mutable);
        return KS_STATUS_SUCCESS;
 }
 
-ks_status_t dht2_search_findnode_callback(ks_dht_t *dht, ks_dht_search_t *search)
+ks_status_t dht2_search_callback(ks_dht_t *dht, ks_dht_job_t *job)
 {
-       diag("dht2_search_findnode_callback %d\n", search->results_length);
+       ks_dht_search_t *search = (ks_dht_search_t *)job->data;
+       diag("dht2_search_callback %d\n", search->results_length);
        return KS_STATUS_SUCCESS;
 }
 
@@ -54,11 +67,12 @@ int main() {
   ks_sockaddr_t raddr1;
   //ks_sockaddr_t raddr2;
   //ks_sockaddr_t raddr3;
-  //ks_dht_nodeid_t target;
+  ks_dht_nodeid_t target;
   //ks_dht_storageitem_t *immutable = NULL;
-  //ks_dht_storageitem_t *mutable = NULL;
-  //const char *v = "Hello World!";
-  //size_t v_len = strlen(v);
+  ks_dht_storageitem_t *mutable1 = NULL;
+  ks_dht_storageitem_t *mutable2 = NULL;
+  const char *v = "Hello World!";
+  size_t v_len = strlen(v);
   //ks_dht_storageitem_skey_t sk; //= { { 0xe0, 0x6d, 0x31, 0x83, 0xd1, 0x41, 0x59, 0x22, 0x84, 0x33, 0xed, 0x59, 0x92, 0x21, 0xb8, 0x0b,
   //0xd0, 0xa5, 0xce, 0x83, 0x52, 0xe4, 0xbd, 0xf0, 0x26, 0x2f, 0x76, 0x78, 0x6e, 0xf1, 0xc7, 0x4d,
   //0xb7, 0xe7, 0xa9, 0xfe, 0xa2, 0xc0, 0xeb, 0x26, 0x9d, 0x61, 0xe3, 0xb3, 0x8e, 0x45, 0x0a, 0x22,
@@ -67,7 +81,7 @@ int main() {
   //0x24, 0x32, 0xfc, 0xd9, 0x04, 0xa4, 0x35, 0x11, 0x87, 0x6d, 0xf5, 0xcd, 0xf3, 0xe7, 0xe5, 0x48 } };
   //uint8_t sk1[KS_DHT_STORAGEITEM_SKEY_SIZE];
   //uint8_t pk1[KS_DHT_STORAGEITEM_PKEY_SIZE];
-  //ks_dht_storageitem_signature_t sig;
+  ks_dht_storageitem_signature_t sig;
   //char sk_buf[KS_DHT_STORAGEITEM_SKEY_SIZE * 2 + 1];
   //char pk_buf[KS_DHT_STORAGEITEM_PKEY_SIZE * 2 + 1];
   //const char *test1vector = "3:seqi1e1:v12:Hello World!";
@@ -155,7 +169,7 @@ int main() {
 
   diag("Ping test\n");
   
-  ks_dht_ping(dht2, &raddr1, NULL); // (QUERYING)
+  ks_dht_ping(dht2, &raddr1, NULL, NULL); // (QUERYING)
 
   ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 (RESPONDING)
   
@@ -178,7 +192,7 @@ int main() {
   ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
 
   
-  ks_dht_ping(dht3, &raddr1, NULL); // (QUERYING)
+  ks_dht_ping(dht3, &raddr1, NULL, NULL); // (QUERYING)
 
   ks_dht_pulse(dht3, 100); // Send queued ping from dht3 to dht1 (RESPONDING)
   
@@ -193,13 +207,51 @@ int main() {
   ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING)
 
   diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
-  for (int i = 0; i < 20; ++i) {
+  for (int i = 0; i < 10; ++i) {
          ks_dht_pulse(dht1, 100);
          ks_dht_pulse(dht2, 100);
          ks_dht_pulse(dht3, 100);
   }
   ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
 
+  // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
+
+  /*
+  diag("Find_Node test\n");
+
+  ks_dht_findnode(dht3, NULL, &raddr1, NULL, NULL, &ep2->nodeid);
+
+  ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
+
+  ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
+
+  ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+
+  ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
+  
+  ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING)
+
+  ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+  
+  diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+  for (int i = 0; i < 10; ++i) {
+         ks_dht_pulse(dht1, 100);
+         ks_dht_pulse(dht2, 100);
+         ks_dht_pulse(dht3, 100);
+  }
+  ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+  */
+
+  diag("Search test\n");
+  
+  ks_dht_search(dht3, dht2_search_callback, NULL, dht3->rt_ipv4, &ep2->nodeid);
+  diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+  for (int i = 0; i < 20; ++i) {
+         ks_dht_pulse(dht1, 100);
+         ks_dht_pulse(dht2, 100);
+         ks_dht_pulse(dht3, 100);
+  }
+  
   //diag("Get test\n");
   
 
@@ -218,7 +270,7 @@ int main() {
   mutable->sk = sk;
   ks_dht_storageitems_insert(dht1, mutable);
 
-  ks_dht_get(dht2, &raddr1, dht2_get_callback, &target, NULL, 0);
+  ks_dht_get(dht2, &raddr1, dht2_get_callback, NULL, &target, NULL, 0);
  
   ks_dht_pulse(dht2, 100); // send get query
 
@@ -238,66 +290,82 @@ int main() {
 
   ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
 
-  ks_dht_get(dht2, NULL, &raddr1, dht2_get_token_callback, &target, NULL, 0); // create job
+  ks_dht_get(dht2, &raddr1, dht2_get_token_callback, NULL, &target, NULL, 0); // create job
   
-  ks_dht_pulse(dht2, 100); // send get query
-
-  ks_dht_pulse(dht1, 100); // receive get query and send get response
-
-  ks_dht_pulse(dht2, 100); // receive get response
-
-  ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING), send put query
-
-  ks_dht_pulse(dht1, 100); // receive put query and send put response
-
-  ks_dht_pulse(dht2, 100); // receive put response
-
-  ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING)
-
-  for (int i = 0; i < 10; ++i) {
+  for (int i = 0; i < 20; ++i) {
          ks_dht_pulse(dht1, 100);
          ks_dht_pulse(dht2, 100);
          ks_dht_pulse(dht3, 100);
   }
   */
 
-  // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
-
   /*
-  diag("Find_Node test\n");
-
-  ks_dht_findnode(dht3, NULL, &raddr1, NULL, &ep2->nodeid);
-
-  ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
-
-  ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
-
-  ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
-
-  ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
+  diag("Publish test\n");
   
-  ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING)
+  crypto_sign_keypair(pk.key, sk.key);
 
-  ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+  ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
   
-  diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
-  for (int i = 0; i < 10; ++i) {
+  ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len);
+  
+  ks_dht_storageitem_create_mutable(&mutable, dht2->pool, &target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig);
+  mutable->sk = sk;
+  ks_dht_storageitems_insert(dht2, mutable);
+  
+  ks_dht_publish(dht2, &raddr1, dht2_put_callback, NULL, 0, mutable); // create job
+  
+  for (int i = 0; i < 20; ++i) {
          ks_dht_pulse(dht1, 100);
          ks_dht_pulse(dht2, 100);
          ks_dht_pulse(dht3, 100);
   }
-  ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
   */
 
-  diag("Search test\n");
-  ks_dht_search_findnode(dht3, AF_INET, &ep2->nodeid, dht2_search_findnode_callback, NULL);
-  diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+  
+  diag("Distribute test\n");
+  
+  crypto_sign_keypair(pk.key, sk.key);
+
+  ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
+  
+  ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len);
+  
+  ks_dht_storageitem_create_mutable(&mutable2, dht2->pool, &target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig);
+  mutable2->sk = sk;
+  ks_dht_storageitems_insert(dht2, mutable2);
+  
+  ks_dht_distribute(dht2, dht2_distribute_callback, NULL, dht2->rt_ipv4, 0, mutable2); // create job
+  
   for (int i = 0; i < 30; ++i) {
          ks_dht_pulse(dht1, 100);
          ks_dht_pulse(dht2, 100);
          ks_dht_pulse(dht3, 100);
   }
+  ks_dht_storageitem_dereference(mutable2);
+  ok(mutable2->refc == 0);
+
+  mutable1 = ks_dht_storageitems_find(dht1, &target);
+  ok(mutable1 != NULL);
+  
+  ks_dht_storageitem_callback(mutable1, dht2_updated_callback);
+  ks_dht_storageitem_callback(mutable2, dht2_updated_callback);
+
+  ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 2, (uint8_t *)v, v_len);
+  mutable1->seq = 2;
+  mutable1->sig = sig;
+  
+  //ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 2, (uint8_t *)v, v_len);
+  //mutable2->seq = 2;
+  //mutable2->sig = sig;
 
+  ks_dht_distribute(dht2, dht2_distribute_callback, NULL, dht2->rt_ipv4, 0, mutable2);
+  for (int i = 0; i < 30; ++i) {
+         ks_dht_pulse(dht1, 100);
+         ks_dht_pulse(dht2, 100);
+         ks_dht_pulse(dht3, 100);
+  }
+  ks_dht_storageitem_dereference(mutable1);
+  
   /* Cleanup and shutdown */
   diag("Cleanup\n");