]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Fixed autorouting, must be checked before message generation as the message...
authorShane Bryldt <astaelan@gmail.com>
Fri, 9 Dec 2016 00:49:07 +0000 (00:49 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:34 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht.c

index 5cf4413cc2affb13ade4155777682cb2c7657691..eb9db01561a414f6d44abbcdaf5e795aa146b7da 100644 (file)
@@ -229,6 +229,40 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_
        return KS_STATUS_SUCCESS;
 }
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint)
+{
+       // @todo lookup standard def for IPV6 max size
+       char ip[48];
+       ks_dht_endpoint_t *ep = NULL;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(endpoint);
+
+       *endpoint = NULL;
+       
+    ks_ip_route(ip, sizeof(ip), raddr->host);
+
+       // @todo readlock hash
+       if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) {
+               ks_sockaddr_t addr;
+               ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family);
+               if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) {
+                       return KS_STATUS_FAIL;
+               }
+       }
+
+       if (!ep) {
+               ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host);
+               return KS_STATUS_FAIL;
+       }
+       
+       return KS_STATUS_SUCCESS;
+}
+
 /**
  *
  */
@@ -237,7 +271,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k
        ks_assert(dht);
        ks_assert(value);
        ks_assert(callback);
-
+       // @todo writelock registry
        return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
 }
 
@@ -249,7 +283,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value,
        ks_assert(dht);
        ks_assert(value);
        ks_assert(callback);
-
+       // @todo writelock registry
        return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
 }
 
@@ -261,7 +295,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value,
        ks_assert(dht);
        ks_assert(value);
        ks_assert(callback);
-
+       // @todo writelock registry
        return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
 }
 
@@ -286,7 +320,6 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
        dht->bind_ipv4 |= addr->family == AF_INET;
        dht->bind_ipv6 |= addr->family == AF_INET6;
 
-       // @todo start of ks_dht_endpoint_bind
        if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) {
                return KS_STATUS_FAIL;
        }
@@ -311,7 +344,6 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
        ks_socket_option(ep->sock, SO_REUSEADDR, KS_TRUE);
        ks_socket_option(ep->sock, KS_SO_NONBLOCK, KS_TRUE);
 
-       // @todo end of ks_dht_endpoint_bind
        
        epindex = dht->endpoints_size++;
        dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
@@ -329,11 +361,11 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
        // @todo initialize or add local nodeid to appropriate route table
        if (ep->addr.family == AF_INET) {
                if (!dht->rt_ipv4) {
-                       //ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, (ks_dhtrt_nodeid_t));
+                       //ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, &ep->nodeid);
                }
        } else {
                if (!dht->rt_ipv6) {
-                       //ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool);
+                       //ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool, &ep->nodeid);
                }
        }
        
@@ -369,6 +401,7 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
                        
                                raddr.family = dht->endpoints[i]->addr.family;
                                if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) {
+                                       // @todo copy data to a ks_dht_frame then create job to call ks_dht_process from threadpool
                                        ks_dht_process(dht, dht->endpoints[i], &raddr);
                                }
                        }
@@ -538,40 +571,23 @@ KS_DECLARE(void) ks_dht_idle_send(ks_dht_t *dht)
  */
 KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
 {
-       // @todo lookup standard def for IPV6 max size
-       char ip[48];
-       ks_dht_endpoint_t *ep;
        // @todo calculate max IPV6 payload size?
        char buf[1000];
        ks_size_t buf_len;
        
        ks_assert(dht);
        ks_assert(message);
+       ks_assert(message->endpoint);
        ks_assert(message->data);
 
        // @todo blacklist check
 
-       ks_ip_route(ip, sizeof(ip), message->raddr.host);
-
-       if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) {
-               ks_sockaddr_t addr;
-               ks_addr_set(&addr, ip, dht->autoroute_port, message->raddr.family);
-               if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) {
-                       return KS_STATUS_FAIL;
-               }
-       }
-
-       if (!ep) {
-               ks_log(KS_LOG_DEBUG, "No route available to %s\n", message->raddr.host);
-               return KS_STATUS_FAIL;
-       }
-
        buf_len = ben_encode2(buf, sizeof(buf), message->data);
 
        ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
        ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
 
-       return ks_socket_sendto(ep->sock, (void *)buf, &buf_len, &message->raddr);
+       return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr);
 }
 
 /**
@@ -594,6 +610,10 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
        ks_assert(transactionid);
        ks_assert(errorstr);
 
+       if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) {
+               return KS_STATUS_FAIL;
+       }
+
        if (ks_dht_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) {
                return KS_STATUS_FAIL;
        }
@@ -639,14 +659,17 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
        ks_status_t ret = KS_STATUS_FAIL;
 
        ks_assert(dht);
-       ks_assert(ep);
        ks_assert(raddr);
        ks_assert(query);
        ks_assert(callback);
        ks_assert(message);
 
        *message = NULL;
-       
+
+       if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) {
+               return KS_STATUS_FAIL;
+       }
+
     // @todo atomic increment or mutex
        transactionid = dht->transactionid_next++;
 
@@ -706,13 +729,16 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
        ks_status_t ret = KS_STATUS_FAIL;
 
        ks_assert(dht);
-       ks_assert(ep);
        ks_assert(raddr);
        ks_assert(transactionid);
        ks_assert(message);
        
        *message = NULL;
-       
+
+       if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) {
+               return KS_STATUS_FAIL;
+       }
+
        if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) {
                goto done;
        }
@@ -753,7 +779,7 @@ KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, k
                return KS_STATUS_FAIL;
        }
 
-       ben_dict_set(a, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE));
+       ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
 
        ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
        ks_q_push(dht->send_q, (void *)message);
@@ -777,12 +803,11 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e
                return KS_STATUS_FAIL;
        }
        
-       ben_dict_set(a, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE));
+       ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
        ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
 
        ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
        ks_q_push(dht->send_q, (void *)message);
-       //ks_dht_send(dht, raddr, message);
 
        return KS_STATUS_SUCCESS;
 }
@@ -818,7 +843,8 @@ KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_
        if (ks_dht_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) {
                goto done;
        }
-       
+
+       // @todo readlocking registry for calling from threadpool
        if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) {
                ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type);
        } else {
@@ -874,6 +900,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me
 
        message->args = a;
 
+       // @todo readlocking registry for calling from threadpool
        if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) {
                ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
        } else {
@@ -903,7 +930,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
                ks_log(KS_LOG_DEBUG, "Message response missing required key 'r'\n");
                return KS_STATUS_FAIL;
        }
-       // todo end of ks_dht_message_parse_response
+       // @todo end of ks_dht_message_parse_response
 
        message->args = r;
 
@@ -923,7 +950,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
                           transaction->raddr.host,
                           transaction->raddr.port);
        } else {
-               // @todo mark transaction for later removal
                transaction->finished = KS_TRUE;
                ret = transaction->callback(dht, message);
        }
@@ -969,7 +995,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
        
        memcpy(error, et, es_len);
        error[es_len] = '\0';
-       // todo end of ks_dht_message_parse_error
+       // @todo end of ks_dht_message_parse_error
 
        message->args = e;
 
@@ -989,10 +1015,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
                           transaction->raddr.host,
                           transaction->raddr.port);
        } else {
-               // @todo mark transaction for later removal
                ks_dht_message_callback_t callback;
                transaction->finished = KS_TRUE;
 
+               // @todo readlock on registry
                if ((callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) {
                        ret = callback(dht, message);
                } else {
@@ -1045,20 +1071,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
                                                          &r) != KS_STATUS_SUCCESS) {
                return KS_STATUS_FAIL;
        }
-       
-       //if (ks_dht_message_alloc(&response, dht->pool) != KS_STATUS_SUCCESS) {
-       //      goto done;
-       //}
 
-       //if (ks_dht_message_init(response, message->endpoint, &message->raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
-       //      goto done;
-       //}
-
-       //if (ks_dht_message_response(response, message->transactionid, message->transactionid_length, &r) != KS_STATUS_SUCCESS) {
-       //      goto done;
-       //}
-       
-       ben_dict_set(r, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+       ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
 
        ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
        ks_q_push(dht->send_q, (void *)response);
@@ -1082,8 +1096,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        ks_bool_t want6 = KS_FALSE;
        ks_dht_message_t *response = NULL;
        struct bencode *r = NULL;
-       uint8_t buffer[1000];
-       ks_size_t buffer_length = 0;
+       uint8_t buffer4[1000];
+       uint8_t buffer6[1000];
+       ks_size_t buffer4_length = 0;
+       ks_size_t buffer6_length = 0;
 
        ks_assert(dht);
        ks_assert(message);
@@ -1141,10 +1157,22 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
        
        ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
 
-       // @todo get closest nodes to target from route table
 
-       // @todo compact into buffer
-       if (ks_dht_utility_compact_node((ks_dht_nodeid_t *)idv, &message->raddr, buffer, &buffer_length, sizeof(buffer)) != KS_STATUS_SUCCESS) {
+       if (want4) {
+               // @todo get closest nodes to target from ipv4 route table
+               // @todo compact nodes into buffer4
+       }
+       if (want6) {
+               // @todo get closest nodes to target from ipv6 route table
+               // @todo compact nodes into buffer6
+       }
+
+       // @todo remove this, testing only
+       if (ks_dht_utility_compact_node((ks_dht_nodeid_t *)idv,
+                                                                       &message->raddr,
+                                                                       message->raddr.family == AF_INET ? buffer4 : buffer6,
+                                                                       message->raddr.family == AF_INET ? &buffer4_length : &buffer6_length,
+                                                                       message->raddr.family == AF_INET ? sizeof(buffer4) : sizeof(buffer6)) != KS_STATUS_SUCCESS) {
                return KS_STATUS_FAIL;
        }
 
@@ -1158,21 +1186,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
                return KS_STATUS_FAIL;
        }
 
-       //if (ks_dht_message_alloc(&response, dht->pool) != KS_STATUS_SUCCESS) {
-       //      goto done;
-       //}
-
-       //if (ks_dht_message_init(response, message->endpoint, &message->raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
-       //      goto done;
-       //}
-
-       //if (ks_dht_message_response(response, message->transactionid, message->transactionid_length, &r) != KS_STATUS_SUCCESS) {
-       //      goto done;
-       //}
-       
-       ben_dict_set(r, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
-       // @todo populate nodes/nodes6
-       ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer, buffer_length));
+       ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+       if (want4) {
+               ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
+       }
+       if (want6) {
+               ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
+       }
 
        ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
        ks_q_push(dht->send_q, (void *)response);