]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Added initial registry for 'y' keys, and some unit testing
authorShane Bryldt <astaelan@gmail.com>
Thu, 1 Dec 2016 04:37:36 +0000 (04:37 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:33 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_endpoint.c
libs/libks/test/testdht2.c

index f878faaabf98aa9613ea6eb8983bf74286a265e4..968462aa1e39ae857616071d004d28a1867bdc6f 100644 (file)
@@ -8,7 +8,12 @@ KS_BEGIN_EXTERN_C
 
 KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht);
 KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr);
-
+KS_DECLARE(ks_status_t) ks_dht2_parse(ks_dht2_t *dht,
+                                                                         struct bencode **message,
+                                                                         uint8_t *transactionid,
+                                                                         ks_size_t *transactionid_len,
+                                                                         char *messagetype);
+                                               
 KS_END_EXTERN_C
 
 #endif /* KS_DHT_INT_H */
index e86ebee983347778c8fd169b487ed5a6f60058bb..d3e76643a7a254d6d31bcda67a7cd059778e6a8b 100644 (file)
@@ -68,6 +68,9 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const uint8_t *nodeid)
        if (ks_dht2_nodeid_init(&dht->nodeid, nodeid) != KS_STATUS_SUCCESS) {
                return KS_STATUS_FAIL;
        }
+
+       ks_hash_create(&dht->registry_y, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
+       // @todo ks_hash_insert the q/r/e callbacks into y registry
        
     dht->bind_ipv4 = KS_FALSE;
        dht->bind_ipv6 = KS_FALSE;
@@ -90,16 +93,43 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
        ks_assert(dht);
 
        dht->recv_buffer_length = 0;
-       // @todo dht->endpoints_poll deinit
-       // @todo dht->endpoints deinit
+       for (int32_t i = 0; i < dht->endpoints_size; ++i) {
+               ks_dht2_endpoint_t *ep = dht->endpoints[i];
+               //ks_hash_remove(dht->endpoints_hash, ep->addr.host);
+               ks_dht2_endpoint_deinit(ep);
+               ks_dht2_endpoint_free(ep);
+       }
+       if (dht->endpoints) {
+               ks_pool_free(dht->pool, dht->endpoints);
+               dht->endpoints = NULL;
+       }
+       if (dht->endpoints_poll) {
+               ks_pool_free(dht->pool, dht->endpoints_poll);
+               dht->endpoints_poll = NULL;
+       }
        ks_hash_destroy(&dht->endpoints_hash);
        dht->bind_ipv4 = KS_FALSE;
        dht->bind_ipv6 = KS_FALSE;
+
+       ks_hash_destroy(&dht->registry_y);
+
        ks_dht2_nodeid_deinit(&dht->nodeid);
        
        return KS_STATUS_SUCCESS;
 }
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_register_y(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback)
+{
+       ks_assert(dht);
+       ks_assert(value);
+       ks_assert(callback);
+
+       return ks_hash_insert(dht->registry_y, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
+}
+
 /**
  *
  */
@@ -220,15 +250,12 @@ KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht)
  */
 KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
 {
-       struct bencode *message;
-       struct bencode *t;
-       struct bencode *y;
-       const char *tv;
-       const char *yv;
-       ks_size_t tv_len;
-       ks_size_t yv_len;
-       uint16_t transactionid;
-       char messagetype;
+       struct bencode *message = NULL;
+       uint8_t transactionid[KS_DHT_TRANSACTIONID_MAX_SIZE];
+       ks_size_t transactionid_len;
+       char messagetype[KS_DHT_MESSAGETYPE_MAX_SIZE];
+       ks_dht2_registry_callback_t callback;
+       ks_status_t ret = KS_STATUS_FAIL;
 
        ks_assert(dht);
        ks_assert(raddr);
@@ -241,53 +268,99 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
 
        // @todo blacklist check for bad actor nodes
 
-       message = ben_decode((const void *)dht->recv_buffer, dht->recv_buffer_length);
-       if (!message) {
-               ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n");
+       if (ks_dht2_parse(dht, &message, transactionid, &transactionid_len, messagetype) != KS_STATUS_SUCCESS) {
                return KS_STATUS_FAIL;
        }
+       
+       if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_y, messagetype, KS_UNLOCKED))) {
+               ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", messagetype);
+       } else {
+               ret = callback(dht, raddr, transactionid, transactionid_len, message);
+       }
+
+       ben_free(message);
+       return ret;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_parse(ks_dht2_t *dht,
+                                                                         struct bencode **message,
+                                                                         uint8_t *transactionid,
+                                                                         ks_size_t *transactionid_len,
+                                                                         char *messagetype)
+{
+       struct bencode *msg = NULL;
+       struct bencode *t;
+       struct bencode *y;
+       const char *tv;
+       const char *yv;
+       ks_size_t tv_len;
+       ks_size_t yv_len;
+
+       ks_assert(dht);
+       ks_assert(message);
+       ks_assert(transactionid);
+       ks_assert(messagetype);
+
+       msg = ben_decode((const void *)dht->recv_buffer, dht->recv_buffer_length);
+       if (!msg) {
+               ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n");
+           goto failure;
+       }
 
        ks_log(KS_LOG_DEBUG, "Message decoded\n");
-       ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message));
+       ks_log(KS_LOG_DEBUG, "%s\n", ben_print(msg));
 
-       t = ben_dict_get_by_str(message, "t");
+       t = ben_dict_get_by_str(msg, "t");
        if (!t) {
                ks_log(KS_LOG_DEBUG, "Message missing required key 't'\n");
-               return KS_STATUS_FAIL;
+               goto failure;
        }
 
        tv = ben_str_val(t);
        tv_len = ben_str_len(t);
-       if (tv_len != sizeof(uint16_t)) {
-               ks_log(KS_LOG_DEBUG, "Message 't' value has an unexpected size of %d\n", tv_len);
-               return KS_STATUS_FAIL;
+       if (tv_len > KS_DHT_TRANSACTIONID_MAX_SIZE) {
+               ks_log(KS_LOG_DEBUG, "Message 't' value has an unexpectedly large size of %d\n", tv_len);
+               goto failure;
        }
 
-       transactionid = ntohs(*((uint16_t *)tv));
-       ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", transactionid);
+       memcpy(transactionid, tv, tv_len);
+       *transactionid_len = tv_len;
+       // @todo hex output of transactionid
+       //ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", *transactionid);
 
-       y = ben_dict_get_by_str(message, "y");
+       y = ben_dict_get_by_str(msg, "y");
        if (!y) {
                ks_log(KS_LOG_DEBUG, "Message missing required key 'y'\n");
-               return KS_STATUS_FAIL;
+               goto failure;
        }
 
        yv = ben_str_val(y);
        yv_len = ben_str_len(y);
-       if (yv_len != 1) {
-               ks_log(KS_LOG_DEBUG, "Message 'y' value has an unexpected size of %d\n", yv_len);
-               return KS_STATUS_FAIL;
+       if (yv_len >= KS_DHT_MESSAGETYPE_MAX_SIZE) {
+               ks_log(KS_LOG_DEBUG, "Message 'y' value has an unexpectedly large size of %d\n", yv_len);
+               goto failure;
        }
 
-       messagetype = (char)yv[0];
-       ks_log(KS_LOG_DEBUG, "Message type is '%c'\n", messagetype);
-
-       // @todo dispatch callback from the 'y' registry
+       memcpy(messagetype, yv, yv_len);
+       messagetype[yv_len] = '\0';
+       ks_log(KS_LOG_DEBUG, "Message type is '%s'\n", messagetype);
        
+       *message = msg;
        return KS_STATUS_SUCCESS;
+       
+ failure:
+       if (msg) {
+               ben_free(msg);
+       }
+       *message = NULL;
+       *transactionid_len = 0;
+       messagetype[0] = '\0';
+       return KS_STATUS_FAIL;
 }
 
-
 /* For Emacs:
  * Local Variables:
  * mode:c
index 0d916aecce3e6ced52100b86968fab0d39b05aac..626b3566f551b80f5c33fba285614622f8e15022 100644 (file)
@@ -12,7 +12,8 @@ KS_BEGIN_EXTERN_C
 
 #define KS_DHT_DEFAULT_PORT 5309
 #define KS_DHT_RECV_BUFFER_SIZE 0xFFFF
-
+#define KS_DHT_TRANSACTIONID_MAX_SIZE 20
+#define KS_DHT_MESSAGETYPE_MAX_SIZE 20
 
 typedef struct ks_dht2_s ks_dht2_t;
 struct ks_dht2_s {
@@ -21,6 +22,8 @@ struct ks_dht2_s {
 
        ks_dht2_nodeid_t nodeid;
 
+       ks_hash_t *registry_y;
+
        ks_bool_t bind_ipv4;
        ks_bool_t bind_ipv6;
 
@@ -33,6 +36,12 @@ struct ks_dht2_s {
        ks_size_t recv_buffer_length;
 };
 
+typedef ks_status_t (*ks_dht2_registry_callback_t)(ks_dht2_t *dht,
+                                                                                                  ks_sockaddr_t *raddr,
+                                                                                                  uint8_t *transactionid,
+                                                                                                  ks_size_t transactionid_len,
+                                                                                                  struct bencode *message);
+
 
 KS_DECLARE(ks_status_t) ks_dht2_alloc(ks_dht2_t **dht, ks_pool_t *pool);
 KS_DECLARE(ks_status_t) ks_dht2_prealloc(ks_dht2_t *dht, ks_pool_t *pool);
@@ -46,6 +55,10 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht);
 KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr);
 KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout);
 
+
+KS_DECLARE(ks_status_t) ks_dht2_register_y(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback);
+
+
 KS_END_EXTERN_C
 
 #endif /* KS_DHT_H */
index 413d33e90a85d7e3dd10e3500e0c97a1aa054ed4..7368a793f454fc67dcfb5f3ddbe0569b7586eba2 100644 (file)
@@ -66,6 +66,8 @@ KS_DECLARE(ks_status_t) ks_dht2_endpoint_deinit(ks_dht2_endpoint_t *endpoint)
 {
        ks_assert(endpoint);
 
+       ks_socket_close(&endpoint->sock);
+
        return KS_STATUS_SUCCESS;
 }
 
index 1895896374025247b8d970d9670dd1b76ca0299d..55096209cfe2de87203934eac3184decae9f55ce 100644 (file)
@@ -4,10 +4,18 @@
 #include <../dht/ks_dht_endpoint-int.h>
 #include <tap.h>
 
+#define TEST_DHT1_REGISTER_Y_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze"
 #define TEST_DHT1_PROCESS_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe"
 
+ks_status_t dht_z_callback(ks_dht2_t *dht, ks_sockaddr_t *raddr, uint8_t *transactionid, ks_size_t transactionid_len, struct bencode *message)
+{
+       diag("dht_z_callback\n");
+       ok(transactionid[0] == '4' && transactionid[1] == '2');
+       return KS_STATUS_SUCCESS;
+}
+
 int main() {
-  ks_size_t buflen = strlen(TEST_DHT1_PROCESS_BUFFER);
+  ks_size_t buflen;
   ks_status_t err;
   int mask = 0;
   ks_dht2_t *dht1 = NULL;
@@ -21,7 +29,7 @@ int main() {
   ok(!err);
 
   ks_global_set_default_logger(7);
-  
+
   err = ks_find_local_ip(v4, sizeof(v4), &mask, AF_INET, NULL);
   ok(err == KS_STATUS_SUCCESS);
   have_v4 = !zstr_buf(v4);
@@ -50,6 +58,8 @@ int main() {
   
   err = ks_dht2_init(&dht2, NULL);
   ok(err == KS_STATUS_SUCCESS);
+
+  ks_dht2_register_y(dht1, "z", dht_z_callback);
   
   if (have_v4) {
     err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT, AF_INET);
@@ -81,8 +91,8 @@ int main() {
        ok(err == KS_STATUS_SUCCESS);
   }
 
-  // @todo populate dht1->recv_buffer and dht1->recv_buffer_length
-  memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_BUFFER, buflen);
+  buflen = strlen(TEST_DHT1_REGISTER_Y_BUFFER);
+  memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_Y_BUFFER, buflen);
   dht1->recv_buffer_length = buflen;
 
   err = ks_dht2_process(dht1, &raddr);