]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Initial work towards sending messages, refactored into less headers, test...
authorShane Bryldt <astaelan@gmail.com>
Fri, 2 Dec 2016 19:57:45 +0000 (19:57 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:33 +0000 (14:59 -0600)
13 files changed:
libs/libks/Makefile.am
libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_endpoint-int.h [deleted file]
libs/libks/src/dht/ks_dht_endpoint.c
libs/libks/src/dht/ks_dht_endpoint.h [deleted file]
libs/libks/src/dht/ks_dht_message.c
libs/libks/src/dht/ks_dht_message.h [deleted file]
libs/libks/src/dht/ks_dht_nodeid.c
libs/libks/src/dht/ks_dht_nodeid.h [deleted file]
libs/libks/src/dht/ks_dht_transaction.c [new file with mode: 0644]
libs/libks/test/testdht2.c

index e81a564a06325937c0c9562e3eafeec88fdf34fd..341925e6d46238b7587453381b49c6f0c2b2964c 100644 (file)
@@ -13,7 +13,7 @@ libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/k
 libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c
 libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp
 libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c
-libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_nodeid.c src/dht/ks_dht_message.c
+libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_nodeid.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c
 libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c 
 #aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h
 
@@ -29,8 +29,7 @@ library_include_HEADERS += src/include/ks_dso.h src/include/ks_dht.h src/include
 library_include_HEADERS += src/include/ks_printf.h src/include/ks_hash.h src/include/ks_ssl.h src/include/kws.h
 library_include_HEADERS += src/utp/utp_internal.h src/utp/utp.h src/utp/utp_types.h src/utp/utp_callbacks.h src/utp/utp_templates.h
 library_include_HEADERS += src/utp/utp_hash.h src/utp/utp_packedsockaddr.h src/utp/utp_utils.h src/include/ks_utp.h
-library_include_HEADERS += src/dht/ks_dht.h src/dht/ks_dht-int.h src/dht/ks_dht_endpoint.h src/dht/ks_dht_endpoint-int.h
-library_include_HEADERS += src/dht/ks_dht_nodeid.h src/dht/ks_dht_message.h
+library_include_HEADERS += src/dht/ks_dht.h src/dht/ks_dht-int.h
 
 tests: libks.la
        $(MAKE) -C test tests
index 9f10f5599e461d95cb911bd2647a91e450ec47cc..81ed2f558d3544f7a3af95563f7d5b85af8c0f62 100644 (file)
@@ -5,12 +5,35 @@
 
 KS_BEGIN_EXTERN_C
 
-
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
 KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht);
 KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr);
 
 KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
+KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
+
 KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
+KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
+
+KS_DECLARE(ks_status_t) ks_dht2_send_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr);
+KS_DECLARE(ks_status_t) ks_dht2_send_response_ping(ks_dht2_t *dht,
+                                                                                                  ks_sockaddr_t *raddr,
+                                                                                                  uint8_t *transactionid,
+                                                                                                  ks_size_t transactionid_length);
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_endpoint_alloc(ks_dht2_endpoint_t **endpoint, ks_pool_t *pool);
+KS_DECLARE(ks_status_t) ks_dht2_endpoint_prealloc(ks_dht2_endpoint_t *endpoint, ks_pool_t *pool);
+KS_DECLARE(ks_status_t) ks_dht2_endpoint_free(ks_dht2_endpoint_t *endpoint);
+
+KS_DECLARE(ks_status_t) ks_dht2_endpoint_init(ks_dht2_endpoint_t *endpoint, const ks_sockaddr_t *addr, ks_socket_t sock);
+KS_DECLARE(ks_status_t) ks_dht2_endpoint_deinit(ks_dht2_endpoint_t *endpoint);
+
 
 KS_END_EXTERN_C
 
index e0288b61f45a8d7c4fdb8baaab3f7d309be193e6..18f0d5e9961f6e1c0abc8a120592a0e884d774ec 100644 (file)
@@ -1,6 +1,5 @@
 #include "ks_dht.h"
 #include "ks_dht-int.h"
-#include "ks_dht_endpoint-int.h"
 #include "sodium.h"
 
 /**
@@ -62,6 +61,9 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t
        ks_assert(dht);
        ks_assert(dht->pool);
 
+       dht->autoroute = KS_FALSE;
+       dht->autoroute_port = 0;
+       
        if (ks_dht2_nodeid_prealloc(&dht->nodeid, dht->pool) != KS_STATUS_SUCCESS) {
                return KS_STATUS_FAIL;
        }
@@ -72,6 +74,7 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t
 
        ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
        ks_dht2_register_type(dht, "q", ks_dht2_process_query);
+       ks_dht2_register_type(dht, "r", ks_dht2_process_response);
        // @todo ks_hash_insert the r/e callbacks into type registry
 
        ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
@@ -86,6 +89,9 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t
        dht->endpoints_poll = NULL;
        
        dht->recv_buffer_length = 0;
+
+       dht->transactionid_next = rand() % 0xFFFF;
+       ks_hash_create(&dht->transactions_hash, KS_HASH_MODE_INT, KS_HASH_FLAG_RWLOCK, dht->pool);
        
        return KS_STATUS_SUCCESS;
 }
@@ -97,6 +103,11 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
 {
        ks_assert(dht);
 
+       dht->transactionid_next = 0;
+       if (dht->transactions_hash) {
+               ks_hash_destroy(&dht->transactions_hash);
+               dht->transactions_hash = NULL;
+       }
        dht->recv_buffer_length = 0;
        for (int32_t i = 0; i < dht->endpoints_size; ++i) {
                ks_dht2_endpoint_t *ep = dht->endpoints[i];
@@ -130,6 +141,9 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
        }
 
        ks_dht2_nodeid_deinit(&dht->nodeid);
+
+       dht->autoroute = KS_FALSE;
+       dht->autoroute_port = 0;
        
        return KS_STATUS_SUCCESS;
 }
@@ -137,7 +151,26 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback)
+KS_DECLARE(ks_status_t) ks_dht2_autoroute(ks_dht2_t *dht, ks_bool_t autoroute, ks_port_t port)
+{
+       ks_assert(dht);
+
+       if (!autoroute) {
+               port = 0;
+       } else if (port == 0) {
+               return KS_STATUS_FAIL;
+       }
+       
+       dht->autoroute = autoroute;
+       dht->autoroute_port = port;
+       
+       return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback)
 {
        ks_assert(dht);
        ks_assert(value);
@@ -149,7 +182,7 @@ KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value,
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback)
+KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback)
 {
        ks_assert(dht);
        ks_assert(value);
@@ -161,7 +194,7 @@ KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr)
+KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr, ks_dht2_endpoint_t **endpoint)
 {
        ks_dht2_endpoint_t *ep;
        ks_socket_t sock;
@@ -172,6 +205,9 @@ KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr)
        ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
        ks_assert(addr->port);
 
+       if (endpoint) {
+               *endpoint = NULL;
+       }
 
        dht->bind_ipv4 |= addr->family == AF_INET;
        dht->bind_ipv6 |= addr->family == AF_INET6;
@@ -215,7 +251,11 @@ KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr)
                                                                                                                  sizeof(struct pollfd) * dht->endpoints_size);
        dht->endpoints_poll[epindex].fd = ep->sock;
        dht->endpoints_poll[epindex].events = POLLIN | POLLERR;
-       
+
+       if (endpoint) {
+               *endpoint = ep;
+       }
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -260,6 +300,63 @@ KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout)
        return KS_STATUS_SUCCESS;
 }
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
+{
+       // @todo lookup standard def for IPV6 max size
+       char ip[48];
+       ks_dht2_endpoint_t *ep;
+       // @todo calculate max IPV6 payload size?
+       char buf[1000];
+       ks_size_t buf_len;
+       
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(message);
+       ks_assert(message->data);
+
+       // @todo blacklist check
+
+       ks_ip_route(ip, sizeof(ip), raddr->host);
+
+       if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) {
+               ks_sockaddr_t addr;
+               ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family);
+               if (ks_dht2_bind(dht, &addr, &ep) != KS_STATUS_SUCCESS) {
+                       return KS_STATUS_FAIL;
+               }
+       }
+
+       if (!ep) {
+               ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host);
+               return KS_STATUS_FAIL;
+       }
+
+       buf_len = ben_encode2(buf, sizeof(buf), message->data);
+
+       ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", raddr->host, raddr->port);
+       ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
+       
+       if (ks_socket_sendto(ep->sock, (void *)buf, &buf_len, raddr) != KS_STATUS_SUCCESS) {
+               ks_log(KS_LOG_DEBUG, "Socket error\n");
+               return KS_STATUS_FAIL;
+       }
+
+       return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht)
+{
+       ks_assert(dht);
+
+       return KS_STATUS_SUCCESS;
+}
+
 /**
  *
  */
@@ -276,7 +373,7 @@ KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht)
 KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
 {
        ks_dht2_message_t message;
-       ks_dht2_registry_callback_t callback;
+       ks_dht2_message_callback_t callback;
        ks_status_t ret = KS_STATUS_FAIL;
 
        ks_assert(dht);
@@ -294,16 +391,21 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
                return KS_STATUS_FAIL;
        }
 
-       if (ks_dht2_message_init(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) {
+       if (ks_dht2_message_init(&message, KS_FALSE) != KS_STATUS_SUCCESS) {
                return KS_STATUS_FAIL;
        }
+
+       if (ks_dht2_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) {
+               goto done;
+       }
        
-       if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) {
+       if (!(callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) {
                ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type);
        } else {
                ret = callback(dht, raddr, &message);
        }
 
+ done:
        ks_dht2_message_deinit(&message);
        
        return ret;
@@ -319,23 +421,24 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *rad
        const char *qv;
        ks_size_t qv_len;
        char query[KS_DHT_MESSAGE_QUERY_MAX_SIZE];
-       ks_dht2_registry_callback_t callback;
+       ks_dht2_message_callback_t callback;
        ks_status_t ret = KS_STATUS_FAIL;
 
        ks_assert(dht);
        ks_assert(raddr);
        ks_assert(message);
 
+       // @todo start of ks_dht2_message_parse_query
     q = ben_dict_get_by_str(message->data, "q");
     if (!q) {
-               ks_log(KS_LOG_DEBUG, "Message missing required key 'q'\n");
+               ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n");
                return KS_STATUS_FAIL;
        }
        
     qv = ben_str_val(q);
        qv_len = ben_str_len(q);
     if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) {
-               ks_log(KS_LOG_DEBUG, "Message 'q' value has an unexpectedly large size of %d\n", qv_len);
+               ks_log(KS_LOG_DEBUG, "Message query 'q' value has an unexpectedly large size of %d\n", qv_len);
                return KS_STATUS_FAIL;
        }
 
@@ -345,13 +448,14 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *rad
 
        a = ben_dict_get_by_str(message->data, "a");
        if (!a) {
-               ks_log(KS_LOG_DEBUG, "Message missing required key 'a'\n");
+               ks_log(KS_LOG_DEBUG, "Message query missing required key 'a'\n");
                return KS_STATUS_FAIL;
        }
+       // @todo end of ks_dht2_message_parse_query
 
        message->args = a;
 
-       if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) {
+       if (!(callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) {
                ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
        } else {
                ret = callback(dht, raddr, message);
@@ -360,6 +464,46 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *rad
        return ret;
 }
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
+{
+       struct bencode *r;
+       ks_dht2_transaction_t *transaction;
+       uint32_t transactionid;
+       uint16_t *tid;
+       ks_status_t ret = KS_STATUS_FAIL;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(message);
+
+       // @todo start of ks_dht2_message_parse_response
+       r = ben_dict_get_by_str(message->data, "r");
+       if (!r) {
+               ks_log(KS_LOG_DEBUG, "Message response missing required key 'r'\n");
+               return KS_STATUS_FAIL;
+       }
+       // todo end of ks_dht2_message_parse_response
+
+       message->args = r;
+
+       tid = (uint16_t *)message->transactionid;
+       transactionid = ntohs(*tid);
+
+       transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
+       ks_hash_read_unlock(dht->transactions_hash);
+       
+       if (!transaction) {
+               ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
+       } else {
+               ret = transaction->callback(dht, raddr, message);
+       }
+
+       return ret;
+}
+
 /**
  *
  */
@@ -369,6 +513,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t
        const char *idv;
        ks_size_t idv_len;
        ks_dht2_nodeid_t nid;
+       ks_status_t ret = KS_STATUS_FAIL;
 
        ks_assert(dht);
        ks_assert(raddr);
@@ -397,13 +542,122 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t
        }
 
        //ks_log(KS_LOG_DEBUG, "Message query ping id is '%s'\n", id->id);
-       ks_log(KS_LOG_DEBUG, "Mesage query ping is valid\n");
+       ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
+
+       ret = ks_dht2_send_response_ping(dht, raddr, message->transactionid, message->transactionid_length);
 
        ks_dht2_nodeid_deinit(&nid);
 
+       return ret;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
+{
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(message);
+
        return KS_STATUS_SUCCESS;
 }
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_send_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr)
+{
+       uint32_t transactionid;
+       ks_dht2_transaction_t *transaction = NULL;
+       ks_dht2_message_t query;
+       struct bencode *a;
+       ks_status_t ret = KS_STATUS_FAIL;
+       
+       ks_assert(dht);
+       ks_assert(raddr);
+
+       // @todo atomic increment or mutex...
+       transactionid = dht->transactionid_next++;
+
+       if (ks_dht2_transaction_alloc(&transaction, dht->pool) != KS_STATUS_SUCCESS) {
+               goto done;
+       }
+
+       if (ks_dht2_transaction_init(transaction, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) {
+               goto done;
+       }
+
+       if (ks_dht2_message_prealloc(&query, dht->pool) != KS_STATUS_SUCCESS) {
+               goto done;
+       }
+
+       if (ks_dht2_message_init(&query, KS_TRUE) != KS_STATUS_SUCCESS) {
+           goto done;
+       }
+
+       if (ks_dht2_message_query(&query, transactionid, "ping", &a) != KS_STATUS_SUCCESS) {
+               goto done;
+       }
+
+       // @todo transaction expiration and raddr
+
+       // @todo transactions_hash mutex?
+       ks_hash_insert(dht->transactions_hash, (void *)&transactionid, transaction);
+
+       // @note a joins response.data and will be freed with it
+       ben_dict_set(a, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH));
+
+       ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
+       ret = ks_dht2_send(dht, raddr, &query);
+
+ done:
+       if (transaction && ret != KS_STATUS_SUCCESS) {
+               ks_dht2_transaction_deinit(transaction);
+               ks_dht2_transaction_free(transaction);
+       }
+       ks_dht2_message_deinit(&query);
+       return ret;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_send_response_ping(ks_dht2_t *dht,
+                                                                                                  ks_sockaddr_t *raddr,
+                                                                                                  uint8_t *transactionid,
+                                                                                                  ks_size_t transactionid_length)
+{
+       ks_dht2_message_t response;
+       struct bencode *r;
+       ks_status_t ret = KS_STATUS_FAIL;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+
+       if (ks_dht2_message_prealloc(&response, dht->pool) != KS_STATUS_SUCCESS) {
+               return KS_STATUS_FAIL;
+       }
+
+       if (ks_dht2_message_init(&response, KS_TRUE) != KS_STATUS_SUCCESS) {
+               return KS_STATUS_FAIL;
+       }
+
+       if (ks_dht2_message_response(&response, transactionid, transactionid_length, &r) != KS_STATUS_SUCCESS) {
+               goto done;
+       }
+
+       // @note r joins response.data and will be freed with it
+       ben_dict_set(r, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH));
+
+       ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
+       ret = ks_dht2_send(dht, raddr, &response);
+
+ done:
+       ks_dht2_message_deinit(&response);
+       return ret;
+}
+
 /* For Emacs:
  * Local Variables:
  * mode:c
index e3ce9ead779a3097deaf11fc8a59907a9f8d6caa..7629d2da3bc11976ad349af9c0b7066afa5a7ade 100644 (file)
@@ -4,9 +4,6 @@
 #include "ks.h"
 #include "ks_bencode.h"
 
-#include "ks_dht_endpoint.h"
-#include "ks_dht_message.h"
-#include "ks_dht_nodeid.h"
 
 KS_BEGIN_EXTERN_C
 
@@ -14,11 +11,62 @@ KS_BEGIN_EXTERN_C
 #define KS_DHT_DEFAULT_PORT 5309
 #define KS_DHT_RECV_BUFFER_SIZE 0xFFFF
 
+#define KS_DHT_NODEID_LENGTH 20
+
+#define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20
+#define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20
+#define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
+
+
 typedef struct ks_dht2_s ks_dht2_t;
+typedef struct ks_dht2_nodeid_s ks_dht2_nodeid_t;
+typedef struct ks_dht2_nodeid_raw_s ks_dht2_nodeid_raw_t;
+typedef struct ks_dht2_message_s ks_dht2_message_t;
+typedef struct ks_dht2_endpoint_s ks_dht2_endpoint_t;
+typedef struct ks_dht2_transaction_s ks_dht2_transaction_t;
+
+
+typedef ks_status_t (*ks_dht2_message_callback_t)(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
+
+struct ks_dht2_nodeid_raw_s {
+       uint8_t id[KS_DHT_NODEID_LENGTH];
+};
+
+struct ks_dht2_nodeid_s {
+       ks_pool_t *pool;
+       uint8_t id[KS_DHT_NODEID_LENGTH];
+};
+
+struct ks_dht2_message_s {
+       ks_pool_t *pool;
+       struct bencode *data;
+       uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE];
+       ks_size_t transactionid_length;
+       char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE];
+       struct bencode *args;
+};
+
+struct ks_dht2_endpoint_s {
+       ks_pool_t *pool;
+       ks_sockaddr_t addr;
+       ks_socket_t sock;
+};
+
+struct ks_dht2_transaction_s {
+       ks_pool_t *pool;
+       uint16_t transactionid;
+       ks_dht2_message_callback_t callback;
+       // @todo expiration data
+};
+
+
 struct ks_dht2_s {
        ks_pool_t *pool;
        ks_bool_t pool_alloc;
 
+       ks_bool_t autoroute;
+       ks_port_t autoroute_port;
+       
        ks_dht2_nodeid_t nodeid;
 
        ks_hash_t *registry_type;
@@ -34,11 +82,14 @@ struct ks_dht2_s {
 
        uint8_t recv_buffer[KS_DHT_RECV_BUFFER_SIZE];
        ks_size_t recv_buffer_length;
-};
-
-typedef ks_status_t (*ks_dht2_registry_callback_t)(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
 
+       uint16_t transactionid_next;
+       ks_hash_t *transactions_hash;
+};
 
+/**
+ *
+ */
 KS_DECLARE(ks_status_t) ks_dht2_alloc(ks_dht2_t **dht, ks_pool_t *pool);
 KS_DECLARE(ks_status_t) ks_dht2_prealloc(ks_dht2_t *dht, ks_pool_t *pool);
 KS_DECLARE(ks_status_t) ks_dht2_free(ks_dht2_t *dht);
@@ -47,15 +98,62 @@ KS_DECLARE(ks_status_t) ks_dht2_free(ks_dht2_t *dht);
 KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t *nodeid);
 KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht);
 
+KS_DECLARE(ks_status_t) ks_dht2_autoroute(ks_dht2_t *dht, ks_bool_t autoroute, ks_port_t port);
 
-KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr);
+KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr, ks_dht2_endpoint_t **endpoint);
 KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout);
 
 
-KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback);
-KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback);
+KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback);
+KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback);
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_nodeid_alloc(ks_dht2_nodeid_t **nodeid, ks_pool_t *pool);
+KS_DECLARE(ks_status_t) ks_dht2_nodeid_prealloc(ks_dht2_nodeid_t *nodeid, ks_pool_t *pool);
+KS_DECLARE(ks_status_t) ks_dht2_nodeid_free(ks_dht2_nodeid_t *nodeid);
 
+KS_DECLARE(ks_status_t) ks_dht2_nodeid_init(ks_dht2_nodeid_t *nodeid, const ks_dht2_nodeid_raw_t *id);
+KS_DECLARE(ks_status_t) ks_dht2_nodeid_deinit(ks_dht2_nodeid_t *nodeid);
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_message_alloc(ks_dht2_message_t **message, ks_pool_t *pool);
+KS_DECLARE(ks_status_t) ks_dht2_message_prealloc(ks_dht2_message_t *message, ks_pool_t *pool);
+KS_DECLARE(ks_status_t) ks_dht2_message_free(ks_dht2_message_t *message);
+
+KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, ks_bool_t alloc_data);
+KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message);
+
+KS_DECLARE(ks_status_t) ks_dht2_message_parse(ks_dht2_message_t *message, const uint8_t *buffer, ks_size_t buffer_length);
+
+KS_DECLARE(ks_status_t) ks_dht2_message_query(ks_dht2_message_t *message,
+                                                                                         uint16_t transactionid,
+                                                                                         const char *query,
+                                                                                         struct bencode **args);
+KS_DECLARE(ks_status_t) ks_dht2_message_response(ks_dht2_message_t *message,
+                                                                                                uint8_t *transactionid,
+                                                                                                ks_size_t transactionid_length,
+                                                                                                struct bencode **args);
+
+/**
+ *
+ */
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_transaction_alloc(ks_dht2_transaction_t **transaction, ks_pool_t *pool);
+KS_DECLARE(ks_status_t) ks_dht2_transaction_prealloc(ks_dht2_transaction_t *trasnaction, ks_pool_t *pool);
+KS_DECLARE(ks_status_t) ks_dht2_transaction_free(ks_dht2_transaction_t *transaction);
+
+KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction,
+                                                                                                uint16_t transactionid,
+                                                                                                ks_dht2_message_callback_t callback);
+KS_DECLARE(ks_status_t) ks_dht2_transaction_deinit(ks_dht2_transaction_t *transaction);
+                                                                                                                                                                                                                                                                                               
 KS_END_EXTERN_C
 
 #endif /* KS_DHT_H */
diff --git a/libs/libks/src/dht/ks_dht_endpoint-int.h b/libs/libks/src/dht/ks_dht_endpoint-int.h
deleted file mode 100644 (file)
index 2d09714..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-#ifndef KS_DHT_ENDPOINT_INT_H
-#define KS_DHT_ENDPOINT_INT_H
-
-#include "ks.h"
-
-KS_BEGIN_EXTERN_C
-
-KS_DECLARE(ks_status_t) ks_dht2_endpoint_alloc(ks_dht2_endpoint_t **endpoint, ks_pool_t *pool);
-KS_DECLARE(ks_status_t) ks_dht2_endpoint_prealloc(ks_dht2_endpoint_t *endpoint, ks_pool_t *pool);
-KS_DECLARE(ks_status_t) ks_dht2_endpoint_free(ks_dht2_endpoint_t *endpoint);
-
-KS_DECLARE(ks_status_t) ks_dht2_endpoint_init(ks_dht2_endpoint_t *endpoint, const ks_sockaddr_t *addr, ks_socket_t sock);
-KS_DECLARE(ks_status_t) ks_dht2_endpoint_deinit(ks_dht2_endpoint_t *endpoint);
-                                               
-KS_END_EXTERN_C
-
-#endif /* KS_DHT_ENDPOINT_H */
-
-
-/* For Emacs:
-* Local Variables:
-* mode:c
-* indent-tabs-mode:t
-* tab-width:4
-* c-basic-offset:4
-* End:
-* For VIM:
-* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
-*/
index 286d8f43c1a3e7a10589058a982f8f833882ea41..d33a62dc07a3d4e31e656c65c29911df433588a6 100644 (file)
@@ -1,5 +1,5 @@
-#include "ks_dht_endpoint.h"
-#include "ks_dht_endpoint-int.h"
+#include "ks_dht.h"
+#include "ks_dht-int.h"
 
 /**
  *
diff --git a/libs/libks/src/dht/ks_dht_endpoint.h b/libs/libks/src/dht/ks_dht_endpoint.h
deleted file mode 100644 (file)
index 6d9df05..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-#ifndef KS_DHT_ENDPOINT_H
-#define KS_DHT_ENDPOINT_H
-
-#include "ks.h"
-
-KS_BEGIN_EXTERN_C
-
-typedef struct ks_dht2_endpoint_s ks_dht2_endpoint_t;
-struct ks_dht2_endpoint_s {
-       ks_pool_t *pool;
-       ks_sockaddr_t addr;
-       ks_socket_t sock;
-};
-
-KS_END_EXTERN_C
-
-#endif /* KS_DHT_ENDPOINT_H */
-
-
-/* For Emacs:
-* Local Variables:
-* mode:c
-* indent-tabs-mode:t
-* tab-width:4
-* c-basic-offset:4
-* End:
-* For VIM:
-* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
-*/
index f7ee9608099030ee84cb62b3b42c8439c20f122e..921038b87b61ce15cefc5acd15c80c15035af174 100644 (file)
@@ -1,4 +1,5 @@
 #include "ks_dht.h"
+#include "ks_dht-int.h"
 
 /**
  *
@@ -46,7 +47,44 @@ KS_DECLARE(ks_status_t) ks_dht2_message_free(ks_dht2_message_t *message)
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, const uint8_t *buffer, ks_size_t buffer_length)
+KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, ks_bool_t alloc_data)
+{
+       ks_assert(message);
+       ks_assert(message->pool);
+
+       message->data = NULL;
+       message->args = NULL;
+       message->transactionid_length = 0;
+       message->type[0] = '\0';
+       if (alloc_data) {
+               message->data = ben_dict();
+       }
+
+       return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message)
+{
+       ks_assert(message);
+
+       message->args = NULL;
+       message->type[0] = '\0';
+       message->transactionid_length = 0;
+       if (message->data) {
+               ben_free(message->data);
+               message->data = NULL;
+       }
+
+       return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_message_parse(ks_dht2_message_t *message, const uint8_t *buffer, ks_size_t buffer_length)
 {
        struct bencode *t;
        struct bencode *y;
@@ -58,8 +96,7 @@ KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, const u
        ks_assert(message);
        ks_assert(message->pool);
        ks_assert(buffer);
-
-       message->args = NULL;
+       ks_assert(!message->data);
 
     message->data = ben_decode((const void *)buffer, buffer_length);
        if (!message->data) {
@@ -115,16 +152,56 @@ KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, const u
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message)
+KS_DECLARE(ks_status_t) ks_dht2_message_query(ks_dht2_message_t *message,
+                                                                                         uint16_t transactionid,
+                                                                                         const char *query,
+                                                                                         struct bencode **args)
 {
+       struct bencode *a;
+       uint16_t tid;
+       
        ks_assert(message);
+       ks_assert(query);
 
-       message->args = NULL;
-       message->type[0] = '\0';
-       message->transactionid_length = 0;
-       if (message->data) {
-               ben_free(message->data);
-               message->data = NULL;
+       tid = htons(transactionid);
+       
+    ben_dict_set(message->data, ben_blob("t", 1), ben_blob((uint8_t *)&tid, 2));
+       ben_dict_set(message->data, ben_blob("y", 1), ben_blob("q", 1));
+       ben_dict_set(message->data, ben_blob("q", 1), ben_blob(query, strlen(query)));
+
+       // @note r joins message->data and will be freed with it
+       a = ben_dict();
+       ben_dict_set(message->data, ben_blob("a", 1), a);
+
+       if (args) {
+               *args = a;
+       }
+
+       return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_message_response(ks_dht2_message_t *message,
+                                                                                                uint8_t *transactionid,
+                                                                                                ks_size_t transactionid_length,
+                                                                                                struct bencode **args)
+{
+       struct bencode *r;
+       
+       ks_assert(message);
+       ks_assert(transactionid);
+       
+    ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
+       ben_dict_set(message->data, ben_blob("y", 1), ben_blob("r", 1));
+       
+       // @note r joins message->data and will be freed with it
+       r = ben_dict();
+       ben_dict_set(message->data, ben_blob("r", 1), r);
+
+       if (args) {
+               *args = r;
        }
 
        return KS_STATUS_SUCCESS;
diff --git a/libs/libks/src/dht/ks_dht_message.h b/libs/libks/src/dht/ks_dht_message.h
deleted file mode 100644 (file)
index 1530dc9..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-#ifndef KS_DHT_MESSAGE_H
-#define KS_DHT_MESSAGE_H
-
-#include "ks.h"
-
-KS_BEGIN_EXTERN_C
-
-#define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20
-#define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20
-#define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
-
-typedef struct ks_dht2_message_s ks_dht2_message_t;
-struct ks_dht2_message_s {
-       ks_pool_t *pool;
-       struct bencode *data;
-    uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE];
-       ks_size_t transactionid_length;
-       char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE];
-       struct bencode *args;
-};
-
-KS_DECLARE(ks_status_t) ks_dht2_message_alloc(ks_dht2_message_t **message, ks_pool_t *pool);
-KS_DECLARE(ks_status_t) ks_dht2_message_prealloc(ks_dht2_message_t *message, ks_pool_t *pool);
-KS_DECLARE(ks_status_t) ks_dht2_message_free(ks_dht2_message_t *message);
-
-KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, const uint8_t *buffer, ks_size_t buffer_length);
-KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message);
-
-KS_END_EXTERN_C
-
-#endif /* KS_DHT_MESSAGE_H */
-
-/* For Emacs:
- * Local Variables:
- * mode:c
- * indent-tabs-mode:t
- * tab-width:4
- * c-basic-offset:4
- * End:
- * For VIM:
- * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
- */
-
index af5999c4b3892c7b483ae292a2a80cf65fcc612a..be4b18846357695c606bd3241df78731e5f96e2e 100644 (file)
@@ -1,4 +1,5 @@
-#include "ks_dht_nodeid.h"
+#include "ks_dht.h"
+#include "ks_dht-int.h"
 #include "sodium.h"
 
 /**
diff --git a/libs/libks/src/dht/ks_dht_nodeid.h b/libs/libks/src/dht/ks_dht_nodeid.h
deleted file mode 100644 (file)
index 39705e9..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-#ifndef KS_DHT_NODEID_H
-#define KS_DHT_NODEID_H
-
-#include "ks.h"
-
-KS_BEGIN_EXTERN_C
-
-#define KS_DHT_NODEID_LENGTH 20
-
-typedef struct ks_dht2_nodeid_raw_s ks_dht2_nodeid_raw_t;
-struct ks_dht2_nodeid_raw_s {
-    uint8_t id[KS_DHT_NODEID_LENGTH];
-};
-
-typedef struct ks_dht2_nodeid_s ks_dht2_nodeid_t;
-struct ks_dht2_nodeid_s {
-       ks_pool_t *pool;
-    uint8_t id[KS_DHT_NODEID_LENGTH];
-};
-
-KS_DECLARE(ks_status_t) ks_dht2_nodeid_alloc(ks_dht2_nodeid_t **nodeid, ks_pool_t *pool);
-KS_DECLARE(ks_status_t) ks_dht2_nodeid_prealloc(ks_dht2_nodeid_t *nodeid, ks_pool_t *pool);
-KS_DECLARE(ks_status_t) ks_dht2_nodeid_free(ks_dht2_nodeid_t *nodeid);
-
-KS_DECLARE(ks_status_t) ks_dht2_nodeid_init(ks_dht2_nodeid_t *nodeid, const ks_dht2_nodeid_raw_t *id);
-KS_DECLARE(ks_status_t) ks_dht2_nodeid_deinit(ks_dht2_nodeid_t *nodeid);
-
-KS_END_EXTERN_C
-
-#endif /* KS_DHT_NODEID_H */
-
-/* For Emacs:
- * Local Variables:
- * mode:c
- * indent-tabs-mode:t
- * tab-width:4
- * c-basic-offset:4
- * End:
- * For VIM:
- * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
- */
-
diff --git a/libs/libks/src/dht/ks_dht_transaction.c b/libs/libks/src/dht/ks_dht_transaction.c
new file mode 100644 (file)
index 0000000..4946e1c
--- /dev/null
@@ -0,0 +1,87 @@
+#include "ks_dht.h"
+#include "ks_dht-int.h"
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_transaction_alloc(ks_dht2_transaction_t **transaction, ks_pool_t *pool)
+{
+       ks_dht2_transaction_t *tran;
+
+       ks_assert(transaction);
+       ks_assert(pool);
+       
+       *transaction = tran = ks_pool_alloc(pool, sizeof(ks_dht2_transaction_t));
+       tran->pool = pool;
+
+       return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_transaction_prealloc(ks_dht2_transaction_t *transaction, ks_pool_t *pool)
+{
+       ks_assert(transaction);
+       ks_assert(pool);
+       
+       transaction->pool = pool;
+
+       return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_transaction_free(ks_dht2_transaction_t *transaction)
+{
+       ks_assert(transaction);
+
+       ks_dht2_transaction_deinit(transaction);
+       ks_pool_free(transaction->pool, transaction);
+
+       return KS_STATUS_SUCCESS;
+}
+                                                                                               
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction,
+                                                                                                uint16_t transactionid,
+                                                                                                ks_dht2_message_callback_t callback)
+{
+       ks_assert(transaction);
+       ks_assert(transaction->pool);
+       ks_assert(callback);
+
+       transaction->transactionid = transactionid;
+       transaction->callback = callback;
+
+       return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_transaction_deinit(ks_dht2_transaction_t *transaction)
+{
+       ks_assert(transaction);
+
+       transaction->transactionid = 0;
+       transaction->callback = NULL;
+
+       return KS_STATUS_SUCCESS;
+}
+
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
index 7fd5d31e0e47534aaf8c7767cbc9850cc44c6a61..d2dfbbd8a67cd1ecfa806849822f5280b8e37659 100644 (file)
@@ -1,7 +1,6 @@
 #include <ks.h>
 #include <../dht/ks_dht.h>
 #include <../dht/ks_dht-int.h>
-#include <../dht/ks_dht_endpoint-int.h>
 #include <tap.h>
 
 #define TEST_DHT1_REGISTER_TYPE_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze"
@@ -65,13 +64,13 @@ int main() {
     err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT, AF_INET);
        ok(err == KS_STATUS_SUCCESS);
        
-    err = ks_dht2_bind(dht1, &addr);
+    err = ks_dht2_bind(dht1, &addr, NULL);
     ok(err == KS_STATUS_SUCCESS);
 
        err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 1, AF_INET);
        ok(err == KS_STATUS_SUCCESS);
        
-       err = ks_dht2_bind(&dht2, &addr);
+       err = ks_dht2_bind(&dht2, &addr, NULL);
        ok(err == KS_STATUS_SUCCESS);
 
        raddr = addr;
@@ -81,13 +80,13 @@ int main() {
        err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT, AF_INET6);
        ok(err == KS_STATUS_SUCCESS);
          
-    err = ks_dht2_bind(dht1, &addr);
+    err = ks_dht2_bind(dht1, &addr, NULL);
     ok(err == KS_STATUS_SUCCESS);
 
        err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT + 1, AF_INET6);
        ok(err == KS_STATUS_SUCCESS);
 
-       err = ks_dht2_bind(&dht2, &addr);
+       err = ks_dht2_bind(&dht2, &addr, NULL);
        ok(err == KS_STATUS_SUCCESS);
   }
 
@@ -105,8 +104,11 @@ int main() {
 
   err = ks_dht2_process(dht1, &raddr);
   ok(err == KS_STATUS_SUCCESS);
-  
 
+  err = ks_dht2_pulse(&dht2, 1000);
+  ok(err == KS_STATUS_SUCCESS);
+
+  diag("Cleanup\n");
   /* Cleanup and shutdown */
 
   err = ks_dht2_deinit(&dht2);