]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Started mocking out structures for ks_dht_search, but merged route table...
authorShane Bryldt <astaelan@gmail.com>
Mon, 12 Dec 2016 20:33:48 +0000 (20:33 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:35 +0000 (14:59 -0600)
libs/libks/Makefile.am
libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_search.c [new file with mode: 0644]
libs/libks/test/testdht2.c

index 94b01d454fb87212bc821e42e113d706dbe3acbc..d1931c19bdd37be742bdcce471331282cb05ece7 100644 (file)
@@ -13,7 +13,7 @@ libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/k
 libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c
 libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp
 libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c
-libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c
+libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c src/dht/ks_dht_search.c
 libks_la_SOURCES += src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c
 libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c 
 #aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h
index fbb434f114e7b9ec98a4b6a6bb6dd925389d01b0..ae7ae23870b2c875c95bad371e9e54c5dfb3f1ac 100644 (file)
@@ -248,6 +248,27 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_init(ks_dht_endpoint_t *endpoint,
                                                                                         ks_socket_t sock);
 KS_DECLARE(ks_status_t) ks_dht_endpoint_deinit(ks_dht_endpoint_t *endpoint);
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_search_alloc(ks_dht_search_t **search, ks_pool_t *pool);
+KS_DECLARE(void) ks_dht_search_prealloc(ks_dht_search_t *search, ks_pool_t *pool);
+KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search);
+
+KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search,
+                                                                                  const ks_dht_nodeid_t *target,
+                                                                                  ks_dht_search_callback_t callback);
+KS_DECLARE(ks_status_t) ks_dht_search_deinit(ks_dht_search_t *search);
+
+KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback);
+
+KS_DECLARE(ks_status_t) ks_dht_search_pending_alloc(ks_dht_search_pending_t **pending, ks_pool_t *pool);
+KS_DECLARE(void) ks_dht_search_pending_prealloc(ks_dht_search_pending_t *pending, ks_pool_t *pool);
+KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pending);
+
+KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node, ks_time_t expiration);
+KS_DECLARE(ks_status_t) ks_dht_search_pending_deinit(ks_dht_search_pending_t *pending);
+
 /**
  *
  */
index 2fcbe5586688892c7483b28826643bbe9d3c09a6..870c9daec244f824807289a26809fec689ca864a 100644 (file)
@@ -786,7 +786,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
 
        if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_NO_MEM;
 
-       memcpy(nodeid->id, buffer, KS_DHT_NODEID_SIZE);
+       memcpy(nodeid->id, buffer + *buffer_length, KS_DHT_NODEID_SIZE);
        *buffer_length += KS_DHT_NODEID_SIZE;
 
        return ks_dht_utility_expand_addressinfo(buffer, buffer_length, buffer_size, address);
@@ -1132,8 +1132,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
                           transaction->raddr.host,
                           transaction->raddr.port);
        } else {
-               transaction->finished = KS_TRUE;
                ret = transaction->callback(dht, message);
+               transaction->finished = KS_TRUE;
        }
 
        return ret;
@@ -1315,6 +1315,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
        struct bencode *r = NULL;
        ks_dhtrt_routetable_t *routetable = NULL;
        ks_dht_node_t *node = NULL;
+       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
 
        ks_assert(dht);
        ks_assert(message);
@@ -1324,10 +1325,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
 
        routetable = message->endpoint->node->table;
 
-       // @todo touch here, or only create if not exists?
-       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
-               ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
-       }
+       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
        ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
 
@@ -1354,6 +1353,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_messa
        ks_dht_nodeid_t *id;
        ks_dhtrt_routetable_t *routetable = NULL;
        ks_dht_node_t *node = NULL;
+       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
 
        ks_assert(dht);
        ks_assert(message);
@@ -1362,9 +1362,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_messa
 
        routetable = message->endpoint->node->table;
 
-       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
-               ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
-       }
+       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
+       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
        ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
 
@@ -1435,9 +1437,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
 
        routetable = message->endpoint->node->table;
 
-       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
-               ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
-       }
+       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
        ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
 
@@ -1536,9 +1537,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
 
        routetable = message->endpoint->node->table;
 
-       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
-               ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
-       }
+       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
+       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
        while (nodes_len < nodes_size) {
                ks_dht_nodeid_t nid;
@@ -1553,9 +1556,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
                           addr.host,
                           addr.port);
 
-               if (ks_dhtrt_touch_node(dht->rt_ipv4, nid) != KS_STATUS_SUCCESS) {
-                       ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
-               }
+               ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+               ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
        }
 
        while (nodes6_len < nodes6_size) {
@@ -1571,9 +1573,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
                           addr.host,
                           addr.port);
 
-               if (ks_dhtrt_touch_node(dht->rt_ipv6, nid) != KS_STATUS_SUCCESS) {
-                       ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
-               }
+               ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+               ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
        }
        // @todo repeat above for ipv6 table
 
@@ -1617,6 +1618,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
        struct bencode *r = NULL;
        ks_dhtrt_routetable_t *routetable = NULL;
        ks_dht_node_t *node = NULL;
+       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
 
        ks_assert(dht);
        ks_assert(message);
@@ -1631,9 +1633,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
 
        routetable = message->endpoint->node->table;
 
-       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
-               ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
-       }
+       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
        ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
 
@@ -1685,6 +1686,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag
        ks_dht_token_t *token;
        ks_dhtrt_routetable_t *routetable = NULL;
        ks_dht_node_t *node = NULL;
+       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
 
        ks_assert(dht);
        ks_assert(message);
@@ -1699,9 +1701,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag
 
        routetable = message->endpoint->node->table;
 
-       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
-               ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
-       }
+       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
+       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
        // @todo add/touch bucket entries for other nodes/nodes6 returned
 
        ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
@@ -1719,6 +1723,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
        struct bencode *r = NULL;
        ks_dhtrt_routetable_t *routetable = NULL;
        ks_dht_node_t *node = NULL;
+       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
 
        ks_assert(dht);
        ks_assert(message);
@@ -1728,9 +1733,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
 
        routetable = message->endpoint->node->table;
 
-       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
-               ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
-       }
+       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
        ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
 
@@ -1757,6 +1761,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_messag
        ks_dht_nodeid_t *id;
        ks_dhtrt_routetable_t *routetable = NULL;
        ks_dht_node_t *node = NULL;
+       char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
 
        ks_assert(dht);
        ks_assert(message);
@@ -1765,9 +1770,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_messag
 
        routetable = message->endpoint->node->table;
 
-       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
-               ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node);
-       }
+       ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+       if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+       ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
+       if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
 
        ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
 
index f151938237467231e527633f5bebc4bbce089431..a27e63dd6e5a2e964853e4ed70184ff98bf15870 100644 (file)
@@ -21,6 +21,7 @@ KS_BEGIN_EXTERN_C
 
 #define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30
 #define KS_DHT_SEARCH_EXPIRATION 10
+#define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE
 
 #define KS_DHT_STORAGEITEM_KEY_SIZE crypto_sign_PUBLICKEYBYTES
 #define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64
@@ -39,6 +40,8 @@ typedef struct ks_dht_storageitem_signature_s ks_dht_storageitem_signature_t;
 typedef struct ks_dht_message_s ks_dht_message_t;
 typedef struct ks_dht_endpoint_s ks_dht_endpoint_t;
 typedef struct ks_dht_transaction_s ks_dht_transaction_t;
+typedef struct ks_dht_search_s ks_dht_search_t;
+typedef struct ks_dht_search_pending_s ks_dht_search_pending_t;
 typedef struct ks_dht_node_s ks_dht_node_t;
 typedef struct ks_dhtrt_routetable_s ks_dhtrt_routetable_t;
 typedef struct ks_dhtrt_querynodes_s ks_dhtrt_querynodes_t;
@@ -46,6 +49,7 @@ typedef struct ks_dht_storageitem_s ks_dht_storageitem_t;
 
 
 typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message);
+typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search);
 
 /**
  * Note: This must remain a structure for casting from raw data
@@ -122,10 +126,39 @@ struct ks_dht_transaction_s {
        ks_bool_t finished;
 };
 
+// Check if search already exists for the target id, if so add another callback, must be a popular target id
+// Otherwise create new search, set target id, add callback, and insert the search into the dht search_hash with target id key
+// Get closest local nodes to target id, check against results, send_findnode for closer nodes and add to pending hash with queried node id
+// Upon receiving find_node response, check target id against dht search_hash, check responding node id against pending hash, set finished for purging
+// Update results if responding node id is closer than any current result, or the results are not full
+// Check response nodes against results, send_findnode for closer nodes and add to pending hash with an expiration
+// Pulse expirations purges expired and finished from pending hash, once hash is empty callbacks are called providing results array
+// Note:
+// During the lifetime of a search, the ks_dht_node_t's must be kept alive
+// Do a query touch on nodes prior to being added to pending, this should reset timeout and keep the nodes alive long enough even if they are dubious
+// Nodes which land in results are known good with recent response to find_nodes and should be around for a while before route table worries about cleanup
+struct ks_dht_search_s {
+       ks_pool_t *pool;
+       ks_mutex_t *mutex;
+       ks_dht_nodeid_t target;
+       ks_dht_search_callback_t *callbacks;
+       ks_size_t callbacks_size;
+       ks_hash_t *pending;
+       ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE];
+       ks_size_t results_length;
+};
+
+struct ks_dht_search_pending_s {
+       ks_pool_t *pool;
+       ks_dht_node_t *node;
+       ks_time_t expiration;
+       ks_bool_t finished;
+};
+
 struct ks_dht_storageitem_s {
        ks_pool_t *pool;
        ks_dht_nodeid_t id;
-
+       // @todo ks_time_t expiration;
        struct bencode *v;
        
        ks_bool_t mutable;
@@ -169,6 +202,8 @@ struct ks_dht_s {
        ks_dhtrt_routetable_t *rt_ipv4;
        ks_dhtrt_routetable_t *rt_ipv6;
 
+       ks_hash_t *search_hash;
+
        volatile uint32_t token_secret_current;
        volatile uint32_t token_secret_previous;
        ks_time_t token_secret_expiration;
@@ -275,8 +310,8 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value,
  * Bind a local address and port for receiving UDP datagrams.
  * @param dht pointer to the dht instance
  * @param nodeid pointer to a nodeid for this endpoint, may be NULL to generate one randomly
- * @param addr pointer to the remote address information
- * @param dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint
+ * @param addr pointer to the local address information
+ * @param dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint output
  * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ...
  * @see ks_socket_option
  * @see ks_addr_bind
diff --git a/libs/libks/src/dht/ks_dht_search.c b/libs/libks/src/dht/ks_dht_search.c
new file mode 100644 (file)
index 0000000..4dfd3b2
--- /dev/null
@@ -0,0 +1,195 @@
+#include "ks_dht.h"
+#include "ks_dht-int.h"
+#include "sodium.h"
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_search_alloc(ks_dht_search_t **search, ks_pool_t *pool)
+{
+       ks_dht_search_t *s;
+
+       ks_assert(search);
+       ks_assert(pool);
+       
+       *search = s = ks_pool_alloc(pool, sizeof(ks_dht_search_t));
+       s->pool = pool;
+
+       return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(void) ks_dht_search_prealloc(ks_dht_search_t *search, ks_pool_t *pool)
+{
+       ks_assert(search);
+       ks_assert(pool);
+
+       memset(search, 0, sizeof(ks_dht_search_t));
+       
+       search->pool = pool;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       
+       ks_assert(search);
+       ks_assert(*search);
+
+       if ((ret = ks_dht_search_deinit(*search)) != KS_STATUS_SUCCESS) return ret;
+       if ((ret = ks_pool_free((*search)->pool, *search)) != KS_STATUS_SUCCESS) return ret;
+
+       *search = NULL;
+
+       return KS_STATUS_SUCCESS;
+}
+
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht_nodeid_t *target, ks_dht_search_callback_t callback)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(search);
+       ks_assert(search->pool);
+       ks_assert(target);
+
+       if ((ret = ks_mutex_create(&search->mutex, KS_MUTEX_FLAG_DEFAULT, search->pool)) != KS_STATUS_SUCCESS) return ret;
+       memcpy(search->target.id, target->id, KS_DHT_NODEID_SIZE);
+
+       if (callback) ks_dht_search_callback_add(search, callback);
+
+       if ((ret = ks_hash_create(&search->pending,
+                                                         KS_HASH_MODE_ARBITRARY,
+                                                         KS_HASH_FLAG_RWLOCK,
+                                                         search->pool)) != KS_STATUS_SUCCESS) return ret;
+       ks_hash_set_keysize(search->pending, KS_DHT_NODEID_SIZE);
+
+       return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_search_deinit(ks_dht_search_t *search)
+{
+       ks_hash_iterator_t *it;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       
+       ks_assert(search);
+
+       search->results_length = 0;
+       if (search->pending) {
+               for (it = ks_hash_first(search->pending, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+                       const void *key;
+                       ks_dht_search_pending_t *val;
+                       
+                       ks_hash_this(it, &key, NULL, (void **)&val);
+                       if ((ret = ks_dht_search_pending_deinit(val)) != KS_STATUS_SUCCESS) return ret;
+                       if ((ret = ks_dht_search_pending_free(&val)) != KS_STATUS_SUCCESS) return ret;
+               }
+               ks_hash_destroy(&search->pending);
+       }
+       search->callbacks_size = 0;
+       if (search->callbacks) {
+               if ((ret = ks_pool_free(search->pool, search->callbacks)) != KS_STATUS_SUCCESS) return ret;
+               search->callbacks = NULL;
+       }
+       if (search->mutex && (ret = ks_mutex_destroy(&search->mutex)) != KS_STATUS_SUCCESS) return ret;
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback)
+{
+       ks_assert(search);
+
+       if (callback) {
+               int32_t index = search->callbacks_size++;
+               search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool,
+                                                                                                                                          (void *)search->callbacks,
+                                                                                                                                          sizeof(ks_dht_search_callback_t) * search->callbacks_size);
+               search->callbacks[index] = callback;
+       }
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_search_pending_alloc(ks_dht_search_pending_t **pending, ks_pool_t *pool)
+{
+       ks_dht_search_pending_t *p;
+
+       ks_assert(pending);
+       ks_assert(pool);
+       
+       *pending = p = ks_pool_alloc(pool, sizeof(ks_dht_search_pending_t));
+       p->pool = pool;
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(void) ks_dht_search_pending_prealloc(ks_dht_search_pending_t *pending, ks_pool_t *pool)
+{
+       ks_assert(pending);
+       ks_assert(pool);
+
+       memset(pending, 0, sizeof(ks_dht_search_pending_t));
+       
+       pending->pool = pool;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pending)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       
+       ks_assert(pending);
+       ks_assert(*pending);
+
+       if ((ret = ks_dht_search_pending_deinit(*pending)) != KS_STATUS_SUCCESS) return ret;
+       if ((ret = ks_pool_free((*pending)->pool, *pending)) != KS_STATUS_SUCCESS) return ret;
+
+       *pending = NULL;
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node, ks_time_t expiration)
+{
+       ks_assert(pending);
+       ks_assert(pending->pool);
+       ks_assert(node);
+
+       pending->node = node;
+       pending->expiration = expiration;
+       pending->finished = KS_FALSE;
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_search_pending_deinit(ks_dht_search_pending_t *pending)
+{
+       ks_assert(pending);
+
+       pending->node = NULL;
+       pending->expiration = 0;
+       pending->finished = KS_FALSE;
+       
+       return KS_STATUS_SUCCESS;
+}
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
index d77eee436c94b65429fde25d05637705fccb01bf..82a00e6364803c1b40c8c13b3ae54e4d00a64776 100644 (file)
@@ -143,15 +143,28 @@ int main() {
   
   diag("Ping test\n");
   
-  ks_dht_send_ping(&dht2, ep2, &raddr1); // Queue ping from dht2 to dht1
+  ks_dht_send_ping(&dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1
 
   ks_dht_pulse(&dht2, 100); // Send queued ping from dht2 to dht1
   
   ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response
 
+  ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+
   ks_dht_pulse(&dht2, 100); // Receive and process ping response from dht1
 
-  // Test blind find_node from dht3 to dht1 to find dht2 nodeid
+  ok(ks_dhtrt_find_node(dht2.rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good
+
+  diag("Pulsing for route table pings\n"); // Wait a second for route table pinging to catch up
+  for (int i = 0; i < 10; ++i) {
+         diag("DHT 1\n");
+         ks_dht_pulse(dht1, 100);
+         diag("DHT 2\n");
+         ks_dht_pulse(&dht2, 100);
+  }
+  ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+  
+  // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
 
   diag("Find_Node test\n");
 
@@ -163,8 +176,17 @@ int main() {
 
   ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
 
-  ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL);
+  ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
   
+  diag("Pulsing for route table pings\n"); // Wait a second for route table pinging to catch up
+  for (int i = 0; i < 10; ++i) {
+         diag("DHT 1\n");
+         ks_dht_pulse(dht1, 100);
+         diag("DHT 2\n");
+         ks_dht_pulse(&dht2, 100);
+  }
+  ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+
   diag("Cleanup\n");
   /* Cleanup and shutdown */