]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Overhauled query/response handling by implementing a reusable job system...
authorShane Bryldt <astaelan@gmail.com>
Sun, 18 Dec 2016 21:15:47 +0000 (21:15 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:37 +0000 (14:59 -0600)
libs/libks/Makefile.am
libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_job.c [new file with mode: 0644]
libs/libks/src/dht/ks_dht_message.c
libs/libks/src/dht/ks_dht_transaction.c
libs/libks/test/testdht2.c

index 64f366a26e3fcb3c97439eabdc6a657c07bfe8ab..ce21d3d898028e4a08729a4d26e229333b122c46 100644 (file)
@@ -14,7 +14,7 @@ libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c
 libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp
 libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c
 libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_datagram.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c
-libks_la_SOURCES += src/dht/ks_dht_search.c src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c
+libks_la_SOURCES += src/dht/ks_dht_job.c src/dht/ks_dht_search.c src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c
 libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c 
 #aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h
 
index 24bec0c531fc933bcb135aee3528ed340f18decd..166af867c1357c285369199204049cd22f309feb 100644 (file)
@@ -19,7 +19,7 @@ KS_BEGIN_EXTERN_C
  * @see ks_addr_set
  * @see ks_dht_bind
  */
-KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint);
+KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint);
 
 /**
  * Called internally to expire various data.
@@ -131,7 +131,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const
  * @param token pointer to the output token being generated
  * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
  */
-KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token);
+KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token);
 
 /**
  * Verify an opaque write token matches the provided remote address and target nodeid.
@@ -142,7 +142,7 @@ KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *ra
  * @param token pointer to the input token being compared
  * @return Either KS_TRUE if verification passes, otherwise KS_FALSE
  */
-KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token);
+KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token);
 
 /**
  * Encodes a message for transmission as a UDP datagram and sends it.
@@ -158,8 +158,7 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message);
  * Sets up the common parts of a query message.
  * Determines the local endpoint aware of autorouting, assigns the remote address, generates a transaction, and queues a callback.
  * @param dht pointer to the dht instance
- * @param ep pointer to the endpoint, may be NULL to find an endpoint or autoroute one
- * @param raddr pointer to the remote address
+ * @param job pointer to the job
  * @param query string value of the query type, for example "ping"
  * @param callback callback to be called when response to transaction is received
  * @param transaction dereferenced out pointer to the allocated transaction, may be NULL to ignore output
@@ -174,11 +173,10 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message);
  * @see ks_dht_message_query
  * @see ks_hash_insert
  */
-KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
-                                                                                  ks_dht_endpoint_t *ep,
-                                                                                  ks_sockaddr_t *raddr,
+KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht,
+                                                                                  ks_dht_job_t *job,
                                                                                   const char *query,
-                                                                                  ks_dht_message_callback_t callback,
+                                                                                  ks_dht_job_callback_t callback,
                                                                                   ks_dht_transaction_t **transaction,
                                                                                   ks_dht_message_t **message,
                                                                                   struct bencode **args);
@@ -199,25 +197,27 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
  * @see ks_dht_message_init
  * @see ks_dht_message_response
  */
-KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
+KS_DECLARE(ks_status_t) ks_dht_response_setup(ks_dht_t *dht,
                                                                                          ks_dht_endpoint_t *ep,
-                                                                                         ks_sockaddr_t *raddr,
+                                                                                         const ks_sockaddr_t *raddr,
                                                                                          uint8_t *transactionid,
                                                                                          ks_size_t transactionid_length,
                                                                                          ks_dht_message_t **message,
                                                                                          struct bencode **args);
                                                                                                
                                                
-KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
-                                                                                 ks_dht_endpoint_t *ep,
-                                                                                 ks_sockaddr_t *raddr,
-                                                                                 uint8_t *transactionid,
-                                                                                 ks_size_t transactionid_length,
-                                                                                 long long errorcode,
-                                                                                 const char *errorstr);
-KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr);
-KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid);
-KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid);
+KS_DECLARE(ks_status_t) ks_dht_error(ks_dht_t *dht,
+                                                                        ks_dht_endpoint_t *ep,
+                                                                        const ks_sockaddr_t *raddr,
+                                                                        uint8_t *transactionid,
+                                                                        ks_size_t transactionid_length,
+                                                                        long long errorcode,
+                                                                        const char *errorstr);
+
+KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht);
+KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job);
+KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job);
+KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job);
 
 KS_DECLARE(void *)ks_dht_process(ks_thread_t *thread, void *data);
 
@@ -226,16 +226,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
 KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *message);
 
 KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message);
-KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message);
+KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t *job);
 
 KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message);
-KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message);
+KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_job_t *job);
 
 KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message);
-KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message);
+KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t *job);
 
 KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message);
-KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message);
+KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t *job);
 
 
 /**
@@ -248,6 +248,20 @@ KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram,
                                                                                           const ks_sockaddr_t *raddr);
 KS_DECLARE(void) ks_dht_datagram_destroy(ks_dht_datagram_t **datagram);
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job,
+                                                                                 ks_pool_t *pool,
+                                                                                 const ks_sockaddr_t *raddr,
+                                                                                 int32_t attempts);
+KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback);
+KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
+                                                                                  ks_dht_job_callback_t query_callback,
+                                                                                  ks_dht_job_callback_t finish_callback,
+                                                                                  ks_dht_nodeid_t *target);
+KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job);
+
 
 /**
  *
@@ -292,9 +306,9 @@ KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item);
  */
 KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transaction,
                                                                                                  ks_pool_t *pool,
-                                                                                                 ks_sockaddr_t *raddr,
+                                                                                                 ks_dht_job_t *job,
                                                                                                  uint32_t transactionid,
-                                                                                                 ks_dht_message_callback_t callback);
+                                                                                                 ks_dht_job_callback_t callback);
 KS_DECLARE(void) ks_dht_transaction_destroy(ks_dht_transaction_t **transaction);
 
 KS_END_EXTERN_C
index aaaef29fc5fad306a549a3462c2eeff7611059bd..6c7331887381a7a9a1455f41ebc0a84123a711b8 100644 (file)
@@ -120,6 +120,15 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
         */
        d->recv_buffer_length = 0;
 
+       /**
+        * Initialize the jobs mutex
+        */
+       ks_mutex_create(&d->jobs_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
+       ks_assert(d->jobs_mutex);
+
+       d->jobs_first = NULL;
+       d->jobs_last = NULL;
+
        /**
         * Initialize the transaction id mutex, should use atomic increment instead
         */
@@ -167,13 +176,13 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
        /**
         * Create the hash to store arbitrary data for BEP44.
         */
-       ks_hash_create(&d->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
-       ks_assert(d->storage_hash);
+       ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+       ks_assert(d->storageitems_hash);
 
        /**
-        * The storage hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's.
+        * The storageitems hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's.
         */
-       ks_hash_set_keysize(d->storage_hash, KS_DHT_NODEID_SIZE);
+       ks_hash_set_keysize(d->storageitems_hash, KS_DHT_NODEID_SIZE);
 
        // done:
        if (ret != KS_STATUS_SUCCESS) {
@@ -198,17 +207,17 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
        d = *dht;
 
        /**
-        * Cleanup the storage hash and it's contents if it is allocated.
+        * Cleanup the storageitems hash and it's contents if it is allocated.
         */
-       if (d->storage_hash) {
-               for (it = ks_hash_first(d->storage_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+       if (d->storageitems_hash) {
+               for (it = ks_hash_first(d->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
                        ks_dht_storageitem_t *val;
 
                        ks_hash_this_val(it, (void **)&val);
 
                        ks_dht_storageitem_destroy(&val);
                }
-               ks_hash_destroy(&d->storage_hash);
+               ks_hash_destroy(&d->storageitems_hash);
        }
 
        /**
@@ -254,6 +263,15 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
        if (d->tid_mutex) ks_mutex_destroy(&d->tid_mutex);
        if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash);
 
+       /**
+        * Cleanup the jobs mutex and jobs if they are allocated.
+        */
+       for (ks_dht_job_t *job = d->jobs_first, *jobn = NULL; job; job = jobn) {
+               jobn = job->next;
+               ks_dht_job_destroy(&job);
+       }
+       if (d->jobs_mutex) ks_mutex_destroy(&d->jobs_mutex);
+       
        /**
         * Probably don't need this, recv_buffer_length is temporary and may change
         */
@@ -363,7 +381,7 @@ KS_DECLARE(void) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t
        dht->autoroute_port = port;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint)
+KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint)
 {
        // @todo lookup standard def for IPV6 max size
        char ip[48 + 1];
@@ -594,6 +612,8 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
                }
        }
 
+       ks_dht_pulse_jobs(dht);
+
        ks_dht_pulse_send(dht);
 
        ks_dht_pulse_expirations(dht);
@@ -669,6 +689,8 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
                ks_hash_this(it, &key, NULL, (void **)&value);
                if (value->finished) remove = KS_TRUE;
                else if (value->expiration <= now) {
+                       // if the transaction expires, so does the attached job, it may try again with a new transaction
+                       value->job->state = KS_DHT_JOB_STATE_EXPIRING;
                        ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
                        remove = KS_TRUE;
                }
@@ -910,7 +932,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const
 }
 
 
-KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
+KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
 {
        SHA_CTX sha;
        uint16_t port = 0;
@@ -933,7 +955,7 @@ KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *ra
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
+KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
 {
        ks_dht_token_t tok;
 
@@ -949,8 +971,7 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, k
 
 KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
 {
-       // @todo calculate max IPV6 payload size?
-       char buf[1001];
+       char buf[KS_DHT_DATAGRAM_BUFFER_SIZE + 1];
        ks_size_t buf_len;
 
        ks_assert(dht);
@@ -973,22 +994,23 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
 }
 
 
-KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
-                                                                                  ks_dht_endpoint_t *ep,
-                                                                                  ks_sockaddr_t *raddr,
+KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht,
+                                                                                  ks_dht_job_t *job,
                                                                                   const char *query,
-                                                                                  ks_dht_message_callback_t callback,
+                                                                                  ks_dht_job_callback_t callback,
                                                                                   ks_dht_transaction_t **transaction,
                                                                                   ks_dht_message_t **message,
                                                                                   struct bencode **args)
 {
+       ks_dht_endpoint_t *ep = NULL;
        uint32_t transactionid;
        ks_dht_transaction_t *trans = NULL;
        ks_dht_message_t *msg = NULL;
-       ks_status_t ret = KS_STATUS_FAIL;
+       struct bencode *a = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
-       ks_assert(raddr);
+       ks_assert(job);
        ks_assert(query);
        ks_assert(callback);
        ks_assert(message);
@@ -996,18 +1018,32 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
        if (transaction) *transaction = NULL;
        *message = NULL;
 
-       if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret;
+       if ((ret = ks_dht_autoroute_check(dht, &job->raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
 
     // @todo atomic increment
        ks_mutex_lock(dht->tid_mutex);
        transactionid = dht->transactionid_next++;
        ks_mutex_unlock(dht->tid_mutex);
 
-       if ((ret = ks_dht_transaction_create(&trans, dht->pool, raddr, transactionid, callback)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done;
 
-       if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
 
-       if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done;
+       //      if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done;
+    transactionid = htonl(transactionid);
+
+       ben_dict_set(msg->data, ben_blob("t", 1), ben_blob((uint8_t *)&transactionid, sizeof(uint32_t)));
+       ben_dict_set(msg->data, ben_blob("y", 1), ben_blob("q", 1));
+       ben_dict_set(msg->data, ben_blob("q", 1), ben_blob(query, strlen(query)));
+
+       // @note a joins msg->data and will be freed with it
+       a = ben_dict();
+       ks_assert(a);
+       ben_dict_set(msg->data, ben_blob("a", 1), a);
+
+       if (args) *args = a;
+
+       ben_dict_set(a, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE));
 
        *message = msg;
 
@@ -1015,8 +1051,6 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
 
        if (transaction) *transaction = trans;
 
-       ret = KS_STATUS_SUCCESS;
-
  done:
        if (ret != KS_STATUS_SUCCESS) {
                if (trans) ks_dht_transaction_destroy(&trans);
@@ -1026,16 +1060,17 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
        return ret;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
+KS_DECLARE(ks_status_t) ks_dht_response_setup(ks_dht_t *dht,
                                                                                          ks_dht_endpoint_t *ep,
-                                                                                         ks_sockaddr_t *raddr,
+                                                                                         const ks_sockaddr_t *raddr,
                                                                                          uint8_t *transactionid,
                                                                                          ks_size_t transactionid_length,
                                                                                          ks_dht_message_t **message,
                                                                                          struct bencode **args)
 {
        ks_dht_message_t *msg = NULL;
-       ks_status_t ret = KS_STATUS_FAIL;
+       struct bencode *r = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(raddr);
@@ -1048,11 +1083,20 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
 
        if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
 
-       if ((ret = ks_dht_message_response(msg, transactionid, transactionid_length, args)) != KS_STATUS_SUCCESS) goto done;
+       //if ((ret = ks_dht_message_response(msg, transactionid, transactionid_length, args)) != KS_STATUS_SUCCESS) goto done;
+    ben_dict_set(msg->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
+       ben_dict_set(msg->data, ben_blob("y", 1), ben_blob("r", 1));
 
-       *message = msg;
+       // @note r joins msg->data and will be freed with it
+       r = ben_dict();
+       ks_assert(r);
+       ben_dict_set(msg->data, ben_blob("r", 1), r);
 
-       ret = KS_STATUS_SUCCESS;
+       if (args) *args = r;
+
+       ben_dict_set(r, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE));
+
+       *message = msg;
 
  done:
        if (ret != KS_STATUS_SUCCESS) {
@@ -1103,14 +1147,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
        struct bencode *a;
        const char *qv;
        ks_size_t qv_len;
+       ks_dht_nodeid_t *id;
+       ks_dht_node_t *node;
        char query[KS_DHT_MESSAGE_QUERY_MAX_SIZE];
        ks_dht_message_callback_t callback;
-       ks_status_t ret = KS_STATUS_FAIL;
+       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(message);
 
-       // @todo start of ks_dht_message_parse_query
     q = ben_dict_get_by_str(message->data, "q");
     if (!q) {
                ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n");
@@ -1121,7 +1167,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
        qv_len = ben_str_len(q);
     if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) {
                ks_log(KS_LOG_DEBUG, "Message query 'q' value has an unexpectedly large size of %d\n", qv_len);
-               return KS_STATUS_FAIL;
+               ret = KS_STATUS_FAIL;
+               goto done;
        }
 
        memcpy(query, qv, qv_len);
@@ -1131,42 +1178,73 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
        a = ben_dict_get_by_str(message->data, "a");
        if (!a) {
                ks_log(KS_LOG_DEBUG, "Message query missing required key 'a'\n");
-               return KS_STATUS_FAIL;
+               ret = KS_STATUS_FAIL;
+               goto done;
        }
-       // @todo end of ks_dht_message_parse_query
 
        message->args = a;
 
+    if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
+       message->args_id = *id;
+
+       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+       if ((ret = ks_dhtrt_create_node(message->endpoint->node->table,
+                                                                       *id,
+                                                                       KS_DHT_REMOTE,
+                                                                       message->raddr.host,
+                                                                       message->raddr.port,
+                                                                       &node)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
+
        callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_READLOCKED);
        ks_hash_read_unlock(dht->registry_query);
 
        if (!callback) ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
        else ret = callback(dht, message);
 
+ done:
        return ret;
 }
 
 KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t *message)
 {
        struct bencode *r;
+       ks_dht_nodeid_t *id;
+       ks_dht_node_t *node;
        ks_dht_transaction_t *transaction;
        uint32_t *tid;
        uint32_t transactionid;
-       ks_status_t ret = KS_STATUS_FAIL;
+       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+       ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(message);
 
-       // @todo start of ks_dht_message_parse_response
        r = ben_dict_get_by_str(message->data, "r");
        if (!r) {
                ks_log(KS_LOG_DEBUG, "Message response missing required key 'r'\n");
-               return KS_STATUS_FAIL;
+               ret = KS_STATUS_FAIL;
+               goto done;
        }
-       // @todo end of ks_dht_message_parse_response
-
+       
        message->args = r;
+       
+       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
+       message->args_id = *id;
+
+       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+       if ((ret = ks_dhtrt_create_node(message->endpoint->node->table,
+                                                                       *id,
+                                                                       KS_DHT_REMOTE,
+                                                                       message->raddr.host,
+                                                                       message->raddr.port,
+                                                                       &node)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
+       
+       ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
+       if ((ret = ks_dhtrt_touch_node(message->endpoint->node->table, *id)) != KS_STATUS_SUCCESS) goto done;
 
+       
        tid = (uint32_t *)message->transactionid;
        transactionid = ntohl(*tid);
 
@@ -1174,19 +1252,22 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
        ks_hash_read_unlock(dht->transactions_hash);
 
        if (!transaction) ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
-       else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
+       else if (!ks_addr_cmp(&message->raddr, &transaction->job->raddr)) {
                ks_log(KS_LOG_DEBUG,
                           "Message response rejected due to spoofing from %s %d, expected %s %d\n",
                           message->raddr.host,
                           message->raddr.port,
-                          transaction->raddr.host,
-                          transaction->raddr.port);
+                          transaction->job->raddr.host,
+                          transaction->job->raddr.port);
        } else {
+               transaction->job->response = message;
+               transaction->job->state = KS_DHT_JOB_STATE_PROCESSING;
                message->transaction = transaction;
-               ret = transaction->callback(dht, message);
+               if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING;
                transaction->finished = KS_TRUE;
        }
 
+ done:
        return ret;
 }
 
@@ -1286,9 +1367,9 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
                        ks_dht_search_pending_destroy(&pending);
                        goto done;
                }
-               if ((ret = ks_dht_send_findnode(dht, NULL, &n->addr, target)) != KS_STATUS_SUCCESS) goto done;
+               if ((ret = ks_dht_findnode(dht, &n->addr, NULL, target)) != KS_STATUS_SUCCESS) goto done;
        }
-       // @todo release closest local query node locks
+       ks_dhtrt_release_querynodes(&query);
        ks_mutex_unlock(s->mutex);
        locked_search = KS_FALSE;
 
@@ -1305,13 +1386,13 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
 }
 
 
-KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
-                                                                                 ks_dht_endpoint_t *ep,
-                                                                                 ks_sockaddr_t *raddr,
-                                                                                 uint8_t *transactionid,
-                                                                                 ks_size_t transactionid_length,
-                                                                                 long long errorcode,
-                                                                                 const char *errorstr)
+KS_DECLARE(ks_status_t) ks_dht_error(ks_dht_t *dht,
+                                                                        ks_dht_endpoint_t *ep,
+                                                                        const ks_sockaddr_t *raddr,
+                                                                        uint8_t *transactionid,
+                                                                        ks_size_t transactionid_length,
+                                                                        long long errorcode,
+                                                                        const char *errorstr)
 {
        ks_dht_message_t *error = NULL;
        struct bencode *e = NULL;
@@ -1326,7 +1407,15 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
 
        if ((ret = ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
 
-       if ((ret = ks_dht_message_error(error, transactionid, transactionid_length, &e)) != KS_STATUS_SUCCESS) goto done;
+       //if ((ret = ks_dht_message_error(error, transactionid, transactionid_length, &e)) != KS_STATUS_SUCCESS) goto done;
+    ben_dict_set(error->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
+       ben_dict_set(error->data, ben_blob("y", 1), ben_blob("e", 1));
+
+       // @note e joins error->data and will be freed with it
+       e = ben_list();
+       ks_assert(e);
+       ben_dict_set(error->data, ben_blob("e", 1), e);
+
 
        ben_list_append(e, ben_int(errorcode));
        ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
@@ -1357,7 +1446,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
        ks_assert(dht);
        ks_assert(message);
 
-       // @todo start of ks_dht_message_parse_error
        e = ben_dict_get_by_str(message->data, "e");
        if (!e) {
                ks_log(KS_LOG_DEBUG, "Message error missing required key 'e'\n");
@@ -1376,7 +1464,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
 
        memcpy(error, et, es_len);
        error[es_len] = '\0';
-       // @todo end of ks_dht_message_parse_error
 
        message->args = e;
 
@@ -1392,13 +1479,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
                goto done;
        }
 
-       if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
+       if (!ks_addr_cmp(&message->raddr, &transaction->job->raddr)) {
                ks_log(KS_LOG_DEBUG,
                           "Message error rejected due to spoofing from %s %d, expected %s %d\n",
                           message->raddr.host,
                           message->raddr.port,
-                          transaction->raddr.host,
-                          transaction->raddr.port);
+                          transaction->job->raddr.host,
+                          transaction->job->raddr.port);
                ret = KS_STATUS_FAIL;
                goto done;
        }
@@ -1415,26 +1502,92 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
        return ret;
 }
 
+KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job)
+{
+       ks_assert(dht);
+       ks_assert(job);
+
+       ks_mutex_lock(dht->jobs_mutex);
+    if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = job;
+       else dht->jobs_first = dht->jobs_last = job;
+       ks_mutex_unlock(dht->jobs_mutex);
+}
 
-KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr)
+KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
 {
-       ks_dht_message_t *message = NULL;
-       struct bencode *a = NULL;
+       ks_assert(dht);
+
+       ks_mutex_lock(dht->jobs_mutex);
+       for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
+               ks_bool_t remove = KS_FALSE;
+               jobn = job->next;
+               switch (job->state) {
+               case KS_DHT_JOB_STATE_QUERYING:
+                       if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
+                       break;
+               case KS_DHT_JOB_STATE_RESPONDING:
+                       break;
+               case KS_DHT_JOB_STATE_EXPIRING:
+                       job->attempts--;
+                       if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING;
+                       else {
+                               if (job->finish_callback) job->finish_callback(dht, job);
+                               remove = KS_TRUE;
+                       }
+                       break;
+               case KS_DHT_JOB_STATE_PROCESSING:
+                       break;
+               case KS_DHT_JOB_STATE_COMPLETING:
+                       if (job->finish_callback) job->finish_callback(dht, job);
+                       remove = KS_TRUE;
+                       break;
+               default: break;
+               }
+
+               if (remove) {
+                       if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
+                       else if (!jobp) dht->jobs_first = jobn;
+                       else if (!jobn) dht->jobs_last = jobp;
+                       else jobp->next = jobn;
+                       ks_dht_job_destroy(&job);
+               } else jobp = job;
+       }
+       ks_mutex_unlock(dht->jobs_mutex);
+}
+
+KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback)
+{
+       ks_dht_job_t *job = NULL;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(raddr);
 
-       if ((ret = ks_dht_setup_query(dht,
-                                                                 ep,
-                                                                 raddr,
+       if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
+       ks_dht_job_build_ping(job, ks_dht_query_ping, callback);
+       ks_dht_jobs_add(dht, job);
+
+       // next step in ks_dht_pulse_jobs with QUERYING state
+
+ done:
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job)
+{
+       ks_dht_message_t *message = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(dht);
+       ks_assert(job);
+
+       if ((ret = ks_dht_query_setup(dht,
+                                                                 job,
                                                                  "ping",
                                                                  ks_dht_process_response_ping,
                                                                  NULL,
                                                                  &message,
-                                                                 &a)) != KS_STATUS_SUCCESS) goto done;
-
-       ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+                                                                 NULL)) != KS_STATUS_SUCCESS) goto done;
 
        ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
        ks_q_push(dht->send_q, (void *)message);
@@ -1445,37 +1598,22 @@ KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, k
 
 KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message)
 {
-       ks_dht_nodeid_t *id;
        ks_dht_message_t *response = NULL;
-       struct bencode *r = NULL;
-       ks_dhtrt_routetable_t *routetable = NULL;
-       ks_dht_node_t *node = NULL;
-       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(message);
        ks_assert(message->args);
 
-       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
-       routetable = message->endpoint->node->table;
-
-       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
-       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
        ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
 
-       if ((ret = ks_dht_setup_response(dht,
+       if ((ret = ks_dht_response_setup(dht,
                                                                         message->endpoint,
                                                                         &message->raddr,
                                                                         message->transactionid,
                                                                         message->transactionid_length,
                                                                         &response,
-                                                                        &r)) != KS_STATUS_SUCCESS) goto done;
-
-       ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+                                                                        NULL)) != KS_STATUS_SUCCESS) goto done;
 
        ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
        ks_q_push(dht->send_q, (void *)response);
@@ -1484,36 +1622,42 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
        return ret;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message)
+KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t *job)
 {
-       ks_dht_nodeid_t *id;
-       ks_dhtrt_routetable_t *routetable = NULL;
-       ks_dht_node_t *node = NULL;
-       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
-       ks_assert(message);
+       ks_assert(job);
 
-       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
+       ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
 
-       routetable = message->endpoint->node->table;
+       job->state = KS_DHT_JOB_STATE_COMPLETING;
 
-       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
-       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-       
-       ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
-       if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
+       // done:
+       return ret;
+}
 
-       ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
+
+KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target)
+{
+       ks_dht_job_t *job = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(target);
+
+       if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
+       ks_dht_job_build_findnode(job, ks_dht_query_findnode, callback, target);
+       ks_dht_jobs_add(dht, job);
+
+       // next step in ks_dht_pulse_jobs with QUERYING state
 
  done:
        return ret;
 }
 
-
-KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
+KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job)
 {
        ks_dht_transaction_t *transaction = NULL;
        ks_dht_message_t *message = NULL;
@@ -1521,24 +1665,22 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
-       ks_assert(raddr);
-       ks_assert(targetid);
+       ks_assert(job);
 
-       if ((ret = ks_dht_setup_query(dht,
-                                                                 ep,
-                                                                 raddr,
+       if ((ret = ks_dht_query_setup(dht,
+                                                                 job,
                                                                  "find_node",
                                                                  ks_dht_process_response_findnode,
                                                                  &transaction,
                                                                  &message,
                                                                  &a)) != KS_STATUS_SUCCESS) goto done;
 
-       memcpy(transaction->target.id, targetid->id, KS_DHT_NODEID_SIZE);
+       //memcpy(transaction->target.id, job->target.id, KS_DHT_NODEID_SIZE);
+       transaction->target = job->target;
 
-       ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
-       ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
+       ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE));
        // Only request both v4 and v6 if we have both interfaces bound and are looking for our own node id, aka bootstrapping
-       if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, targetid->id, KS_DHT_NODEID_SIZE)) {
+       if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, job->target.id, KS_DHT_NODEID_SIZE)) {
                struct bencode *want = ben_list();
                ben_list_append_str(want, "n4");
                ben_list_append_str(want, "n6");
@@ -1554,7 +1696,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e
 
 KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message)
 {
-       ks_dht_nodeid_t *id;
        ks_dht_nodeid_t *target;
        struct bencode *want;
        ks_bool_t want4 = KS_FALSE;
@@ -1565,8 +1706,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        uint8_t buffer6[1000];
        ks_size_t buffer4_length = 0;
        ks_size_t buffer6_length = 0;
-       ks_dhtrt_routetable_t *routetable = NULL;
-       ks_dht_node_t *node = NULL;
        ks_dhtrt_querynodes_t query;
        char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_status_t ret = KS_STATUS_SUCCESS;
@@ -1575,8 +1714,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        ks_assert(message);
        ks_assert(message->args);
 
-       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
        if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
 
        want = ben_dict_get_by_str(message->args, "want");
@@ -1595,12 +1732,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
                want6 = message->raddr.family == AF_INET6;
        }
 
-       routetable = message->endpoint->node->table;
-
-       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
-       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
        ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
 
 
@@ -1622,7 +1753,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
 
                        ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
                }
-               // @todo release query nodes
+               ks_dhtrt_release_querynodes(&query);
        }
        if (want6) {
                query.family = AF_INET6;
@@ -1639,9 +1770,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
 
                        ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
                }
+               ks_dhtrt_release_querynodes(&query);
        }
 
-       if ((ret = ks_dht_setup_response(dht,
+       if ((ret = ks_dht_response_setup(dht,
                                                                         message->endpoint,
                                                                         &message->raddr,
                                                                         message->transactionid,
@@ -1649,7 +1781,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
                                                                         &response,
                                                                         &r)) != KS_STATUS_SUCCESS) goto done;
 
-       ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
        if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
        if (want6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
 
@@ -1660,9 +1791,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        return ret;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message)
+KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_job_t *job)
 {
-       ks_dht_nodeid_t *id;
        struct bencode *n;
        //ks_bool_t n4 = KS_FALSE;
        //ks_bool_t n6 = KS_FALSE;
@@ -1672,50 +1802,37 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
        size_t nodes6_size = 0;
        size_t nodes_len = 0;
        size_t nodes6_len = 0;
-       ks_dhtrt_routetable_t *routetable = NULL;
-       ks_dht_node_t *node = NULL;
-       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_hash_t *searches = NULL;
        ks_dht_search_t *search = NULL;
+       ks_dht_node_t *node = NULL;
+       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
-       ks_assert(message);
-       ks_assert(message->transaction);
+       ks_assert(job);
 
-       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
-       n = ben_dict_get_by_str(message->args, "nodes");
+       n = ben_dict_get_by_str(job->response->args, "nodes");
        if (n) {
                //n4 = KS_TRUE;
                nodes = (const uint8_t *)ben_str_val(n);
                nodes_size = ben_str_len(n);
        }
-       n = ben_dict_get_by_str(message->args, "nodes6");
+       n = ben_dict_get_by_str(job->response->args, "nodes6");
        if (n) {
                //n6 = KS_TRUE;
                nodes6 = (const uint8_t *)ben_str_val(n);
                nodes6_size = ben_str_len(n);
        }
 
-       routetable = message->endpoint->node->table;
-
-       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
-       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
-       ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
-       if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
-
-       searches = message->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash;
+       searches = job->response->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash;
 
        ks_hash_read_lock(searches);
-       search = ks_hash_search(searches, message->transaction->target.id, KS_UNLOCKED);
+       search = ks_hash_search(searches, job->response->transaction->target.id, KS_UNLOCKED);
        if (search) {
                ks_dht_search_pending_t *pending = NULL;
 
                ks_mutex_lock(search->mutex);
-               pending = ks_hash_search(search->pending, id->id, KS_UNLOCKED);
+               pending = ks_hash_search(search->pending, job->response->args_id.id, KS_UNLOCKED);
                if (pending) pending->finished = KS_TRUE;
        }
        ks_hash_read_unlock(searches);
@@ -1737,7 +1854,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
                ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
                ks_dhtrt_release_node(node);
 
-               if (search && message->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
+               if (search && job->response->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
                        ks_dht_nodeid_t distance;
                        int32_t results_index = -1;
                        
@@ -1776,7 +1893,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
                                        ks_dht_search_pending_destroy(&pending);
                                        goto done;
                                }
-                               if ((ret = ks_dht_send_findnode(dht, NULL, &addr, &search->target)) != KS_STATUS_SUCCESS) goto done;
+                               if ((ret = ks_dht_findnode(dht, &addr, NULL, &search->target)) != KS_STATUS_SUCCESS) goto done;
                        }
                }
        }
@@ -1798,7 +1915,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
                ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
                ks_dhtrt_release_node(node);
 
-               if (search && message->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
+               if (search && job->response->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
                        ks_dht_nodeid_t distance;
                        int32_t results_index = -1;
                        
@@ -1837,7 +1954,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
                                        ks_dht_search_pending_destroy(&pending);
                                        goto done;
                                }
-                               if ((ret = ks_dht_send_findnode(dht, NULL, &addr, &search->target)) != KS_STATUS_SUCCESS) goto done;
+                               if ((ret = ks_dht_findnode(dht, &addr, NULL, &search->target)) != KS_STATUS_SUCCESS) goto done;
                        }
                }
        }
@@ -1849,28 +1966,26 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
        return ret;
 }
 
+// @todo ks_dht_get
 
-KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
+KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
 {
        ks_dht_message_t *message = NULL;
        struct bencode *a = NULL;
 
        ks_assert(dht);
-       ks_assert(raddr);
-       ks_assert(targetid);
+       ks_assert(job);
 
-       if (ks_dht_setup_query(dht,
-                                                  ep,
-                                                  raddr,
+       if (ks_dht_query_setup(dht,
+                                                  job,
                                                   "get",
                                                   ks_dht_process_response_get,
                                                   NULL,
                                                   &message,
                                                   &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
-       ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
        // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
-       ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
+       ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE));
 
        ks_log(KS_LOG_DEBUG, "Sending message query get\n");
        ks_q_push(dht->send_q, (void *)message);
@@ -1880,7 +1995,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks
 
 KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message)
 {
-       ks_dht_nodeid_t *id;
        ks_dht_nodeid_t *target;
        struct bencode *seq;
        int64_t sequence = -1;
@@ -1889,8 +2003,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        ks_dht_storageitem_t *item = NULL;
        ks_dht_message_t *response = NULL;
        struct bencode *r = NULL;
-       ks_dhtrt_routetable_t *routetable = NULL;
-       ks_dht_node_t *node = NULL;
+       ks_dhtrt_querynodes_t query;
+       uint8_t buffer4[1000];
+       uint8_t buffer6[1000];
+       ks_size_t buffer4_length = 0;
+       ks_size_t buffer6_length = 0;
        char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_status_t ret = KS_STATUS_SUCCESS;
 
@@ -1898,34 +2015,61 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        ks_assert(message);
        ks_assert(message->args);
 
-       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
        if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
 
        seq = ben_dict_get_by_str(message->args, "seq");
        if (seq) sequence = ben_int_val(seq);
 
-       routetable = message->endpoint->node->table;
-
-       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
-       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
        ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
 
        ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
 
-       item = ks_hash_search(dht->storage_hash, (void *)target, KS_READLOCKED);
-       ks_hash_read_unlock(dht->storage_hash);
+       item = ks_hash_search(dht->storageitems_hash, target->id, KS_READLOCKED);
+       ks_hash_read_unlock(dht->storageitems_hash);
 
        sequence_snuffed = item && sequence >= 0 && item->seq <= sequence;
-       // @todo if sequence is provided then requester has the data so if the local sequence is lower, maybe create job to update local data from the requester?
+       // @todo if sequence is provided then requester has the data, so if the local sequence is lower maybe send a get to the requester to update local data?
+
+       query.nodeid = *target;
+       query.type = KS_DHT_REMOTE;
+       query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE
+       if (dht->rt_ipv4) {
+               query.family = AF_INET;
+
+               ks_dhtrt_findclosest_nodes(dht->rt_ipv4, &query);
+               for (int32_t i = 0; i < query.count; ++i) {
+                       ks_dht_node_t *qn = query.nodes[i];
+
+                       if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid,
+                                                                                                          &qn->addr,
+                                                                                                          buffer4,
+                                                                                                          &buffer4_length,
+                                                                                                          sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
 
-       // @todo find closest ipv4 and ipv6 nodes to target
+                       ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
+               }
+               ks_dhtrt_release_querynodes(&query);
+       }
+       if (dht->rt_ipv6) {
+               query.family = AF_INET6;
 
-       // @todo compact ipv4 and ipv6 nodes into separate buffers
+               ks_dhtrt_findclosest_nodes(dht->rt_ipv6, &query);
+               for (int32_t i = 0; i < query.count; ++i) {
+                       ks_dht_node_t *qn = query.nodes[i];
 
-       if ((ret = ks_dht_setup_response(dht,
+                       if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid,
+                                                                                                          &qn->addr,
+                                                                                                          buffer6,
+                                                                                                          &buffer6_length,
+                                                                                                          sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
+
+                       ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
+               }
+               ks_dhtrt_release_querynodes(&query);
+       }
+
+
+       if ((ret = ks_dht_response_setup(dht,
                                                                         message->endpoint,
                                                                         &message->raddr,
                                                                         message->transactionid,
@@ -1933,7 +2077,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
                                                                         &response,
                                                                         &r)) != KS_STATUS_SUCCESS) goto done;
 
-       ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
        ben_dict_set(r, ben_blob("token", 5), ben_blob(token.token, KS_DHT_TOKEN_SIZE));
        if (item) {
                if (item->mutable) {
@@ -1945,7 +2088,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
                }
                if (!sequence_snuffed) ben_dict_set(r, ben_blob("v", 1), ben_clone(item->v));
        }
-       // @todo nodes, nodes6
+       if (dht->rt_ipv4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
+       if (dht->rt_ipv6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
 
        ks_log(KS_LOG_DEBUG, "Sending message response get\n");
        ks_q_push(dht->send_q, (void *)response);
@@ -1954,34 +2098,20 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        return ret;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message)
+KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t *job)
 {
-       ks_dht_nodeid_t *id;
        ks_dht_token_t *token;
-       ks_dhtrt_routetable_t *routetable = NULL;
-       ks_dht_node_t *node = NULL;
-       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
-       ks_assert(message);
+       ks_assert(job);
 
        // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
-       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
-       if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
+       if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
 
        // @todo add extract function for mutable ks_dht_storageitem_key_t
        // @todo add extract function for mutable ks_dht_storageitem_signature_t
 
-       routetable = message->endpoint->node->table;
-
-       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
-       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
-       ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
-       if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
        // @todo add/touch bucket entries for other nodes/nodes6 returned
 
        ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
@@ -1990,34 +2120,22 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag
        return ret;
 }
 
-
-// @todo ks_dht_send_put
+// @todo ks_dht_put
+// @todo ks_dht_query_put
 
 KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message)
 {
-       ks_dht_nodeid_t *id;
        ks_dht_message_t *response = NULL;
        struct bencode *r = NULL;
-       ks_dhtrt_routetable_t *routetable = NULL;
-       ks_dht_node_t *node = NULL;
-       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
        ks_assert(message);
        ks_assert(message->args);
 
-       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
-       routetable = message->endpoint->node->table;
-
-       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
-       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
        ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
 
-       if ((ret = ks_dht_setup_response(dht,
+       if ((ret = ks_dht_response_setup(dht,
                                                                         message->endpoint,
                                                                         &message->raddr,
                                                                         message->transactionid,
@@ -2025,8 +2143,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
                                                                         &response,
                                                                         &r)) != KS_STATUS_SUCCESS) goto done;
 
-       //ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
-
        ks_log(KS_LOG_DEBUG, "Sending message response put\n");
        ks_q_push(dht->send_q, (void *)response);
 
@@ -2034,31 +2150,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
        return ret;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message)
+KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t *job)
 {
-       ks_dht_nodeid_t *id;
-       ks_dhtrt_routetable_t *routetable = NULL;
-       ks_dht_node_t *node = NULL;
-       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
-       ks_assert(message);
-
-       if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
-
-       routetable = message->endpoint->node->table;
-
-       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
-       if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
-       if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
-
-       ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
-       if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
+       ks_assert(job);
 
        ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
 
- done:
      // done:
        return ret;
 }
 
index 51b4f84d9f55f3b89bcda3a3935b0659fbc7ef3f..1f320b997a3887e103177fc27ba1e1fc1bf5daa8 100644 (file)
@@ -42,6 +42,7 @@ KS_BEGIN_EXTERN_C
 
 typedef struct ks_dht_s ks_dht_t;
 typedef struct ks_dht_datagram_s ks_dht_datagram_t;
+typedef struct ks_dht_job_s ks_dht_job_t;
 typedef struct ks_dht_nodeid_s ks_dht_nodeid_t;
 typedef struct ks_dht_token_s ks_dht_token_t;
 typedef struct ks_dht_storageitem_key_s ks_dht_storageitem_key_t;
@@ -57,6 +58,7 @@ typedef struct ks_dhtrt_querynodes_s ks_dhtrt_querynodes_t;
 typedef struct ks_dht_storageitem_s ks_dht_storageitem_t;
 
 
+typedef ks_status_t (*ks_dht_job_callback_t)(ks_dht_t *dht, ks_dht_job_t *job);
 typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message);
 typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search);
 
@@ -90,6 +92,41 @@ struct ks_dht_node_s {
     ks_rwl_t        *reflock;          
 };
 
+enum ks_dht_job_state_t {
+       KS_DHT_JOB_STATE_QUERYING,
+       KS_DHT_JOB_STATE_RESPONDING,
+       KS_DHT_JOB_STATE_EXPIRING,
+       KS_DHT_JOB_STATE_PROCESSING,
+       KS_DHT_JOB_STATE_COMPLETING,
+};
+
+//enum ks_dht_job_type_t {
+//     KS_DHT_JOB_TYPE_NONE = 0,
+//     KS_DHT_JOB_TYPE_PING,
+//     KS_DHT_JOB_TYPE_FINDNODE,
+//};
+
+struct ks_dht_job_s {
+       ks_pool_t *pool;
+       ks_dht_t *dht;
+       ks_dht_job_t *next;
+
+       enum ks_dht_job_state_t state;
+
+       ks_sockaddr_t raddr; // will obtain local endpoint node id when creating message using raddr
+       int32_t attempts;
+
+       //enum ks_dht_job_type_t type;
+       ks_dht_job_callback_t query_callback;
+       ks_dht_job_callback_t finish_callback;
+
+       ks_dht_message_t *response;
+       //ks_dht_nodeid_t response_id;
+
+       // job specific query parameters
+       ks_dht_nodeid_t target;
+};
+
 struct ks_dhtrt_routetable_s {
     void*       internal;                       
     ks_pool_t*  pool;                           
@@ -127,6 +164,7 @@ struct ks_dht_message_s {
        ks_dht_transaction_t *transaction;
        char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE];
        struct bencode *args;
+       ks_dht_nodeid_t args_id;
 };
 
 struct ks_dht_endpoint_s {
@@ -141,10 +179,10 @@ struct ks_dht_endpoint_s {
 
 struct ks_dht_transaction_s {
        ks_pool_t *pool;
-       ks_sockaddr_t raddr;
+       ks_dht_job_t *job;
        uint32_t transactionid;
-       ks_dht_nodeid_t target;
-       ks_dht_message_callback_t callback;
+       ks_dht_nodeid_t target; // @todo look at moving this into job now
+       ks_dht_job_callback_t callback;
        ks_time_t expiration;
        ks_bool_t finished;
 };
@@ -209,6 +247,10 @@ struct ks_dht_s {
        uint8_t recv_buffer[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; // Add 1, if we receive it then overflow error
        ks_size_t recv_buffer_length;
 
+       ks_mutex_t *jobs_mutex;
+       ks_dht_job_t *jobs_first;
+       ks_dht_job_t *jobs_last;
+
        ks_mutex_t *tid_mutex;
        volatile uint32_t transactionid_next;
        ks_hash_t *transactions_hash;
@@ -222,7 +264,8 @@ struct ks_dht_s {
        volatile uint32_t token_secret_current;
        volatile uint32_t token_secret_previous;
        ks_time_t token_secret_expiration;
-       ks_hash_t *storage_hash;
+
+       ks_hash_t *storageitems_hash;
 };
 
 /**
@@ -307,6 +350,9 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
  */
 KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout);
 
+KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback);
+KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target);
+                                               
 /**
  * Create a network search of the closest nodes to a target.
  * @param dht pointer to the dht instance
@@ -327,13 +373,15 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
                                                                          ks_dht_search_callback_t callback,
                                                                          ks_dht_search_t **search);
 
+
+
 /**
  *
  */
 KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
                                                                                          ks_pool_t *pool,
                                                                                          ks_dht_endpoint_t *endpoint,
-                                                                                         ks_sockaddr_t *raddr,
+                                                                                         const ks_sockaddr_t *raddr,
                                                                                          ks_bool_t alloc_data);
 /**
  *
@@ -345,14 +393,6 @@ KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message);
  */
 KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length);
 
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
-                                                                                        uint32_t transactionid,
-                                                                                        const char *query,
-                                                                                        struct bencode **args);
-
 /**
  *
  */
@@ -361,14 +401,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
                                                                                                ks_size_t transactionid_length,
                                                                                                struct bencode **args);
 
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message,
-                                                                                        uint8_t *transactionid,
-                                                                                        ks_size_t transactionid_length,
-                                                                                        struct bencode **args);
-
 
 /**
  * route table methods
diff --git a/libs/libks/src/dht/ks_dht_job.c b/libs/libks/src/dht/ks_dht_job.c
new file mode 100644 (file)
index 0000000..c99556a
--- /dev/null
@@ -0,0 +1,80 @@
+#include "ks_dht.h"
+#include "ks_dht-int.h"
+
+KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job,
+                                                                                 ks_pool_t *pool,
+                                                                                 const ks_sockaddr_t *raddr,
+                                                                                 int32_t attempts)
+{
+       ks_dht_job_t *j;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(job);
+       ks_assert(pool);
+       //ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(attempts > 0 && attempts <= 10);
+
+       *job = j = ks_pool_alloc(pool, sizeof(ks_dht_job_t));
+       ks_assert(j);
+
+       j->pool = pool;
+       j->state = KS_DHT_JOB_STATE_QUERYING;
+       j->raddr = *raddr;
+       j->attempts = attempts;
+
+       //ks_mutex_lock(dht->jobs_mutex);
+       //if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = j;
+       //else dht->jobs_first = dht->jobs_last = j;
+       //ks_mutex_unlock(dht->jobs_mutex);
+
+       // done:
+       if (ret != KS_STATUS_SUCCESS) {
+               if (j) ks_dht_job_destroy(job);
+       }
+       return ret;
+}
+
+KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback)
+{
+       ks_assert(job);
+
+       job->query_callback = query_callback;
+       job->finish_callback = finish_callback;
+}
+
+KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
+                                                                                  ks_dht_job_callback_t query_callback,
+                                                                                  ks_dht_job_callback_t finish_callback,
+                                                                                  ks_dht_nodeid_t *target)
+{
+       ks_assert(job);
+       ks_assert(target);
+
+       job->query_callback = query_callback;
+       job->finish_callback = finish_callback;
+       job->target = *target;
+}
+
+KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job)
+{
+       ks_dht_job_t *j;
+
+       ks_assert(job);
+       ks_assert(*job);
+
+       j = *job;
+
+       ks_pool_free(j->pool, job);
+}
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
index 75780f46c53bb42e90533c4c9026e7413aecdf12..7997b41c0480ca5946ca30c79cf53f9eeff83061 100644 (file)
@@ -4,7 +4,7 @@
 KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
                                                                                          ks_pool_t *pool,
                                                                                          ks_dht_endpoint_t *endpoint,
-                                                                                         ks_sockaddr_t *raddr,
+                                                                                         const ks_sockaddr_t *raddr,
                                                                                          ks_bool_t alloc_data)
 {
        ks_dht_message_t *m;
@@ -110,33 +110,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
-                                                                                        uint32_t transactionid,
-                                                                                        const char *query,
-                                                                                        struct bencode **args)
-{
-       struct bencode *a;
-       uint32_t tid;
-
-       ks_assert(message);
-       ks_assert(query);
-
-       tid = htonl(transactionid);
-
-    ben_dict_set(message->data, ben_blob("t", 1), ben_blob((uint8_t *)&tid, sizeof(uint32_t)));
-       ben_dict_set(message->data, ben_blob("y", 1), ben_blob("q", 1));
-       ben_dict_set(message->data, ben_blob("q", 1), ben_blob(query, strlen(query)));
-
-       // @note r joins message->data and will be freed with it
-       a = ben_dict();
-       ks_assert(a);
-       ben_dict_set(message->data, ben_blob("a", 1), a);
-
-       if (args) *args = a;
-
-       return KS_STATUS_SUCCESS;
-}
-
 KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
                                                                                                uint8_t *transactionid,
                                                                                                ks_size_t transactionid_length,
@@ -160,33 +133,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
        return KS_STATUS_SUCCESS;
 }
 
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message,
-                                                                                        uint8_t *transactionid,
-                                                                                        ks_size_t transactionid_length,
-                                                                                        struct bencode **args)
-{
-       struct bencode *e;
-
-       ks_assert(message);
-       ks_assert(transactionid);
-
-    ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
-       ben_dict_set(message->data, ben_blob("y", 1), ben_blob("e", 1));
-
-       // @note r joins message->data and will be freed with it
-       e = ben_list();
-       ks_assert(e);
-       ben_dict_set(message->data, ben_blob("e", 1), e);
-
-       if (args) *args = e;
-
-       return KS_STATUS_SUCCESS;
-}
-
-
 /* For Emacs:
  * Local Variables:
  * mode:c
index 6d6b7baf3585661982bdbdd29494a0bf6ee73739..0f0458329cec1a3c52026932e930bac891a81177 100644 (file)
@@ -3,22 +3,22 @@
 
 KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transaction,
                                                                                                  ks_pool_t *pool,
-                                                                                                 ks_sockaddr_t *raddr,
+                                                                                                 ks_dht_job_t *job,
                                                                                                  uint32_t transactionid,
-                                                                                                 ks_dht_message_callback_t callback)
+                                                                                                 ks_dht_job_callback_t callback)
 {
        ks_dht_transaction_t *t;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(transaction);
        ks_assert(pool);
-       ks_assert(raddr);
+       ks_assert(job);
 
        *transaction = t = ks_pool_alloc(pool, sizeof(ks_dht_transaction_t));
        ks_assert(t);
 
        t->pool = pool;
-       t->raddr = *raddr;
+       t->job = job;
        t->transactionid = transactionid;
        t->callback = callback;
        t->expiration = ks_time_now() + (KS_DHT_TRANSACTION_EXPIRATION * 1000);
index 36dc09e43106341947c20cae6e0131a89f1a4b76..d160a0cc45bb13d49f9a58e3de8039fcd482e4c7 100644 (file)
@@ -10,7 +10,7 @@ ks_status_t dht_z_callback(ks_dht_t *dht, ks_dht_message_t *message)
 {
        diag("dht_z_callback\n");
        ok(message->transactionid[0] == '4' && message->transactionid[1] == '2');
-       ks_dht_send_error(dht, message->endpoint, &message->raddr, message->transactionid, message->transactionid_length, 201, "Generic test error");
+       ks_dht_error(dht, message->endpoint, &message->raddr, message->transactionid, message->transactionid_length, 201, "Generic test error");
        return KS_STATUS_SUCCESS;
 }
 
@@ -135,18 +135,21 @@ int main() {
   
   diag("Ping test\n");
   
-  ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1
+  //ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1
+  ks_dht_ping(dht2, &raddr1, NULL); // (QUERYING)
 
-  ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1
+  ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 (RESPONDING)
   
   ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response
 
   ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
 
-  ks_dht_pulse(dht2, 100); // Receive and process ping response from dht1
+  ks_dht_pulse(dht2, 100); // Receive and process ping response from dht1 (PROCESSING then COMPLETING)
 
   ok(ks_dhtrt_find_node(dht2->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good
 
+  ks_dht_pulse(dht2, 100); // (COMPLETING)
+
   diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
   for (int i = 0; i < 10; ++i) {
          //diag("DHT 1\n");
@@ -160,7 +163,8 @@ int main() {
 
   diag("Find_Node test\n");
 
-  ks_dht_send_findnode(dht3, ep3, &raddr1, &ep2->nodeid); // Queue findnode from dht3 to dht1
+  //ks_dht_send_findnode(dht3, ep3, &raddr1, &ep2->nodeid); // Queue findnode from dht3 to dht1
+  ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid);
 
   ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1