]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: DHT Repopulate empty buckets
authorcolm <colm@freeswitch1>
Sat, 31 Dec 2016 01:30:11 +0000 (20:30 -0500)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:38 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_bucket.c
libs/libks/src/dht/ks_dht_job.c
libs/libks/test/testbuckets.c

index 1c33b914788a41aa2421c0f7099dedee5641aa81..76d797ef1c8f8d75bd7e5caee1d461cfb7bd8f69 100644 (file)
@@ -296,6 +296,11 @@ KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
                                                                          ks_dht_token_t *token,
                                                                          int64_t cas,
                                                                          ks_dht_storageitem_t *item);
+KS_DECLARE(void) ks_dht_job_build_search_findnode(ks_dht_job_t *job,
+                                                                       ks_dht_nodeid_t *target,
+                                                                       uint32_t family,
+                                                                       ks_dht_job_callback_t query_callback,
+                                                                       ks_dht_job_callback_t finish_callback);
 KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job);
 
 
index 8392ba4017078a6e08c533f5f61e3dba378cb588..a40c1cf8c5791699bb08e25d2b9fdb16b3a32a70 100644 (file)
@@ -3046,6 +3046,43 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t
        return ret;
 }
 
+KS_DECLARE(ks_status_t) ks_dht_exec_search_findnode(ks_dht_t *dht, ks_dht_job_t *job) 
+{
+        return ks_dht_search_findnode(dht,
+                                                                       job->query_family,
+                                                                       &job->query_target,
+                                                                       NULL, 
+                                                                       NULL);
+}
+
+KS_DECLARE(ks_status_t) ks_dht_queue_search_findnode(ks_dht_t* dht,
+                                                                               ks_dhtrt_routetable_t *rt, 
+                                                                               ks_dht_nodeid_t *target, 
+                                                                               ks_dht_job_callback_t callback)
+{
+       ks_dht_job_t *job = NULL;
+       ks_status_t ret = KS_STATUS_SUCCESS;
+
+       ks_assert(dht);
+       ks_assert(rt);
+       ks_assert(target);
+
+       ks_sockaddr_t taddr;   /* just to satisfy the api */
+
+       if ((ret = ks_dht_job_create(&job, dht->pool, &taddr, 3)) == KS_STATUS_SUCCESS) {
+
+               int32_t family = AF_INET;
+       
+               if (rt == dht->rt_ipv6) {
+                       family = AF_INET6;
+               }
+
+               ks_dht_job_build_search_findnode(job, target, family, ks_dht_exec_search_findnode, callback);
+       }
+
+       return ret;
+}
+
 /* For Emacs:
  * Local Variables:
  * mode:c
index 7f869f67eac0688f185485f6a85224a59839f43b..019688682ddc28c533a18944df5e3bd488dcaa2f 100644 (file)
@@ -145,6 +145,7 @@ struct ks_dht_job_s {
        int64_t query_cas;
        ks_dht_token_t query_token;
        ks_dht_storageitem_t *query_storageitem;
+    uint32_t query_family;
 
        // job specific response parameters
        ks_dht_nodeid_t response_id;
@@ -480,6 +481,14 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht,
                                                                                           ks_dht_search_callback_t callback,
                                                                                           ks_dht_search_t **search);
 
+KS_DECLARE(ks_status_t) ks_dht_queue_search_findnode(ks_dht_t* dht,
+                                                                                       ks_dhtrt_routetable_t *rt,
+                                                                                       ks_dht_nodeid_t *target,
+                                                                                       ks_dht_job_callback_t callback);
+                                                                               
+KS_DECLARE(ks_status_t) ks_dht_exec_search_findnode(ks_dht_t *dht, ks_dht_job_t *job);
+
+
 
 /**
  * route table methods
index 9fde747e7fe006158e8b348f460827a224759967..382c8155e4819b60bef3b8a9a518da07e56733f3 100644 (file)
@@ -58,6 +58,7 @@ typedef uint8_t ks_dhtrt_nodeid_t[KS_DHT_NODEID_SIZE];
 /* internal structures */
 typedef struct ks_dhtrt_bucket_entry_s {
        ks_time_t  tyme;
+    ks_time_t  ping_tyme;
        uint8_t    id[KS_DHT_NODEID_SIZE];
        ks_dht_node_t *gptr;                                    /* ptr to peer */       
        uint8_t    inuse;
@@ -186,9 +187,9 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid,
                                                                                 unsigned int max);
 
 static
-void ks_dhtrt_ping(ks_dhtrt_internal_t *table, ks_dhtrt_bucket_entry_t *entry);
+void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry);
 static
-void ks_dhtrt_find(ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *nodeid);
+void ks_dhtrt_find(ks_dhtrt_routetable_t *table, ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *nodeid);
 
 
 /* debugging */
@@ -484,20 +485,20 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
                        return KS_STATUS_FAIL;
                }
 
-               /* shift right x bits : todo 1 bit for the moment */
+               /* shift right 1 bit */
                ks_dhtrt_shiftright(newmask);
 
                /* create the new bucket structures */
                ks_dhtrt_bucket_header_t *newleft  = ks_dhtrt_create_bucketheader(table->pool, header, newmask);
 
-        header->right1bit = newleft;
-        newleft->left1bit = header;
-
                newleft->bucket = ks_dhtrt_create_bucket(table->pool); 
                newleft->flags = BHF_LEFT;                                               /* flag as left hand side - therefore splitable */
 
                ks_dhtrt_bucket_header_t *newright = ks_dhtrt_create_bucketheader(table->pool, header, header->mask);
 
+        newright->right1bit = newleft;
+        newleft->left1bit = newright;
+
                ks_dhtrt_split_bucket(header, newleft, newright);
 
                /* ok now we need to try again to see if the bucket has capacity */
@@ -938,6 +939,7 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
        ks_time_t t0 = ks_time_now_sec();
 
        if (t0 - internal->last_process_table < internal->next_process_table_delta) {
+        /*printf("process table: next scan not scheduled\n");*/
                return;  
        } 
 
@@ -967,87 +969,90 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
 
                                if (b->count == 0) {
 
-                                   if (t0 - b->findtyme >= KS_DHTRT_EXPIREDTIME) {   /* bucket has been empty for a while */
+                                   if (t0 - b->findtyme >= (ks_time_t)KS_DHTRT_EXPIREDTIME) { /* bucket has been empty for a while */
+
                         ks_dht_nodeid_t targetid;
-                                               if (header->left1bit) {
-                                                       ks_dhtrt_midmask(header->left1bit->mask, header->mask, targetid.id);
-                                               }
-                                               else if (header->right1bit) {
+
+                                               if (header->right1bit) {
                             ks_dhtrt_midmask(header->mask, header->right1bit->mask, targetid.id);
                                                }
                                                else {
-                                                       ks_dhtrt_shiftright(targetid.id);
+                                                       ks_dhtrt_nodeid_t rightid;
+                                                       memcpy(rightid, header->mask, KS_DHT_NODEID_SIZE);                          
+                                                       ks_dhtrt_shiftright(rightid);
+                                                       ks_dhtrt_midmask(header->mask, rightid, targetid.id);
                                                }
-                        ks_dhtrt_find(internal, &targetid);
-                        continue;
+
+                        ks_dhtrt_find(table, internal, &targetid);
+                        b->findtyme = t0;
                                        }
                                }
+                               else {
+                                       for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
+                                               ks_dhtrt_bucket_entry_t *e =  &b->entries[ix];
 
-                               for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
-                                       ks_dhtrt_bucket_entry_t *e =  &b->entries[ix];
-
-                                       if (e->inuse == 1) {
-
-                                               if (e->gptr->type != KS_DHT_LOCAL) {   /* 'local' nodes do not get expired */
-
-                                                       /* more than n pings outstanding? */
+                                               if (e->inuse == 1) {
 
-                                                       if (e->flags == DHTPEER_DUBIOUS) {
-                                                               continue;
-                                                       }
+                            ks_time_t tdiff = t0 - e->tyme;  
 
-                                                       if ( e->flags != DHTPEER_EXPIRED             && 
-                                                                e->outstanding_pings >= KS_DHTRT_MAXPING ) {
-                                                               ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n", 
-                                                                                                               ks_dhtrt_printableid(e->id, buf));
-                                                               e->flags =      DHTPEER_EXPIRED; 
-                                                               ++b->expired_count;
-                                                               e->outstanding_pings = 0;     /* extinguish all hope: do not retry again */ 
-                                                               continue;
-                                                       }
+                                                       if (e->gptr->type != KS_DHT_LOCAL) {   /* 'local' nodes do not get expired */
 
-                                                       /* if not on shortest interval and there are any outstanding pings - send another */
-                                                       if (  internal->next_process_table_delta == KS_DHTRT_PROCESSTABLE_SHORTINTERVAL120 &&
-                                                                 e->outstanding_pings > 0)                                                     {
-                                                               ks_dhtrt_ping(internal, e);
+                                                               /* more than n pings outstanding? */
 
-                                                               if (e->outstanding_pings == 2) {
-                                                                       ++ping2_count;                 /* return in 60 seconds for final check */
-                                                               }
-                                                               else {
-                                                                       ++ping_count;
+                                                               if (e->flags == DHTPEER_DUBIOUS) {
+                                                                       continue;                     /* nothin' to see here */
                                                                }
 
-                                                               continue;
-                                                       }
-
-                            /* if on shortest interval and there are two outstanding pings - send another and final */
-                            if (  internal->next_process_table_delta == KS_DHTRT_PROCESSTABLE_SHORTINTERVAL60  &&
-                                  e->outstanding_pings >= 2)                                                    {
-                                ks_dhtrt_ping(internal, e);
-                                ++ping_count;
-                                continue;
-                            }
-                                                       
-                                                       ks_time_t tdiff = t0 - e->tyme;
+                                /* refresh empty buckets */
+                                                               if ( e->flags != DHTPEER_EXPIRED             && 
+                                     tdiff >= KS_DHTRT_EXPIREDTIME           &&        /* beyond expired time */ 
+                                                                        e->outstanding_pings >= KS_DHTRT_MAXPING ) {      /* has been retried    */ 
+                                                                       ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n", 
+                                                                                                                       ks_dhtrt_printableid(e->id, buf));
+                                                                       e->flags =      DHTPEER_EXPIRED; 
+                                                                       ++b->expired_count;
+                                                                       e->outstanding_pings = 0;     /* extinguish all hope: do not retry again */ 
+                                                                       continue;
+                                                               }
 
-                                                       if (tdiff > KS_DHTRT_EXPIREDTIME) {       
-                                                               e->flags = DHTPEER_DUBIOUS;               /* mark as dubious          */
-                                                               ks_dhtrt_ping(internal, e);               /* final effort to activate */
-                                                               continue;                                 
-                                                       }
+                                                               /* re ping in-doubt nodes */
+                                                               if ( e->outstanding_pings > 0) {
+                                                                       ks_time_t tping = t0 - e->ping_tyme;         /* time since we last pinged */
+
+                                    if (e->outstanding_pings == KS_DHTRT_MAXPING - 1) {  /* final ping */ 
+                                                                               ks_dhtrt_ping(internal, e);
+                                                                               e->ping_tyme = t0;
+                                                                               ++ping2_count;
+                                                                       }
+                                                                       else if (tping >=  KS_DHTRT_PROCESSTABLE_SHORTINTERVAL120) {
+                                                                               ks_dhtrt_ping(internal, e);
+                                                                               e->ping_tyme = t0;
+                                                                               ++ping_count;
+                                                                       }
+                                    continue;
+                                                               } 
+
+                                                               /* look for newly expired nodes */
+                                                               if (tdiff > KS_DHTRT_EXPIREDTIME) {       
+                                                                       e->flags = DHTPEER_DUBIOUS;               /* mark as dubious          */
+                                                                       ks_dhtrt_ping(internal, e);               /* final effort to activate */
+                                                                       e->ping_tyme = t0;
+                                                                       continue;                                 
+                                                               }
 
-                                                       if (tdiff > KS_DHTRT_INACTIVETIME) {          /* inactive for suspicious length */
-                                                               ks_dhtrt_ping(internal, e);               /* kick                           */
-                                                               ++ping_count;                            
-                                                               continue;
-                                                       }
+                                                               if (tdiff > KS_DHTRT_INACTIVETIME) {          /* inactive for suspicious length */
+                                                                       ks_dhtrt_ping(internal, e);               /* kick                           */
+                                                                       e->ping_tyme = t0;
+                                                                       ++ping_count;                            
+                                                                       continue;
+                                                               }
 
-                                               } /* end if not local */
+                                                       } /* end if not local */
 
-                                       }  /* end if e->inuse */
+                                               }  /* end if e->inuse */
 
-                               }       /* end for each bucket_entry */
+                                       }       /* end for each bucket_entry */
+                               }    /* if bucket->count == 0 .... else */            
 
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
                                char buf[100];
@@ -1664,6 +1669,7 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry
        char buf[100];
        ks_log(KS_LOG_DEBUG, "Ping queued for nodeid %s count %d\n",
                                        ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings);
+    /*printf("ping:  %s\n", buf); fflush(stdout);*/
 #endif
        ks_dht_node_t* node = entry->gptr;
        ks_log(KS_LOG_DEBUG, "Node addr %s %d\n", node->addr.host, node->addr.port);
@@ -1673,19 +1679,14 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry
 }
 
 static
-void ks_dhtrt_find(ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *nodeid) {
+void ks_dhtrt_find(ks_dhtrt_routetable_t *table, ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *target) {
 
-#ifdef  KS_DHT_DEBUGPRINTF_
        char buf[100];
-       ks_log(KS_LOG_DEBUG, "Find queued for mask %s\n", ks_dhtrt_printableid(nodeid->id, buf));
-#endif
-
+       ks_log(KS_LOG_DEBUG, "Find queued for target %s\n", ks_dhtrt_printableid(target->id, buf));
+       ks_dht_queue_search_findnode(internal->dht, table, target, NULL);
     return;
 }
 
-
-
-
 /*
   strictly for shifting the bucketheader mask 
   so format must be a right filled mask (hex: ..ffffffff)
@@ -1723,7 +1724,8 @@ void ks_dhtrt_shiftleft(uint8_t *id) {
 static
 void ks_dhtrt_midmask(uint8_t *leftid, uint8_t *rightid, uint8_t *midpt) {
 
-    int i = 0;
+    uint8_t i = 0;
+
     memset(midpt, 0, sizeof KS_DHT_NODEID_SIZE);
 
     for ( ; i < KS_DHT_NODEID_SIZE; ++i) {
@@ -1731,17 +1733,28 @@ void ks_dhtrt_midmask(uint8_t *leftid, uint8_t *rightid, uint8_t *midpt) {
         if (leftid[i] == 0 && rightid[i] == 0) {
             continue;
         }
-        break;    /* first non zero */
+        else if (leftid[i] == 0 || rightid[i] == 0) {
+            midpt[i] = leftid[i] | rightid[i];
+            continue;
+        }
+        else {
+            if (leftid[i] == rightid[i]) {
+                               midpt[i] = leftid[i] >> 1;
+                i++;
+                       }
+            else { 
+                               uint16_t x = leftid[i] + rightid[i];
+                               x >>= 1;
+                               midpt[i++] = (uint8_t)x;
+                       }
+                       break;
+               }
     }
 
     if (i == KS_DHT_NODEID_SIZE) {
                return;
        }
 
-       uint16_t x = leftid[i] + rightid[i];
-       x >>= 1;
-       midpt[i++] = (uint8_t)x;
-
        if ( i < KS_DHT_NODEID_SIZE ) {
                memcpy(&midpt[i], &rightid[i], KS_DHT_NODEID_SIZE-i);
        }
index 9720282a7fe6e8ab95235ce272f19f79b9822666..7479a305fd9f9fe2b85ff33143cb1832593a01e8 100644 (file)
@@ -98,6 +98,23 @@ KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
        job->query_storageitem = item;
 }
 
+KS_DECLARE(void) ks_dht_job_build_search_findnode(ks_dht_job_t *job,
+                                                                                  ks_dht_nodeid_t *target,
+                                                                                  uint32_t family,
+                                           ks_dht_job_callback_t query_callback,
+                                           ks_dht_job_callback_t finish_callback)
+{
+       ks_assert(job);
+       ks_assert(target);
+       ks_assert(family);
+
+       job->search = NULL;
+       job->query_callback = query_callback;
+       job->finish_callback = finish_callback;
+       job->query_target = *target;
+       job->query_family = family;
+}
+
 KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job)
 {
        ks_dht_job_t *j;
index f0cc32129e409ef659f4f04ff80b93c370174800..18890f251a95769fb7b8f78eb09d65683d8a65a2 100644 (file)
@@ -548,6 +548,117 @@ void test07()
 }
 
 
+void test08()
+{
+     printf("**** testbuckets - test08 start\n"); fflush(stdout);
+
+     ks_dht_node_t  *peer;
+     memset(g_nodeid1.id,  0xef, KS_DHT_NODEID_SIZE);
+     memset(g_nodeid2.id,  0xef, KS_DHT_NODEID_SIZE);
+
+     char ipv6[] = "1234:1234:1234:1234";
+     char ipv4[] = "123.123.123.123";
+    unsigned short port = 7000;
+
+    /* build a delete queue */
+
+    int cix=0;
+
+    for(int i0=0, i1=0; i0<150; ++i0, ++i1) {
+        if (i0%20 == 0) {
+            g_nodeid2.id[cix]>>=1;
+            //ks_dhtrt_dump(rt, 7);
+            if ( g_nodeid2.id[cix] == 0) ++cix;
+            g_nodeid2.id[19] = 0;
+        }
+        else {
+            ++g_nodeid2.id[19];
+        }
+        ks_dhtrt_create_node(rt, g_nodeid2, KS_DHT_REMOTE, ipv4, port, KS_DHTRT_CREATE_DEFAULT, &peer);
+        ks_dhtrt_touch_node(rt, g_nodeid2);
+        ks_dhtrt_release_node(peer);
+     }
+
+    cix = 0;
+
+     memset(g_nodeid2.id,  0xef, KS_DHT_NODEID_SIZE);
+     for (int i0=0, i1=0; i0<150; ++i0, ++i1) {
+         if (i0%20 == 0) {
+            g_nodeid2.id[cix]>>=1;
+            if ( g_nodeid2.id[cix] == 0) ++cix;
+            g_nodeid2.id[19] = 0;
+         }
+         else {
+            ++g_nodeid2.id[19];
+         }
+        ks_dht_node_t* n = ks_dhtrt_find_node(rt, g_nodeid2);
+        ks_dhtrt_release_node(n);
+        ks_dhtrt_delete_node(rt, n);
+     }
+
+       /* this should drive the search_findnode */
+
+   for(int i=0; i<45; ++i) {
+     printf("firing process table\n");
+     ks_dhtrt_process_table(rt);
+     ks_sleep(1000 * 1000 * 60);   /* sleep one minutes */
+   }
+
+    printf("**** testbuckets - test08 ended\n"); fflush(stdout);
+}
+
+
+void test09()
+{
+     printf("**** testbuckets - test09 start\n"); fflush(stdout);
+
+     ks_dht_node_t  *peer;
+     memset(g_nodeid1.id,  0xef, KS_DHT_NODEID_SIZE);
+     memset(g_nodeid2.id,  0xef, KS_DHT_NODEID_SIZE);
+
+     char ipv6[] = "1234:1234:1234:1234";
+     char ipv4[] = "123.123.123.123";
+    unsigned short port = 7000;
+
+    /* build a delete queue */
+
+    int cix=0;
+
+    for(int i0=0, i1=0; i0<150; ++i0, ++i1) {
+        if (i0%20 == 0) {
+            g_nodeid2.id[cix]>>=1;
+            //ks_dhtrt_dump(rt, 7);
+            if ( g_nodeid2.id[cix] == 0) ++cix;
+            g_nodeid2.id[19] = 0;
+        }
+        else {
+            ++g_nodeid2.id[19];
+        }
+        ks_dhtrt_create_node(rt, g_nodeid2, KS_DHT_REMOTE, ipv4, port, KS_DHTRT_CREATE_DEFAULT, &peer);
+        ks_dhtrt_touch_node(rt, g_nodeid2);
+        ks_dhtrt_release_node(peer);
+     }
+
+    /* this should expire all nodes after 15 minutes and 3 pings */
+
+   printf("\n\n\n\n");
+
+   for(int i=0; i<45; ++i) {
+     printf("firing process table\n");
+     ks_dhtrt_process_table(rt);
+     ks_sleep(1000 * 1000 * 30);   /* sleep 30 seconds */
+   }
+
+    printf("**** testbuckets - test09 ended\n"); fflush(stdout);
+}
+
+
+
+
+
+
+
+
 static int gindex = 1;
 static ks_mutex_t *glock;
 static int gstop = 0;
@@ -1020,7 +1131,6 @@ int main(int argc, char *argv[]) {
                        continue;
                }
 
-
             if (tests[tix] == 7) {
                 ks_dhtrt_initroute(&rt, dht, pool);
                 test07();
@@ -1028,6 +1138,22 @@ int main(int argc, char *argv[]) {
                 continue;
             }
 
+         if (tests[tix] == 8) {
+             ks_dhtrt_initroute(&rt, dht, pool);
+             test08();
+             ks_dhtrt_deinitroute(&rt);
+             continue;
+         }
+
+         if (tests[tix] == 9) {
+             ks_dhtrt_initroute(&rt, dht, pool);
+             test09();
+             ks_dhtrt_deinitroute(&rt);
+             continue;
+         }
+
+
+
                if (tests[tix] == 30) {
                        ks_dhtrt_initroute(&rt, dht, pool);
                        test30();