]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Tweaks, bug fixes, etc. Committing in preparation for introducing into libblade.
authorShane Bryldt <astaelan@gmail.com>
Thu, 5 Jan 2017 16:19:55 +0000 (16:19 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:38 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_message.c
libs/libks/test/testdht2.c

index 2e8ce749e37d3dc0672459d2f2b755149fd9b97e..6d06d4682bff9681344d41fbba8cbba7197ae30b 100644 (file)
@@ -21,6 +21,14 @@ KS_BEGIN_EXTERN_C
  */
 KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint);
 
+/**
+ * Called internally to receive datagrams from endpoints.
+ * Handles datagrams by dispatching each through a threadpool.
+ * @param dht pointer to the dht instance
+ * @param timeout time in ms to wait for an incoming datagram on any endpoint
+ */
+KS_DECLARE(void) ks_dht_pulse_endpoints(ks_dht_t *dht, int32_t timeout);
+
 /**
  * Called internally to expire or reannounce storage item data.
  * Handles reannouncing and purging of expiring storage items.
@@ -334,14 +342,6 @@ KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message);
  */
 KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length);
 
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
-                                                                                               uint8_t *transactionid,
-                                                                                               ks_size_t transactionid_length,
-                                                                                               struct bencode **args);
-
                                                                                                                                                                                                
 /**
  *
index 7007ddb8eb14e0e818bda761140e3b074bc65926..4cffb7cef73d7941a9a038d3d0b95830e8ffe28a 100644 (file)
@@ -229,103 +229,73 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
 
        d = *dht;
 
+       // @todo ks_dht_shutdown to stop further incoming data and pulse to finish processing and flushing data
+       
        /**
-        * Cleanup the storageitems hash and it's contents if it is allocated.
-        */
-       if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash);
-       d->storageitems_pulse = 0;
-
-       /**
-        * Zero out the opaque write token variables.
+        * Cleanup the type, query, and error registries if they have been allocated.
+        * No dependancies during destruction, entries are function pointers with no cleanup required.
         */
-       d->token_secret_current = 0;
-       d->token_secret_previous = 0;
-       d->token_secret_expiration = 0;
-       d->tokens_pulse = 0;
-
+       if (d->registry_type) ks_hash_destroy(&d->registry_type);
+       if (d->registry_query) ks_hash_destroy(&d->registry_query);
+       if (d->registry_error) ks_hash_destroy(&d->registry_error);
+       
        /**
-        * Cleanup the route tables if they are allocated.
+        * Cleanup the endpoint management and entries if they have been allocated.
+        * No dependancies during destruction, entries are destroyed through hash destructor.
+        * Sockets are closed during entry destruction, and route table references to local nodes are released.
         */
-       if (d->rt_ipv4) ks_dhtrt_deinitroute(&d->rt_ipv4);
-       if (d->rt_ipv6) ks_dhtrt_deinitroute(&d->rt_ipv6);
+       if (d->endpoints) ks_pool_free(d->pool, &d->endpoints);
+       if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll);
+       if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash);
 
        /**
-        * Cleanup the transactions mutex and hash if they are allocated.
+        * Cleanup the transaction management and entries if they have been allocated.
+        * No dependancies during destruction, entries are destroyed through hash destructor.
         */
-       d->transactionid_next = 0;
        if (d->transactionid_mutex) ks_mutex_destroy(&d->transactionid_mutex);
        if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash);
-       d->transactions_pulse = 0;
-
-       /**
-        * Cleanup the jobs mutex and jobs if they are allocated.
-        */
-       for (ks_dht_job_t *job = d->jobs_first, *jobn = NULL; job; job = jobn) {
-               jobn = job->next;
-               ks_dht_job_destroy(&job);
-       }
-       if (d->jobs_mutex) ks_mutex_destroy(&d->jobs_mutex);
-       
-       /**
-        * Probably don't need this, recv_buffer_length is temporary and may change
-        */
-       d->recv_buffer_length = 0;
 
        /**
-        * Cleanup the send queue and it's contents if it is allocated.
+        * Cleanup the message send queue and entries if they have been allocated.
+        * No dependancies during destruction, entries must be destroyed here.
         */
        if (d->send_q) {
                ks_dht_message_t *msg;
                while (ks_q_pop_timeout(d->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) ks_dht_message_destroy(&msg);
                ks_q_destroy(&d->send_q);
        }
-       
-       /**
-        * Cleanup the cached popped message if it is set.
-        */
        if (d->send_q_unsent) ks_dht_message_destroy(&d->send_q_unsent);
 
        /**
-        * Probably don't need this
-        */
-       d->endpoints_length = 0;
-       d->endpoints_size = 0;
-
-       /**
-        * Cleanup the array of endpoint pointers if it is allocated.
+        * Cleanup the jobs management and entries if they have been allocated.
+        * Route table node and storage item references are released, entries must be destroyed here.
         */
-       if (d->endpoints) ks_pool_free(d->pool, &d->endpoints);
-
-       /**
-        * Cleanup the array of endpoint polling data if it is allocated.
-        */
-       if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll);
-
+       for (ks_dht_job_t *job = d->jobs_first, *jobn = NULL; job; job = jobn) {
+               jobn = job->next;
+               ks_dht_job_destroy(&job);
+       }
+       if (d->jobs_mutex) ks_mutex_destroy(&d->jobs_mutex);
+       
        /**
-        * Cleanup the endpoints hash if it is allocated, and any endpoints that have been allocated.
+        * Cleanup the storageitems hash and it's contents if it is allocated.
+        * No dependancies during destruction, entries are destroyed through hash destructor.
         */
-       if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash);
+       if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash);
 
        /**
-        * Cleanup the type, query, and error registries if they have been allocated.
+        * Cleanup the route tables if they are allocated.
+        * No nodes should be referenced anymore by this point.
         */
-       if (d->registry_type) ks_hash_destroy(&d->registry_type);
-       if (d->registry_query) ks_hash_destroy(&d->registry_query);
-       if (d->registry_error) ks_hash_destroy(&d->registry_error);
+       if (d->rt_ipv4) ks_dhtrt_deinitroute(&d->rt_ipv4);
+       if (d->rt_ipv6) ks_dhtrt_deinitroute(&d->rt_ipv6);
 
-       /**
-        * Probably don't need this
-        */
-       d->autoroute = KS_FALSE;
-       d->autoroute_port = 0;
 
        /**
         * If the thread pool was allocated internally, destroy it.
-        * If this fails, something catastrophically bad happened like memory corruption.
         */
        if (d->tpool_alloc) ks_thread_pool_destroy(&d->tpool);
-       d->tpool_alloc = KS_FALSE;
 
+       
        /**
         * Temporarily store the allocator level variables because freeing the dht instance will invalidate it.
         */
@@ -335,16 +305,11 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
        /**
         * Free the dht instance from the pool, after this the dht instance memory is invalid.
         */
-       ks_pool_free(d->pool, &d);
-
-       /**
-        * At this point dht instance is invalidated so NULL the pointer.
-        */
-       *dht = d = NULL;
+       ks_pool_free(d->pool, dht);
+       d = NULL;
 
        /**
         * If the pool was allocated internally, destroy it using the temporary variables stored earlier.
-        * If this fails, something catastrophically bad happened like memory corruption.
         */
        if (pool_alloc) ks_pool_close(&pool);
 }
@@ -587,20 +552,39 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
 
 KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
 {
-       ks_dht_datagram_t *datagram = NULL;
-       ks_sockaddr_t raddr;
-
        ks_assert(dht);
        ks_assert(timeout >= 0 && timeout <= 1000);
        // this should be called with a timeout of less than 1000ms, preferrably around 100ms
 
+       ks_dht_pulse_endpoints(dht, timeout);
+
+       if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
+       if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
+
+       ks_dht_pulse_storageitems(dht);
+
+       ks_dht_pulse_jobs(dht);
+
+       ks_dht_pulse_send(dht);
+
+       ks_dht_pulse_transactions(dht);
+
+       ks_dht_pulse_tokens(dht);
+}
+
+KS_DECLARE(void) ks_dht_pulse_endpoints(ks_dht_t *dht, int32_t timeout)
+{
+       ks_assert(dht);
+       ks_assert(timeout >= 0 && timeout <= 1000);
+
        if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0;
 
        if (ks_poll(dht->endpoints_poll, dht->endpoints_length, timeout) > 0) {
                for (int32_t i = 0; i < dht->endpoints_length; ++i) {
+                       ks_dht_datagram_t *datagram = NULL;
+                       ks_sockaddr_t raddr = (const ks_sockaddr_t){ 0 };
                        if (!(dht->endpoints_poll[i].revents & POLLIN)) continue;
 
-                       raddr = (const ks_sockaddr_t){ 0 };
                        dht->recv_buffer_length = sizeof(dht->recv_buffer);
                        raddr.family = dht->endpoints[i]->addr.family;
                        if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) != KS_STATUS_SUCCESS) continue;
@@ -616,19 +600,6 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
                        if (ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) ks_dht_datagram_destroy(&datagram);
                }
        }
-
-       if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
-       if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
-
-       ks_dht_pulse_storageitems(dht);
-
-       ks_dht_pulse_jobs(dht);
-
-       ks_dht_pulse_send(dht);
-
-       ks_dht_pulse_transactions(dht);
-
-       ks_dht_pulse_tokens(dht);
 }
 
 KS_DECLARE(void) ks_dht_pulse_storageitems(ks_dht_t *dht)
@@ -1702,6 +1673,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo
        ks_size_t nodes_count = 0;
        ks_dht_nodeid_t distance;
        int32_t results_index = -1;
+       ks_bool_t finished = KS_FALSE;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
@@ -1713,7 +1685,10 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo
        ks_mutex_lock(search->mutex);
        search->searching--;
 
-       if (job->result != KS_DHT_JOB_RESULT_SUCCESS) goto done;
+       if (job->result != KS_DHT_JOB_RESULT_SUCCESS) {
+               finished = KS_TRUE;
+               goto done;
+       }
 
        ks_dht_utility_nodeid_xor(&distance, &job->response_id->nodeid, &search->target);
        if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
@@ -1781,11 +1756,12 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo
                        ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target);
                }
        }
+       finished = search->searching == 0;
 
  done:
        ks_mutex_unlock(search->mutex);
        
-       if (search->searching == 0) {
+       if (finished) {
                if (search->callback) search->callback(dht, job);
                ks_dht_search_destroy(&search);
        }
@@ -1795,9 +1771,9 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo
 
 KS_DECLARE(ks_status_t) ks_dht_query_search(ks_dht_t *dht, ks_dht_job_t *job)
 {
-       ks_bool_t locked_search = KS_FALSE;
        ks_dht_search_t *search = NULL;
     ks_dhtrt_querynodes_t query;
+       ks_bool_t finished = KS_FALSE;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
@@ -1807,7 +1783,6 @@ KS_DECLARE(ks_status_t) ks_dht_query_search(ks_dht_t *dht, ks_dht_job_t *job)
        search = (ks_dht_search_t *)job->data;
 
        ks_mutex_lock(search->mutex);
-       locked_search = KS_TRUE;
 
        // find closest good nodes to target locally and store as the closest results
        query.nodeid = search->target;
@@ -1828,11 +1803,12 @@ KS_DECLARE(ks_status_t) ks_dht_query_search(ks_dht_t *dht, ks_dht_job_t *job)
                ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target);
        }
        ks_dhtrt_release_querynodes(&query);
+       finished = search->searching == 0;
 
        // done:
-       if (locked_search) ks_mutex_unlock(search->mutex);
+       ks_mutex_unlock(search->mutex);
 
-       if (search->searching == 0) {
+       if (finished) {
                if (search->callback) search->callback(dht, job);
                ks_dht_search_destroy(&search);
        }
@@ -1846,6 +1822,7 @@ KS_DECLARE(void) ks_dht_search(ks_dht_t *dht,
                                                           ks_dhtrt_routetable_t *table,
                                                           ks_dht_nodeid_t *target)
 {
+       char target_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_dht_search_t *search = NULL;
        ks_dht_job_t *job = NULL;
 
@@ -1853,6 +1830,8 @@ KS_DECLARE(void) ks_dht_search(ks_dht_t *dht,
        ks_assert(table);
        ks_assert(target);
 
+       ks_log(KS_LOG_INFO, "[%s] Searching\n", ks_dht_hex(target->id, target_buf, KS_DHT_NODEID_SIZE));
+
        ks_dht_search_create(&search, dht->pool, table, target, callback, data);
        ks_assert(search);
        
@@ -1866,6 +1845,7 @@ KS_DECLARE(void) ks_dht_search(ks_dht_t *dht,
 KS_DECLARE(ks_status_t) ks_dht_publish_get_callback(ks_dht_t *dht, ks_dht_job_t *job)
 {
        ks_dht_publish_t *publish = NULL;
+       ks_bool_t finished = KS_FALSE;
        ks_status_t ret = KS_STATUS_SUCCESS;
 
        ks_assert(dht);
@@ -1874,22 +1854,20 @@ KS_DECLARE(ks_status_t) ks_dht_publish_get_callback(ks_dht_t *dht, ks_dht_job_t
 
        publish = (ks_dht_publish_t *)job->data;
 
-       // @todo callbacks need job to contain cascaded publish->data before calling
        if (job->result != KS_DHT_JOB_RESULT_SUCCESS) {
-               job->data = publish->data;
-               if (publish->callback) publish->callback(dht, job);
+               finished = KS_TRUE;
                goto done;
        }
 
        if (!job->response_hasitem || (publish->item->mutable && job->response_seq < publish->item->seq)) {
                ks_dht_put(dht, &job->raddr, publish->callback, publish->data, &job->response_token, publish->cas, publish->item);
-       } else if (publish->callback) {
-               job->data = publish->data;
-               publish->callback(dht, job);
-       }
+       } else finished = KS_TRUE;
 
  done:
-       
+       if (finished) {
+               job->data = publish->data;
+               if (publish->callback) publish->callback(dht, job);
+       }
        ks_dht_publish_destroy(&publish);
        return ret;
 }
@@ -1901,6 +1879,7 @@ KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht,
                                                                int64_t cas,
                                                                ks_dht_storageitem_t *item)
 {
+       char target_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_dht_publish_t *publish = NULL;
        const uint8_t *salt = NULL;
        size_t salt_length = 0;
@@ -1910,6 +1889,8 @@ KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht,
        ks_assert(cas >= 0);
        ks_assert(item);
 
+       ks_log(KS_LOG_INFO, "[%s] Publishing to %s %d\n", ks_dht_hex(item->id.id, target_buf, KS_DHT_NODEID_SIZE), raddr->host, raddr->port);
+       
        if (item->salt) {
                salt = (const uint8_t *)ben_str_val(item->salt);
                salt_length = ben_str_len(item->salt);
@@ -1977,8 +1958,6 @@ KS_DECLARE(ks_status_t) ks_dht_distribute_search_callback(ks_dht_t *dht, ks_dht_
                ks_dht_distribute_destroy(&distribute);
        }
 
-       ks_dht_search_destroy(&search);
-
        return ret;
 }
 
@@ -1989,12 +1968,15 @@ KS_DECLARE(void) ks_dht_distribute(ks_dht_t *dht,
                                                                   int64_t cas,
                                                                   ks_dht_storageitem_t *item)
 {
+       char target_buf[KS_DHT_NODEID_SIZE * 2 + 1];
        ks_dht_distribute_t *distribute = NULL;
 
        ks_assert(dht);
        ks_assert(table);
        ks_assert(cas >= 0);
        ks_assert(item);
+       
+       ks_log(KS_LOG_INFO, "[%s] Distributing\n", ks_dht_hex(item->id.id, target_buf, KS_DHT_NODEID_SIZE));
 
        ks_dht_distribute_create(&distribute, dht->pool, callback, data, cas, item);
        ks_assert(distribute);
@@ -2201,7 +2183,13 @@ KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job)
                                                                  &message,
                                                                  NULL)) != KS_STATUS_SUCCESS) goto done;
 
-       //ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
+       ks_log(KS_LOG_INFO,
+                  "[%s %d] Ping query to %s %d\n",
+                  message->endpoint->addr.host,
+                  message->endpoint->addr.port,
+                  message->raddr.host,
+                  message->raddr.port);
+       
        ks_q_push(dht->send_q, (void *)message);
 
  done:
@@ -2217,6 +2205,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
        ks_assert(message);
        ks_assert(message->args);
 
+       ks_log(KS_LOG_INFO,
+                  "[%s %d] Ping query from %s %d\n",
+                  message->endpoint->addr.host,
+                  message->endpoint->addr.port,
+                  message->raddr.host,
+                  message->raddr.port);
+
        //ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
 
        if ((ret = ks_dht_response_setup(dht,
@@ -2241,6 +2236,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t
        ks_assert(dht);
        ks_assert(job);
 
+       ks_log(KS_LOG_INFO,
+                  "[%s %d] Ping response from %s %d\n",
+                  job->response->endpoint->addr.host,
+                  job->response->endpoint->addr.port,
+                  job->response->raddr.host,
+                  job->response->raddr.port);
+
        //ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
 
        // done:
@@ -2297,7 +2299,13 @@ KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job)
                ben_dict_set(a, ben_blob("want", 4), want);
        }
 
-       //ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
+       ks_log(KS_LOG_INFO,
+                  "[%s %d] Findnode query to %s %d\n",
+                  message->endpoint->addr.host,
+                  message->endpoint->addr.port,
+                  message->raddr.host,
+                  message->raddr.port);
+
        ks_q_push(dht->send_q, (void *)message);
 
  done:
@@ -2324,6 +2332,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        ks_assert(message);
        ks_assert(message->args);
 
+       ks_log(KS_LOG_INFO,
+                  "[%s %d] Findnode query from %s %d\n",
+                  message->endpoint->addr.host,
+                  message->endpoint->addr.port,
+                  message->raddr.host,
+                  message->raddr.port);
+
        if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) {
                ks_dht_error(dht,
                                         message->endpoint,
@@ -2449,6 +2464,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
        ks_assert(dht);
        ks_assert(job);
 
+       ks_log(KS_LOG_INFO,
+                  "[%s %d] Findnode response from %s %d\n",
+                  job->response->endpoint->addr.host,
+                  job->response->endpoint->addr.port,
+                  job->response->raddr.host,
+                  job->response->raddr.port);
+
        n = ben_dict_get_by_str(job->response->args, "nodes");
        if (n && dht->rt_ipv4) {
                //n4 = KS_TRUE;
@@ -2551,7 +2573,13 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
        if (item && item->mutable && item->seq > 0) ben_dict_set(a, ben_blob("seq", 3), ben_int(item->seq));
        ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
 
-       //ks_log(KS_LOG_DEBUG, "Sending message query get\n");
+       ks_log(KS_LOG_INFO,
+                  "[%s %d] Get query to %s %d\n",
+                  message->endpoint->addr.host,
+                  message->endpoint->addr.port,
+                  message->raddr.host,
+                  message->raddr.port);
+
        if (item) {
                ks_dht_storageitem_dereference(item);
                ks_mutex_unlock(item->mutex);
@@ -2583,6 +2611,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        ks_assert(message);
        ks_assert(message->args);
 
+       ks_log(KS_LOG_INFO,
+                  "[%s %d] Get query from %s %d\n",
+                  message->endpoint->addr.host,
+                  message->endpoint->addr.port,
+                  message->raddr.host,
+                  message->raddr.port);
+
        if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) {
                ks_dht_error(dht,
                                         message->endpoint,
@@ -2609,10 +2644,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        }
        ks_hash_read_unlock(dht->storageitems_hash);
 
-       // If the item is mutable and available locally and a specific sequence was requested and the local item is not newer then do not send k, sig, or v back
        sequence_snuffed = item && sequence >= 0 && item->seq <= sequence;
-       // @todo if sequence is explicitly provided then requester has the data, so if the local sequence is lower
-       // maybe send a get query to the requester to update the local data
 
        query.nodeid = *target;
        query.type = KS_DHT_REMOTE;
@@ -2734,6 +2766,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
        ks_assert(dht);
        ks_assert(job);
 
+       ks_log(KS_LOG_INFO,
+                  "[%s %d] Get response from %s %d\n",
+                  job->response->endpoint->addr.host,
+                  job->response->endpoint->addr.port,
+                  job->response->raddr.host,
+                  job->response->raddr.port);
+
        if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
        job->response_token = *token;
 
@@ -2928,7 +2967,13 @@ KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job)
        ben_dict_set(a, ben_blob("token", 5), ben_blob(job->query_token.token, KS_DHT_TOKEN_SIZE));
        ben_dict_set(a, ben_blob("v", 1), ben_clone(job->query_storageitem->v));
 
-       //ks_log(KS_LOG_DEBUG, "Sending message query put\n");
+       ks_log(KS_LOG_INFO,
+                  "[%s %d] Put query to %s %d\n",
+                  message->endpoint->addr.host,
+                  message->endpoint->addr.port,
+                  message->raddr.host,
+                  message->raddr.port);
+
        ks_q_push(dht->send_q, (void *)message);
 
        return KS_STATUS_SUCCESS;
@@ -2958,6 +3003,12 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
        ks_assert(message);
        ks_assert(message->args);
 
+       ks_log(KS_LOG_INFO,
+                  "[%s %d] Put query from %s %d\n",
+                  message->endpoint->addr.host,
+                  message->endpoint->addr.port,
+                  message->raddr.host,
+                  message->raddr.port);
 
        if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) {
                ks_dht_error(dht,
@@ -3186,7 +3237,14 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t
        ks_assert(dht);
        ks_assert(job);
 
-       ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
+       ks_log(KS_LOG_INFO,
+                  "[%s %d] Put response from %s %d\n",
+                  job->response->endpoint->addr.host,
+                  job->response->endpoint->addr.port,
+                  job->response->raddr.host,
+                  job->response->raddr.port);
+
+       //ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
 
        // done:
        return ret;
index 8d183a4c4a789ad16c55bfcaba57016cd8cb464d..3652c47a39f0ff09e77a923467f62a584ecc6704 100644 (file)
@@ -150,7 +150,6 @@ struct ks_dht_job_s {
        int64_t query_cas;
        ks_dht_token_t query_token;
        ks_dht_storageitem_t *query_storageitem;
-    int32_t query_family;
 
        // error response parameters
        int64_t error_code;
index a88240f652318cc6f5edfab9f2e5b24464b08598..5cf29b7677b43787e3820992af481590fa2162b9 100644 (file)
@@ -110,29 +110,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
        return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
-                                                                                               uint8_t *transactionid,
-                                                                                               ks_size_t transactionid_length,
-                                                                                               struct bencode **args)
-{
-       struct bencode *r;
-
-       ks_assert(message);
-       ks_assert(transactionid);
-
-    ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
-       ben_dict_set(message->data, ben_blob("y", 1), ben_blob("r", 1));
-
-       // @note r joins message->data and will be freed with it
-       r = ben_dict();
-       ks_assert(r);
-       ben_dict_set(message->data, ben_blob("r", 1), r);
-
-       if (args) *args = r;
-
-       return KS_STATUS_SUCCESS;
-}
-
 /* For Emacs:
  * Local Variables:
  * mode:c
index fd4ab7964c29454c4531a28834b1e483677344f6..59eee76cf4016a089692e25565683d7607da7794 100644 (file)
@@ -93,7 +93,7 @@ int main() {
   err = ks_init();
   ok(!err);
 
-  ks_global_set_default_logger(7);
+  ks_global_set_default_logger(KS_LOG_LEVEL_INFO);
 
   err = ks_find_local_ip(v4, sizeof(v4), &mask, AF_INET, NULL);
   ok(err == KS_STATUS_SUCCESS);
@@ -172,7 +172,7 @@ int main() {
   ks_dht_ping(dht2, &raddr1, NULL, NULL); // (QUERYING)
 
   ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 (RESPONDING)
-  
+
   ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response
 
   ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet