]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Indentify local nodes, allow query to distingush local v remote
authorcolm <colm@freeswitch1>
Sat, 10 Dec 2016 00:18:40 +0000 (19:18 -0500)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:35 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_bucket.c

index f0050cf9b03fb98c2cb8c65b62644f11fb160f52..6484a297cf2e97124458a147febc450a57a626dc 100644 (file)
@@ -385,11 +385,11 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
        // @todo initialize or add local nodeid to appropriate route table
        if (ep->addr.family == AF_INET) {
                if (!dht->rt_ipv4) {
-                       ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, ep->nodeid);
+                       ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool);
                }
        } else {
                if (!dht->rt_ipv6) {
-                       ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool, ep->nodeid);
+                       ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool);
                }
        }
        
index ac76542f9bd7b6978ba7fba81faeb91ddc8472d9..977e410ddeff25b5e63436072028cc5453cfc81d 100644 (file)
@@ -51,11 +51,15 @@ struct ks_dht_nodeid_s {
 };
 
 enum ipfamily { ifv4=AF_INET, ifv6=AF_INET6, ifboth=AF_INET+AF_INET6};
+enum ks_dht_nodetype_t { ks_dht_remote_t=0x01, 
+                         ks_dht_local_t=0x02, 
+                         ks_dht_both_t=ks_dht_remote_t+ks_dht_local_t };
 
 struct ks_dht_node_s {
     ks_dht_nodeid_t  nodeid;
     ks_sockaddr_t    addr;
-    enum ipfamily    family;                  /* in: AF_INET or AF_INET6 or both   */
+    enum ipfamily    family;                  /* AF_INET or AF_INET6 */
+    enum ks_dht_nodetype_t type;              /* local or remote */
     ks_dhtrt_routetable_t* table;
 };
 
@@ -68,6 +72,7 @@ struct ks_dhtrt_routetable_s {
 struct ks_dhtrt_querynodes_s {
     ks_dht_nodeid_t nodeid;                   /* in: id to query                   */
     enum ipfamily  family;                    /* in: AF_INET or AF_INET6 or both   */
+    enum ks_dht_nodetype_t type;              /* remote, local, or  both           */
     uint8_t        max;                       /* in: maximum to return             */
     uint8_t        count;                     /* out: number returned              */
     ks_dht_node_t* nodes[ KS_DHT_MESSAGE_QUERY_MAX_SIZE]; /* out: array of peers (ks_dht_node_t* nodes[incount]) */
@@ -230,11 +235,12 @@ KS_DECLARE(ks_status_t) ks_dht_transaction_deinit(ks_dht_transaction_t *transact
  * route table methods
  *
  */
-KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_dht_nodeid_t nodeid);
+KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool);
 KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table);
 
 KS_DECLARE(ks_status_t)        ks_dhtrt_create_node(ks_dhtrt_routetable_t* table,
                                   ks_dht_nodeid_t nodeid,
+                                  enum ks_dht_nodetype_t type,
                                   char* ip, unsigned short port,
                                   ks_dht_node_t** node);
 
index 47365c42c942f0629a7694ca921e8c1e615d3b78..31435842e722a2dd6f6b81c817e14108bfc5d6bf 100644 (file)
@@ -53,7 +53,8 @@ typedef uint8_t ks_dhtrt_nodeid_t[KS_DHT_NODEID_SIZE];
 typedef struct ks_dhtrt_bucket_entry_s {
        ks_time_t  tyme;
        uint8_t    id[KS_DHT_NODEID_SIZE];
-       ks_dht_node_t *gptr;                                    /* ptr to peer */          
+       ks_dht_node_t *gptr;                                    /* ptr to peer */       
+    enum ks_dht_nodetype_t type;   
        uint8_t    inuse;
        uint8_t    outstanding_pings;
        uint8_t    flags;                                         /* active, suspect, expired */
@@ -71,10 +72,11 @@ typedef struct ks_dhtrt_bucket_s {
 #define BHF_LEFT 0x80
 
 typedef struct ks_dhtrt_bucket_header_s {
-       struct ks_dhtrt_bucket_header_s* parent;
-       struct ks_dhtrt_bucket_header_s* left;
-       struct ks_dhtrt_bucket_header_s* right;
+       struct ks_dhtrt_bucket_header_s * parent;
+       struct ks_dhtrt_bucket_header_s * left;
+       struct ks_dhtrt_bucket_header_s * right;
        ks_dhtrt_bucket_t *      bucket;
+    ks_dhtrt_bucket_t *         bucketv6;
        ks_time_t                tyme;                             /* last processed time */
        unsigned char    mask[KS_DHT_NODEID_SIZE];      /* node id mask            */
        unsigned char    flags;                   
@@ -84,8 +86,8 @@ typedef struct ks_dhtrt_bucket_header_s {
 typedef struct ks_dhtrt_internal_s {
        uint8_t  localid[KS_DHT_NODEID_SIZE];
        ks_dhtrt_bucket_header_t *buckets;              /* root bucketheader */
-       ks_rwl_t *                         lock;                  /* lock for safe traversal of the tree */ 
-       uint8_t                            locked;
+       ks_rwl_t                           *lock;                 /* lock for safe traversal of the tree */ 
+       uint8_t                             locked;             
 } ks_dhtrt_internal_t;
 
 typedef struct ks_dhtrt_xort_s {
@@ -124,14 +126,14 @@ static
 ks_dht_node_t *ks_dhtrt_find_nodeid(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t nodeid);
 
 
-static void 
-ks_dhtrt_shiftright(uint8_t *id); 
+static  
+void ks_dhtrt_shiftright(uint8_t *id); 
 static
 void ks_dhtrt_shiftleft(uint8_t *id);
-static void 
-ks_dhtrt_xor(const uint8_t *id1, const uint8_t *id2, uint8_t *xor);
-static int 
-ks_dhtrt_ismasked(const uint8_t *id1, const uint8_t *mask);
+static  
+void ks_dhtrt_xor(const uint8_t *id1, const uint8_t *id2, uint8_t *xor);
+static  
+int ks_dhtrt_ismasked(const uint8_t *id1, const uint8_t *mask);
 
 static
 ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node);
@@ -147,6 +149,7 @@ static
 uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t *xort);
 static
 uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid,
+                                         enum ks_dht_nodetype_t type, 
                                                                                 ks_dhtrt_bucket_header_t *header,
                                                                                 ks_dhtrt_sortedxors_t *xors,
                                                                                 unsigned char *hixor,
@@ -162,29 +165,16 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry);
 /* # define KS_DHT_DEBUGPRINTFX_  very verbose */
 
 
-/*
-  Public interface                     
-  ---------------
-  ks_dhtrt_initroute
-  ks_dhtrt_drinitroute
-
-  ks_dhtrt_insertnode
-*/
-
-KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_dht_nodeid_t nodeid) 
+KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool) 
 {
        unsigned char initmask[KS_DHT_NODEID_SIZE];
        memset(initmask, 0xff, sizeof(initmask));
 
        ks_dhtrt_routetable_t *table =   ks_pool_alloc(pool, sizeof(ks_dhtrt_routetable_t));
-       memset(table, 0, sizeof(ks_dhtrt_routetable_t));
 
        ks_dhtrt_internal_t *internal =   ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t));
-       memset(internal, 0, sizeof(ks_dhtrt_internal_t));
 
        /*ks_rwl_create(&internal->lock, pool);*/
-       if (nodeid.id != 0)      memcpy(internal->localid, nodeid.id, KS_DHT_NODEID_SIZE);
        table->internal = internal;
 
        /* initialize root bucket */
@@ -207,13 +197,13 @@ KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table)
        ks_pool_t *pool = (*table)->pool;
 
        ks_pool_free(pool, *table);
-       *table = NULL;
 
        return;
 }
 
 KS_DECLARE(ks_status_t)         ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, 
-                                                                                          ks_dht_nodeid_t nodeid, 
+                                                                                          ks_dht_nodeid_t nodeid,
+                                                                                          enum ks_dht_nodetype_t type, 
                                                                                           char *ip,
                                                                                           unsigned short port,
                                                                                           ks_dht_node_t **node) 
@@ -233,7 +223,8 @@ KS_DECLARE(ks_status_t)      ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
                }
        }
 
-       memcpy(tnode->nodeid.id, nodeid.id, KS_DHT_NODEID_SIZE);
+    memcpy(tnode->nodeid.id, nodeid.id, KS_DHT_NODEID_SIZE);
+    tnode->type = type;
 
        if ((ks_addr_set(&tnode->addr, ip, port, tnode->family) != KS_STATUS_SUCCESS) || 
                (ks_dhtrt_insert_node(table, tnode) != KS_STATUS_SUCCESS))      { 
@@ -268,13 +259,6 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
        ks_dhtrt_bucket_t *bucket = 0;
        int insanity = 0;
 
-       /* first see if it exists */
-       ks_dht_node_t *peer = ks_dhtrt_find_node(table, node->nodeid);
-
-       if (peer != 0)  {
-               return KS_STATUS_FAIL;
-       }
        ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id); 
 
        bucket = header->bucket;
@@ -313,7 +297,7 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
                if (newmask[KS_DHT_NODEID_SIZE-1] == 0) {  /* no more bits to shift - is this possible */
 #ifdef KS_DHT_DEBUGPRINTF_
                        char buffer[100];
-                       printf(" nodeid %s was not inserted\n",  ks_dhtrt_printableid(peer->nodeid.id, buffer));
+                       printf(" nodeid %s was not inserted\n",  ks_dhtrt_printableid(node->nodeid.id, buffer));
 #endif
                        return KS_STATUS_FAIL;
                }
@@ -430,7 +414,7 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_
 
        /* step 1 - look at immediate bucket */
        /* --------------------------------- */
-       cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, header, &xort0, initid ,max);
+       cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, header, &xort0, initid ,max);
        max -= cnt;
        total += cnt;
 
@@ -438,7 +422,8 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_
        printf(" bucket header %s yielded %d nodes; total=%d\n",  buffer, cnt, total);
 #endif
 
-       if (total >= query->max) {       /* is query answered ?  */
+       if (total >= query->max  ||
+        !header->parent       ) {       /* is query answered ?  */
                return ks_dhtrt_load_query(query, &xort0);
        }
 
@@ -462,7 +447,7 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_
                }
        }
 
-       cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, header, &xort1, initid ,max);
+       cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, header, &xort1, initid ,max);
        max -= cnt;
        total += cnt;
 
@@ -511,7 +496,7 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_
 
                                prev->next = xortn;
                                prev = xortn;
-                               cnt += ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, lheader, xortn, leftid ,max);
+                               cnt += ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, lheader, xortn, leftid ,max);
                                max -= cnt;
 #ifdef KS_DHT_DEBUGPRINTF_
                                printf(" stage3: seaching left bucket header %s yielded %d nodes, total=%d\n",
@@ -529,7 +514,7 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_
                                memset(xortn1, 0, sizeof(ks_dhtrt_sortedxors_t));
                                prev->next = xortn1;
                                prev = xortn1;
-                               cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, rheader, xortn1, rightid , max);
+                               cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, rheader, xortn1, rightid , max);
                                max -= cnt;
 #ifdef KS_DHT_DEBUGPRINTF_
                                printf(" stage3: seaching right bucket header %s yielded %d nodes, total=%d\n", 
@@ -860,8 +845,9 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
        if ( free<KS_DHT_BUCKETSIZE ) {
                bucket->entries[free].inuse = 1;
                bucket->entries[free].gptr = node;
-               bucket->entries[free].tyme = ks_time_now();
-               bucket->entries[free].flags &= DHTPEER_ACTIVE;
+        bucket->entries[free].type = node->type;
+        bucket->entries[free].tyme = ks_time_now();
+        bucket->entries[free].flags &= DHTPEER_ACTIVE;
 
                ++bucket->count;
                memcpy(bucket->entries[free].id, node->nodeid.id, KS_DHT_NODEID_SIZE);
@@ -938,6 +924,7 @@ void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
 
 static
 uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id,
+                                         enum ks_dht_nodetype_t type,
                                                                                 ks_dhtrt_bucket_header_t *header,
                                                                                 ks_dhtrt_sortedxors_t *xors,
                                                                                 unsigned char *hixor,    /*todo: remove */
@@ -963,8 +950,9 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id,
        }
 
        for (uint8_t ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
-               if ( bucket->entries[ix].inuse == 1       &&
-                        ks_dhtrt_isactive( &(bucket->entries[ix])) ) {
+           if ( bucket->entries[ix].inuse == 1   &&
+             bucket->entries[ix].type & type  &&
+             ks_dhtrt_isactive( &(bucket->entries[ix])) ) {
                  
                        /* calculate xor value */
                        ks_dhtrt_xor(bucket->entries[ix].id, id, xorvalue );
@@ -1022,7 +1010,7 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t
                           ks_dhtrt_printableid(current->bheader->mask,buf), current->count);
 #endif
                int xorix = current->startix; 
-               for (uint8_t ix = 0; ix<= current->count && loaded < query->max; ++ix ) {
+               for (uint8_t ix = 0; ix< current->count && loaded < query->max; ++ix ) {
                        unsigned int z =  current->xort[xorix].ix;
                        query->nodes[ix] = current->bheader->bucket->entries[z].gptr;
                        ++loaded;
@@ -1069,7 +1057,6 @@ void ks_dhtrt_shiftright(uint8_t *id)
        }
        return;
 }
-
 static
 void ks_dhtrt_shiftleft(uint8_t *id) {