]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Improve dht route table query performance
authorcolm <colm@freeswitch1>
Wed, 21 Dec 2016 00:50:59 +0000 (19:50 -0500)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:37 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht_bucket.c
libs/libks/test/testbuckets.c

index f0d2bbadd0336dd1853d1738609b0887a2ebd883..1e9f24e313daa92291606e1a44bcbd988bb50a9c 100644 (file)
@@ -81,6 +81,8 @@ typedef struct ks_dhtrt_bucket_header_s {
        struct ks_dhtrt_bucket_header_s * parent;
        struct ks_dhtrt_bucket_header_s * left;
        struct ks_dhtrt_bucket_header_s * right;
+    struct ks_dhtrt_bucket_header_s * left1bit;
+    struct ks_dhtrt_bucket_header_s * right1bit;
        ks_dhtrt_bucket_t *      bucket;
        ks_time_t                tyme;                             /* last processed time */
        unsigned char    mask[KS_DHT_NODEID_SIZE];      /* node id mask            */
@@ -354,7 +356,6 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
         char buf[100];
         ks_log(KS_LOG_DEBUG, "Insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
-        //fflush(stdout);
 #endif
 
        ks_rwl_write_lock(bucket->lock);
@@ -365,10 +366,10 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
                /* first - seek a stale entry to eject */
                if (bucket->expired_count) {
                        ks_status_t s = ks_dhtrt_insert_id(bucket, node);
+
                        if (s == KS_STATUS_SUCCESS) {
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
                                 ks_log(KS_LOG_DEBUG, "insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
-                                //fflush(stdout);
 #endif
                                 ks_rwl_write_unlock(bucket->lock);
                  ks_rwl_write_unlock(internal->lock);
@@ -389,7 +390,6 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
 #endif
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
                        ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
-                       //fflush(stdout);
 #endif
                ks_rwl_write_unlock(bucket->lock);
             ks_rwl_write_unlock(internal->lock);
@@ -408,7 +408,6 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
 #endif
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
             ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
-            //fflush(stdout);
 #endif
                        ks_rwl_write_unlock(bucket->lock);
             ks_rwl_write_unlock(internal->lock);
@@ -435,7 +434,6 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
             ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->right->mask, buf));
             ks_log(KS_LOG_DEBUG, "Insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(newleft->mask, buf));
-            //fflush(stdout);
 #endif
 
             ks_rwl_write_lock(bucket->lock);                   /* lock new bucket */
@@ -460,7 +458,6 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
     ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n",
                        ks_dhtrt_printableid(header->mask, buf));
-    //fflush(stdout);
 #endif
     ks_rwl_write_unlock(bucket->lock);
     return s;
@@ -485,7 +482,6 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
             char buf[100];
             ks_log(KS_LOG_DEBUG, "Find node: read LOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
-            //fflush(stdout);
 #endif
 
                        ks_rwl_read_lock(bucket->lock);
@@ -496,7 +492,6 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_
                        }
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
             ks_log(KS_LOG_DEBUG, "Find node: read UNLOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
-            //fflush(stdout);
 #endif
                        ks_rwl_read_unlock(bucket->lock);
                }
@@ -520,7 +515,6 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table,  ks_dh
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
                char buf[100];
                ks_log(KS_LOG_DEBUG, "Touch node: write bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
-               //fflush(stdout);
 #endif
 
                ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
@@ -539,7 +533,6 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table,  ks_dh
                }
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
                ks_log(KS_LOG_DEBUG, "Touch node: UNLOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
-               //fflush(stdout);
 #endif
                ks_rwl_write_unlock(header->bucket->lock);
        }
@@ -581,13 +574,11 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_
 static
 uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query) 
 {
-       uint8_t max = query->max;
        uint8_t total = 0;
        uint8_t cnt;
 
-       if (max == 0) return 0;                 /* sanity checks */
-    if (max > KS_DHTRT_MAXQUERYSIZE) {  /* enforce the maximum */
-               max = KS_DHTRT_MAXQUERYSIZE; 
+       if (query->max == 0) return 0;                  /* sanity checks */
+    if (query->max > KS_DHTRT_MAXQUERYSIZE) {  /* enforce the maximum */
         query->max = KS_DHTRT_MAXQUERYSIZE;
        }
 
@@ -597,7 +588,9 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
 
 #ifdef KS_DHT_DEBUGPRINTF_
        char buffer[100];
-       ks_log(KS_LOG_DEBUG, "Finding %d closest nodes for  nodeid %s\n", max, ks_dhtrt_printableid(query->nodeid.id, buffer));
+       ks_log(KS_LOG_DEBUG, "Finding %d closest nodes for  nodeid %s\n", 
+                                                       query->max, 
+                                                       ks_dhtrt_printableid(query->nodeid.id, buffer));
        ks_log(KS_LOG_DEBUG, "   ...starting at mask: %s\n",  ks_dhtrt_printableid(header->mask, buffer));
 #endif
 
@@ -611,8 +604,8 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
 
        /* step 1 - look at immediate bucket */
        /* --------------------------------- */
+    int max = query->max;
        cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family, header, &xort0, initid ,max);
-       max -= cnt;
        total += cnt;
 
 #ifdef KS_DHT_DEBUGPRINTF_
@@ -644,8 +637,8 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
                }
        }
 
+    max = query->count - total;
        cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family, header, &xort1, initid ,max);
-       max -= cnt;
        total += cnt;
 
 #ifdef KS_DHT_DEBUGPRINTF_
@@ -668,56 +661,108 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
        memcpy(rightid, xort1.bheader->mask, KS_DHT_NODEID_SIZE);
 
        int insanity = 0;
-       ks_dhtrt_bucket_header_t *lheader; 
-       ks_dhtrt_bucket_header_t *rheader;
+       ks_dhtrt_bucket_header_t *lheader = 0; 
+       ks_dhtrt_bucket_header_t *rheader = 0;
+    ks_dhtrt_bucket_header_t *last_rheader = 0;
+    ks_dhtrt_bucket_header_t *last_lheader = 0;
        ks_dhtrt_sortedxors_t *prev = &xort1;
        ks_dhtrt_sortedxors_t *tofree = 0;
        ks_dhtrt_sortedxors_t *xortn;
        ks_dhtrt_sortedxors_t *xortn1;
 
        do {
+        last_lheader = lheader;
                lheader = 0;
+        last_rheader = rheader;
                rheader = 0;
                xortn = 0;
                xortn1 = 0;
 
                if (leftid[0] != 0xff) { 
+
                        ks_dhtrt_shiftleft(leftid);
-                       lheader = ks_dhtrt_find_bucketheader(table, leftid);
+
+            if (last_lheader && last_lheader->left1bit) {
+                               lheader = last_lheader->left1bit = ks_dhtrt_find_relatedbucketheader(last_lheader->left1bit, leftid);
+                       }
+            else {
+                               lheader = ks_dhtrt_find_bucketheader(table, leftid);
+                if (last_lheader) {
+                                       last_lheader->left1bit = lheader;    /* remember so we can take a shortcut next query */
+                               } 
+                       }
 
                        if (lheader) {            
                                xortn = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t));
 
-                               if (tofree == 0)   tofree = xortn;
+                               if (tofree == 0) {
+                                       tofree = xortn;
+                               }
 
                                prev->next = xortn;
                                prev = xortn;
-                               cnt += ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family, 
+                           max = query->max - total;
+                               cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family, 
                                                                                                                        lheader, xortn, leftid ,max);
-                               max -= cnt;
+                total += cnt;
 #ifdef KS_DHT_DEBUGPRINTF_
                                ks_log(KS_LOG_DEBUG," stage3: seaching left bucket header %s yielded %d nodes, total=%d\n",
                                           ks_dhtrt_printableid(lheader->mask, buffer), cnt, total);
 #endif
                        }
+#ifdef  KS_DHT_DEBUGPRINTF_
+            else {
+                ks_log(KS_LOG_DEBUG," stage3: failed to find left header %s\n",
+                       ks_dhtrt_printableid(leftid, buffer));
+            }
+#endif
+
                }
 
-               if (max > 0 && rightid[KS_DHT_NODEID_SIZE-1] != 0x00) {
+               if (rightid[KS_DHT_NODEID_SIZE-1] != 0x00) {
+
                        ks_dhtrt_shiftright(rightid);
-                       rheader = ks_dhtrt_find_bucketheader(table, rightid);
+
+            if (last_rheader && last_rheader->right1bit) {
+                rheader = last_rheader->right1bit = ks_dhtrt_find_relatedbucketheader(last_rheader->right1bit, rightid);
+            }
+            else {
+                rheader = ks_dhtrt_find_bucketheader(table, rightid);
+                               if (rheader == last_rheader) {    /* did we get the same bucket header returned */
+                                       rheader = 0;                  /* yes: we are done on the left hand branch   */
+                               }
+                else {
+                       if (last_rheader) {
+                               last_rheader->left1bit = rheader;    /* remember so we can take a shortcut next query */
+                               }
+                               }
+            }
 
                        if (rheader) {
                                xortn1 = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t));
+
+                if (tofree == 0) {
+                    tofree = xortn1;
+                }
+
                                prev->next = xortn1;
                                prev = xortn1;
+                               max = query->max - total;
                                cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family,
                                                                                                                        rheader, xortn1, rightid , max);
-                               max -= cnt;
+                total += cnt;
 #ifdef KS_DHT_DEBUGPRINTF_
                                ks_log(KS_LOG_DEBUG," stage3: seaching right bucket header %s yielded %d nodes, total=%d\n", 
                                           ks_dhtrt_printableid(rheader->mask, buffer), cnt, total);
 #endif
                        }
+#ifdef  KS_DHT_DEBUGPRINTF_
+            else {
+                ks_log(KS_LOG_DEBUG," stage3: failed to find right header %s\n",
+                       ks_dhtrt_printableid(rightid, buffer));
+            }
+#endif
+
                }
           
                if (!lheader && !rheader) {
@@ -729,8 +774,7 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
                if (insanity > 159) {
                        assert(insanity <= 159);
                }
-
-       } while (max < query->max);
+       } while (total < query->max);
 
 
        ks_dhtrt_load_query(query, &xort0);
@@ -831,6 +875,7 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
 #endif
                                                                e->flags =      DHTPEER_EXPIRED; 
                                                                ++b->expired_count;
+                                e->outstanding_pings = 0;     /* extinguish all hope: do not retry again */ 
                                                                continue;
                                                        }
 
@@ -981,12 +1026,19 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
 
                                for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
                                        memset(buffer, 0, 100);
-                                       if (b->entries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer);
-                                       else strcpy(buffer, "<free>");
-                                       ks_log(KS_LOG_DEBUG, "     slot %d: %d %d %s\n", ix, 
-                                               b->entries[ix].flags,  
+                                       if (b->entries[ix].inuse == 1) {
+                         ks_dhtrt_printableid(b->entries[ix].id, buffer);
+                         ks_dht_node_t *n = b->entries[ix].gptr;
+                                ks_log(KS_LOG_DEBUG, "     slot %d: flags:%d %d type:%d family:%d %s\n", ix,
+                                               b->entries[ix].flags,
                                                b->entries[ix].outstanding_pings,
+                                               n->type,
+                                               n->family,
                                                buffer);
+                                       }
+                                       else {
+                                                       ks_log(KS_LOG_DEBUG, "     slot %d: <free>\n", ix); 
+                                       }
                                }
 
                                ks_log(KS_LOG_DEBUG, "   --------------------------\n\n");
@@ -1312,7 +1364,6 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id,
         char buf[100];
         ks_log(KS_LOG_DEBUG, "closestbucketnodes: LOCKING bucket %s\n",
                ks_dhtrt_printableid(header->mask, buf));
-        //fflush(stdout);
 #endif
     
 
index e5204faf1b66d52e0e386a0a3a43e987ff3eb8fc..e4df3fe221cc00bb34fbdfe764e347f9eb7d16ce 100644 (file)
@@ -178,33 +178,45 @@ void test03()
    enum ks_afflags_t both = ifboth;
 
    ks_status_t status;
+   int ipv4_remote = 0;
+       int ipv4_local = 0;
 
    for (int i=0; i<200; ++i) {
      if (i%10 == 0) {
            ++nodeid.id[0];
+           nodeid.id[1] = 0;
      }
      else {
            ++nodeid.id[1];
      } 
-     ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
-     ks_dhtrt_touch_node(rt, nodeid);
+     ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
+     if (s0 == KS_STATUS_SUCCESS) {
+        ks_dhtrt_touch_node(rt, nodeid);
+        ++ipv4_remote;
+     }
    }
 
    for (int i=0; i<2; ++i) {
      if (i%10 == 0) {
            ++nodeid.id[0];
+           nodeid.id[1] = 0;
      }
      else {
            ++nodeid.id[1];
      }
 
-     ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
-     ks_dhtrt_touch_node(rt, nodeid);
+     ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
+        if (s0 == KS_STATUS_SUCCESS) {
+               ks_dhtrt_touch_node(rt, nodeid);
+               ++ipv4_local;
+        }
    }
 
    for (int i=0; i<201; ++i) {
      if (i%10 == 0) {
            ++nodeid.id[0];
+           nodeid.id[1] = 0;
      }
      else {
            ++nodeid.id[1];
@@ -214,8 +226,12 @@ void test03()
    }
 
 
+   ks_dhtrt_dump(rt, 7);
+
+
    int qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, both);
-   printf("\n** local query count expected 2, actual %d\n", qcount); fflush(stdout);
+   printf("\n** local query count expected 2, actual %d, max %d\n", qcount, ipv4_local); fflush(stdout);
+
    qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, both);
    printf("\n*** remote query count expected 20, actual %d\n", qcount); fflush(stdout);
    qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, both);
@@ -230,7 +246,7 @@ void test03()
    printf("\n*** AF_INET6 count expected 20, actual %d\n", qcount); fflush(stdout);
 
    qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, ifv4);
-   printf("\n*** remote AF_INET  query count expected 20, actual %d\n", qcount); fflush(stdout);
+   printf("\n*** remote AF_INET  query count expected 20, actual %d max %d\n", qcount, ipv4_remote); fflush(stdout);
    qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, ifv6);
    printf("\n*** remote AF_INET6 query count expected 20, actual %d\n", qcount); fflush(stdout);
 
@@ -452,6 +468,88 @@ void test06()
 }
 
 
+void test30()
+{
+ printf("*** testbuckets - test03 start\n"); fflush(stdout);
+
+   ks_dht_node_t*  peer;
+   ks_dht_nodeid_t nodeid;
+   memset(nodeid.id,  0xef, KS_DHT_NODEID_SIZE);
+
+   char ipv6[] = "1234:1234:1234:1234";
+   char ipv4[] = "123.123.123.123";
+   unsigned short port = 7000;
+   enum ks_afflags_t both = ifboth;
+
+   ks_status_t status;
+   int ipv4_remote = 0;
+    int ipv4_local = 0;
+
+   for (int i=0; i<200; ++i) {
+     if (i%10 == 0) {
+           ++nodeid.id[0];
+           nodeid.id[1] = 0;
+     }
+     else {
+           ++nodeid.id[1];
+     }
+     ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
+     if (s0 == KS_STATUS_SUCCESS) {
+        ks_dhtrt_touch_node(rt, nodeid);
+        ++ipv4_remote;
+     }
+   }
+
+   for (int i=0; i<2; ++i) {
+     if (i%10 == 0) {
+           ++nodeid.id[0];
+           nodeid.id[1] = 0;
+     }
+     else {
+           ++nodeid.id[1];
+     }
+
+     ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
+     if (s0 == KS_STATUS_SUCCESS) {
+        ks_dhtrt_touch_node(rt, nodeid);
+        ++ipv4_local;
+     }
+   }
+
+   for (int i=0; i<201; ++i) {
+     if (i%10 == 0) {
+           ++nodeid.id[0];
+           nodeid.id[1] = 0;
+     }
+     else {
+           ++nodeid.id[1];
+     }
+     ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer);
+     ks_dhtrt_touch_node(rt, nodeid);
+   }
+
+
+   ks_dhtrt_dump(rt, 7);
+
+
+   int qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, both);
+   printf("\n** local query count expected 2, actual %d, max %d\n", qcount, ipv4_local); fflush(stdout);
+
+   qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, both);
+   printf("\n** local query count expected 2, actual %d, max %d\n", qcount, ipv4_local); fflush(stdout);
+
+   qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, both);
+   printf("\n** local query count expected 20, actual %d, max %d\n", qcount, ipv4_local); fflush(stdout);
+
+   return;
+}
+
+
+
+
+
+
+
 /* test resue of node memory */
 void test50()
 {
@@ -610,8 +708,11 @@ int main(int argc, char* argv[]) {
         ks_dht_create(&dht, NULL, NULL);
 
 
-   ks_thread_pool_create(&tpool, 0, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE);
+  // ks_thread_pool_create(&tpool, 0, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE);
+       
+       tpool = 0;
 
+       
    ks_status_t status;
    char *str = NULL;
    int bytes = 1024;
@@ -675,6 +776,15 @@ int main(int argc, char* argv[]) {
         continue;
      }
 
+
+     if (tests[tix] == 30) {
+        ks_dhtrt_initroute(&rt, dht, pool, tpool);
+        test30();
+        ks_dhtrt_deinitroute(&rt);
+        continue;
+     }
+
+
      if (tests[tix] == 50) {
         ks_dhtrt_initroute(&rt, dht, pool, tpool);
         test50();