]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Renamed registries, added query registry with ping callback, unit test updated
authorShane Bryldt <astaelan@gmail.com>
Thu, 1 Dec 2016 21:16:35 +0000 (21:16 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:33 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_message.c
libs/libks/src/dht/ks_dht_message.h
libs/libks/test/testdht2.c

index d253245e9f312a8e234847ca93a6664731df6830..9f10f5599e461d95cb911bd2647a91e450ec47cc 100644 (file)
@@ -9,7 +9,9 @@ KS_BEGIN_EXTERN_C
 KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht);
 KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr);
 
-                                               
+KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
+KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
+
 KS_END_EXTERN_C
 
 #endif /* KS_DHT_INT_H */
index d11a5be1310fa1387189f5606cdccff049686f70..bf80f55fc14d7057c3acf1cdb5154c5e5ecc0e64 100644 (file)
@@ -70,8 +70,12 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const uint8_t *nodeid)
                return KS_STATUS_FAIL;
        }
 
-       ks_hash_create(&dht->registry_y, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
-       // @todo ks_hash_insert the q/r/e callbacks into y registry
+       ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
+       ks_dht2_register_type(dht, "q", ks_dht2_process_query);
+       // @todo ks_hash_insert the r/e callbacks into type registry
+
+       ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
+       ks_dht2_register_query(dht, "ping", ks_dht2_process_query_ping);
        
     dht->bind_ipv4 = KS_FALSE;
        dht->bind_ipv6 = KS_FALSE;
@@ -116,9 +120,13 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
        dht->bind_ipv4 = KS_FALSE;
        dht->bind_ipv6 = KS_FALSE;
 
-       if (dht->registry_y) {
-               ks_hash_destroy(&dht->registry_y);
-               dht->registry_y = NULL;
+       if (dht->registry_type) {
+               ks_hash_destroy(&dht->registry_type);
+               dht->registry_type = NULL;
+       }
+       if (dht->registry_query) {
+               ks_hash_destroy(&dht->registry_query);
+               dht->registry_query = NULL;
        }
 
        ks_dht2_nodeid_deinit(&dht->nodeid);
@@ -129,13 +137,25 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht2_register_y(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback)
+KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback)
 {
        ks_assert(dht);
        ks_assert(value);
        ks_assert(callback);
 
-       return ks_hash_insert(dht->registry_y, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
+       return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback)
+{
+       ks_assert(dht);
+       ks_assert(value);
+       ks_assert(callback);
+
+       return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
 }
 
 /**
@@ -151,10 +171,7 @@ KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr)
        ks_assert(addr);
        ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
        ks_assert(addr->port);
-       
-       //if (!addr->port) {
-       //      addr->port = KS_DHT_DEFAULT_PORT;
-       //}
+
 
        dht->bind_ipv4 |= addr->family == AF_INET;
        dht->bind_ipv6 |= addr->family == AF_INET6;
@@ -281,7 +298,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
                return KS_STATUS_FAIL;
        }
        
-       if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_y, message.type, KS_UNLOCKED))) {
+       if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) {
                ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type);
        } else {
                ret = callback(dht, raddr, &message);
@@ -292,6 +309,101 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
        return ret;
 }
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
+{
+       struct bencode *q;
+       struct bencode *a;
+       const char *qv;
+       ks_size_t qv_len;
+       char query[KS_DHT_MESSAGE_QUERY_MAX_SIZE];
+       ks_dht2_registry_callback_t callback;
+       ks_status_t ret = KS_STATUS_FAIL;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(message);
+
+    q = ben_dict_get_by_str(message->data, "q");
+    if (!q) {
+               ks_log(KS_LOG_DEBUG, "Message missing required key 'q'\n");
+               return KS_STATUS_FAIL;
+       }
+       
+    qv = ben_str_val(q);
+       qv_len = ben_str_len(q);
+    if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) {
+               ks_log(KS_LOG_DEBUG, "Message 'q' value has an unexpectedly large size of %d\n", qv_len);
+               return KS_STATUS_FAIL;
+       }
+
+       memcpy(query, qv, qv_len);
+       query[qv_len] = '\0';
+       ks_log(KS_LOG_DEBUG, "Message query is '%s'\n", query);
+
+       a = ben_dict_get_by_str(message->data, "a");
+       if (!a) {
+               ks_log(KS_LOG_DEBUG, "Message missing required key 'a'\n");
+               return KS_STATUS_FAIL;
+       }
+
+       message->args = a;
+
+       if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) {
+               ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
+       } else {
+               ret = callback(dht, raddr, message);
+       }
+
+       return ret;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
+{
+       struct bencode *id;
+       const char *idv;
+       ks_size_t idv_len;
+       ks_dht2_nodeid_t nid;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(message);
+       ks_assert(message->args);
+
+    id = ben_dict_get_by_str(message->args, "id");
+    if (!id) {
+               ks_log(KS_LOG_DEBUG, "Message args missing required key 'id'\n");
+               return KS_STATUS_FAIL;
+       }
+       
+    idv = ben_str_val(id);
+       idv_len = ben_str_len(id);
+    if (idv_len != KS_DHT_NODEID_LENGTH) {
+               ks_log(KS_LOG_DEBUG, "Message args 'id' value has an unexpected size of %d\n", idv_len);
+               return KS_STATUS_FAIL;
+       }
+
+       if (ks_dht2_nodeid_prealloc(&nid, dht->pool) != KS_STATUS_SUCCESS) {
+               return KS_STATUS_FAIL;
+       }
+
+       if (ks_dht2_nodeid_init(&nid, (const uint8_t *)idv) != KS_STATUS_SUCCESS) {
+               return KS_STATUS_FAIL;
+       }
+
+       //ks_log(KS_LOG_DEBUG, "Message query ping id is '%s'\n", id->id);
+       ks_log(KS_LOG_DEBUG, "Mesage query ping is valid\n");
+
+       ks_dht2_nodeid_deinit(&nid);
+
+       return KS_STATUS_SUCCESS;
+}
+
 /* For Emacs:
  * Local Variables:
  * mode:c
index 03633e2da64245eae53c32143c3f02bfa0788ec7..bd087b9c106fac867782e6dd6873b4707dddb72e 100644 (file)
@@ -21,7 +21,8 @@ struct ks_dht2_s {
 
        ks_dht2_nodeid_t nodeid;
 
-       ks_hash_t *registry_y;
+       ks_hash_t *registry_type;
+       ks_hash_t *registry_query;
 
        ks_bool_t bind_ipv4;
        ks_bool_t bind_ipv6;
@@ -51,7 +52,8 @@ KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr);
 KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout);
 
 
-KS_DECLARE(ks_status_t) ks_dht2_register_y(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback);
+KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback);
+KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback);
 
 
 KS_END_EXTERN_C
index 13ad52ee22691466718d7243ff779506ebe5e142..f7ee9608099030ee84cb62b3b42c8439c20f122e 100644 (file)
@@ -59,6 +59,8 @@ KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, const u
        ks_assert(message->pool);
        ks_assert(buffer);
 
+       message->args = NULL;
+
     message->data = ben_decode((const void *)buffer, buffer_length);
        if (!message->data) {
                ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n");
@@ -117,6 +119,7 @@ KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message)
 {
        ks_assert(message);
 
+       message->args = NULL;
        message->type[0] = '\0';
        message->transactionid_length = 0;
        if (message->data) {
index 0e0cd1f7658d9ca0f945b2eba1dd79619843e0c6..1530dc94f1d6d027e7737fa272771df546b564e5 100644 (file)
@@ -7,6 +7,7 @@ KS_BEGIN_EXTERN_C
 
 #define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20
 #define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20
+#define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
 
 typedef struct ks_dht2_message_s ks_dht2_message_t;
 struct ks_dht2_message_s {
@@ -15,6 +16,7 @@ struct ks_dht2_message_s {
     uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE];
        ks_size_t transactionid_length;
        char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE];
+       struct bencode *args;
 };
 
 KS_DECLARE(ks_status_t) ks_dht2_message_alloc(ks_dht2_message_t **message, ks_pool_t *pool);
index fa6f1ecc954a045ea5a3379112668c767fdeb09d..7fd5d31e0e47534aaf8c7767cbc9850cc44c6a61 100644 (file)
@@ -4,8 +4,8 @@
 #include <../dht/ks_dht_endpoint-int.h>
 #include <tap.h>
 
-#define TEST_DHT1_REGISTER_Y_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze"
-#define TEST_DHT1_PROCESS_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe"
+#define TEST_DHT1_REGISTER_TYPE_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze"
+#define TEST_DHT1_PROCESS_QUERY_PING_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe"
 
 ks_status_t dht_z_callback(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
 {
@@ -59,7 +59,7 @@ int main() {
   err = ks_dht2_init(&dht2, NULL);
   ok(err == KS_STATUS_SUCCESS);
 
-  ks_dht2_register_y(dht1, "z", dht_z_callback);
+  ks_dht2_register_type(dht1, "z", dht_z_callback);
   
   if (have_v4) {
     err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT, AF_INET);
@@ -91,8 +91,16 @@ int main() {
        ok(err == KS_STATUS_SUCCESS);
   }
 
-  buflen = strlen(TEST_DHT1_REGISTER_Y_BUFFER);
-  memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_Y_BUFFER, buflen);
+  buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER);
+  memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen);
+  dht1->recv_buffer_length = buflen;
+
+  err = ks_dht2_process(dht1, &raddr);
+  ok(err == KS_STATUS_SUCCESS);
+
+
+  buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER);
+  memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_QUERY_PING_BUFFER, buflen);
   dht1->recv_buffer_length = buflen;
 
   err = ks_dht2_process(dht1, &raddr);