]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Drive ping logic from dhtrt_process_table
authorcolm <colm@freeswitch1>
Tue, 20 Dec 2016 15:35:40 +0000 (10:35 -0500)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:37 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_bucket.c
libs/libks/test/testbuckets.c

index 6c7331887381a7a9a1455f41ebc0a84123a711b8..ff457c4078fa4433b7ef36df8721a0965f548fac 100644 (file)
@@ -537,7 +537,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
         * If the route table for the family doesn't exist yet, initialize a new route table and create a local node for the endpoint.
         */
        if (ep->addr.family == AF_INET) {
-               if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done;
+               if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done;
                if ((ret = ks_dhtrt_create_node(dht->rt_ipv4,
                                                                                ep->nodeid,
                                                                                KS_DHT_LOCAL,
@@ -545,7 +545,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
                                                                                ep->addr.port,
                                                                                &ep->node)) != KS_STATUS_SUCCESS) goto done;
        } else {
-               if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done;
+               if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done;
                if ((ret = ks_dhtrt_create_node(dht->rt_ipv6,
                                                                                ep->nodeid,
                                                                                KS_DHT_LOCAL,
index 1f320b997a3887e103177fc27ba1e1fc1bf5daa8..4e4f07456b345d1d7b2fccef90429e6daa40649b 100644 (file)
@@ -406,7 +406,10 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
  * route table methods
  *
  */
-KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool);
+KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, 
+                                                                                       ks_dht_t *dht, 
+                                                                                       ks_pool_t *pool, 
+                                                                                       ks_thread_pool_t* tpool);
 KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table);
 
 KS_DECLARE(ks_status_t)        ks_dhtrt_create_node(ks_dhtrt_routetable_t* table,
index 5577b49dc91f0cdcc08e444cda18573be2cfa106..a04717ada8d347ac0901c107b522a4189b44ba25 100644 (file)
 
 /* change for testing */
 #define KS_DHT_BUCKETSIZE 20
-#define KS_DHTRT_INACTIVETIME  (15*60)  
+#define KS_DHTRT_INACTIVETIME  (10*60) 
+#define KS_DHTRT_EXPIREDTIME   (15*60)
 #define KS_DHTRT_MAXPING  3
 #define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60)  
+#define KS_DHTRT_PROCESSTABLE_SHORTINTERVAL (120)
 #define KS_DHTRT_RECYCLE_NODE_THRESHOLD  100
 
 /* peer flags */
@@ -49,6 +51,7 @@
 #define DHTPEER_EXPIRED 1
 #define DHTPEER_ACTIVE  2
 
+
 typedef uint8_t ks_dhtrt_nodeid_t[KS_DHT_NODEID_SIZE];
 
 /* internal structures */
@@ -79,7 +82,6 @@ typedef struct ks_dhtrt_bucket_header_s {
        struct ks_dhtrt_bucket_header_s * left;
        struct ks_dhtrt_bucket_header_s * right;
        ks_dhtrt_bucket_t *      bucket;
-    ks_dhtrt_bucket_t *         bucketv6;
        ks_time_t                tyme;                             /* last processed time */
        unsigned char    mask[KS_DHT_NODEID_SIZE];      /* node id mask            */
        unsigned char    flags;                   
@@ -93,9 +95,11 @@ typedef struct ks_dhtrt_deletednode_s {
 typedef struct ks_dhtrt_internal_s {
        uint8_t  localid[KS_DHT_NODEID_SIZE];
        ks_dhtrt_bucket_header_t *buckets;              /* root bucketheader */
+       ks_dht_t               *dht; 
     ks_thread_pool_t       *tpool;
        ks_rwl_t                           *lock;                   /* lock for safe traversal of the tree */
        ks_time_t              last_process_table;
+    ks_time_t              next_process_table_delta;
     ks_mutex_t             *deleted_node_lock;
     ks_dhtrt_deletednode_t *deleted_node;
     ks_dhtrt_deletednode_t *free_node_ex;
@@ -130,6 +134,8 @@ ks_dhtrt_bucket_t *ks_dhtrt_create_bucket(ks_pool_t *pool);
 static
 ks_dhtrt_bucket_header_t *ks_dhtrt_find_bucketheader(ks_dhtrt_routetable_t *table, ks_dhtrt_nodeid_t id);
 static
+ks_dhtrt_bucket_header_t *ks_dhtrt_find_relatedbucketheader(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t id);
+static
 ks_dhtrt_bucket_entry_t *ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t id);
 
 static
@@ -178,7 +184,7 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid,
                                                                                 unsigned int max);
 
 static
-void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry);
+void ks_dhtrt_ping(ks_dhtrt_internal_t *table, ks_dhtrt_bucket_entry_t *entry);
 
 
 
@@ -187,12 +193,15 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry);
 /* very verbose                   */
 /* # define KS_DHT_DEBUGPRINTFX_  */
 /* debug locking                  */
-/* # define KS_DHT_DEBUGLOCKPRINTF_  */
+#define KS_DHT_DEBUGLOCKPRINTF_  
 
 KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
-                                           ks_pool_t *pool, 
-                                           ks_thread_pool_t* tpool) 
+                                                                                       ks_dht_t *dht,
+                                                                                       ks_pool_t *pool, 
+                                                                                       ks_thread_pool_t* tpool) 
 {
+(void)ks_dhtrt_find_relatedbucketheader;
+
        unsigned char initmask[KS_DHT_NODEID_SIZE];
        memset(initmask, 0xff, sizeof(initmask));
 
@@ -202,6 +211,8 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
 
        ks_rwl_create(&internal->lock, pool);
     internal->tpool = tpool;
+       internal->dht   = dht;  
+    internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_INTERVAL; 
     ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool);
        table->internal = internal;
 
@@ -305,15 +316,11 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
                        char buf[100];
             ks_log(KS_LOG_DEBUG, "Delete node: LOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
-                       //printf("delete node: LOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
-                       //fflush(stdout);
 #endif
                        ks_rwl_write_lock(bucket->lock);
                        s = ks_dhtrt_delete_id(bucket, node->nodeid.id);
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
                        ks_log(KS_LOG_DEBUG, "Delete node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
-                       //printf("delete node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
-                       //fflush(stdout);
 #endif
 
                        ks_rwl_write_unlock(bucket->lock);
@@ -767,13 +774,20 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
        /*                                                                                                      */
 
        ks_dhtrt_internal_t *internal = table->internal;
+    int ping_count = 0;
 
     ks_time_t t0 = ks_time_now_sec();
 
-    if (t0 - internal->last_process_table < KS_DHTRT_PROCESSTABLE_INTERVAL) {
+    /*
+    printf("process_table: %" PRId64 "   %" PRId64 "\n", t0 - internal->last_process_table, internal->next_process_table_delta);
+    */
+
+    if (t0 - internal->last_process_table < internal->next_process_table_delta) {
                return;  
     } 
 
+    internal->last_process_table = t0;
+
     ks_log(KS_LOG_DEBUG,"process_table in progress\n");
 
        ks_rwl_read_lock(internal->lock);      /* grab read lock */
@@ -805,22 +819,40 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
 
                                                        /* more than n pings outstanding? */
 
-                                                       if (e->outstanding_pings >= KS_DHTRT_MAXPING) {
+                            if (e->flags == DHTPEER_DUBIOUS) {
+                                continue;
+                            }
+
+                                                       if ( e->flags != DHTPEER_EXPIRED             && 
+                                                                e->outstanding_pings >= KS_DHTRT_MAXPING ) {
+#ifdef  KS_DHT_DEBUGPRINTF_
+                                                               ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n", 
+                                                                                                               ks_dhtrt_printableid(e->id, buf));
+#endif
                                                                e->flags =      DHTPEER_EXPIRED; 
                                                                ++b->expired_count;
                                                                continue;
                                                        }
 
-                                                       if (e->flags == DHTPEER_DUBIOUS) {
-                                                               ks_dhtrt_ping(e);
-                                                               continue;
+                                                       /* if there are any outstanding pings - send another */
+                                                       if (e->outstanding_pings > 0) {
+                                ks_dhtrt_ping(internal, e);
+                                ++ping_count;
+                                continue;
                                                        }
 
                                                        ks_time_t tdiff = t0 - e->tyme;
 
-                                                       if (tdiff > KS_DHTRT_INACTIVETIME) {
-                                                               e->flags = DHTPEER_DUBIOUS;
-                                                               ks_dhtrt_ping(e);
+                            if (tdiff > KS_DHTRT_EXPIREDTIME) {       
+                                e->flags = DHTPEER_DUBIOUS;               /* mark as dubious          */
+                                ks_dhtrt_ping(internal, e);               /* final effort to activate */
+                                                               continue;                                 
+                            }
+
+                                                       if (tdiff > KS_DHTRT_INACTIVETIME) {          /* inactive for suspicious length */
+                                                               ks_dhtrt_ping(internal, e);               /* kick                           */
+                                                               ++ping_count;                            
+                                                               continue;
                                                        }
 
                                                } /* end if not local */
@@ -830,8 +862,8 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
                                }       /* end for each bucket_entry */
 
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
-        char buf1[100];
-        ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
+                               char buf1[100];
+                               ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
 #endif
 
                                ks_rwl_write_unlock(b->lock);
@@ -857,6 +889,14 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
 
     ks_dhtrt_process_deleted(table);
 
+    if (ping_count == 0) {
+               internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_INTERVAL;
+       }
+    else {
+               internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_SHORTINTERVAL;
+       }
+    ks_log(KS_LOG_DEBUG,"process_table complete\n");
+
        return;
 }
 
@@ -943,7 +983,10 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
                                        memset(buffer, 0, 100);
                                        if (b->entries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer);
                                        else strcpy(buffer, "<free>");
-                                       ks_log(KS_LOG_DEBUG, "     slot %d: %d %s\n", ix, b->entries[ix].flags,  buffer);
+                                       ks_log(KS_LOG_DEBUG, "     slot %d: %d %d %s\n", ix, 
+                                               b->entries[ix].flags,  
+                                               b->entries[ix].outstanding_pings,
+                                               buffer);
                                }
 
                                ks_log(KS_LOG_DEBUG, "   --------------------------\n\n");
@@ -979,7 +1022,6 @@ ks_dhtrt_bucket_header_t *ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt
        char buffer[100];
        ks_log(KS_LOG_DEBUG, "creating bucket header for mask: %s\n", ks_dhtrt_printableid(mask, buffer));
        if (parent) ks_log(KS_LOG_DEBUG, "  ... from parent mask: %s\n",  ks_dhtrt_printableid(parent->mask, buffer));
-       printf("\n"); 
 #endif
        return header;
 }
@@ -1018,6 +1060,32 @@ ks_dhtrt_bucket_header_t *ks_dhtrt_find_bucketheader(ks_dhtrt_routetable_t *tabl
        return NULL;
 }
 
+static
+ks_dhtrt_bucket_header_t *ks_dhtrt_find_relatedbucketheader(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t id)
+{
+    /*
+         using the passed bucket header as a starting point find the right bucket.
+         This is a shortcut used in query to shorten the search path for queries extending beyond a single bucket.
+    */
+
+    while (header) {
+        if ( header->bucket ) {
+            return header;
+        }
+
+        /* left hand side is more restrictive (closer) so should be tried first */
+        if (header->left != 0 && (ks_dhtrt_ismasked(id, header->left->mask))) {
+            header = header->left;
+        } else {
+            header = header->right;
+        }
+    }
+
+    return NULL;
+}
+
+
+
 static
 ks_dhtrt_bucket_entry_t *ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t nodeid) 
 {
@@ -1392,18 +1460,17 @@ ks_dht_node_t* ks_dhtrt_make_node(ks_dhtrt_routetable_t* table)
      return node;
 }
 
-void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry) {
+void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry) {
        ++entry->outstanding_pings;
-       /* @todo */
-       /* set the appropriate command in the node and queue if for processing */
-       /*ks_dht_node_t *node = entry->gptr; */
-       /* ++entry->outstanding_pings; */
 
 #ifdef KS_DHT_DEBUGPRINTF_
        char buf[100];
-       printf("  ping queued for nodeid %s count %d\n",
+       ks_log(KS_LOG_DEBUG, "Ping queued for nodeid %s count %d\n",
                   ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings);
 #endif
+    ks_dht_node_t* node = entry->gptr;
+    ks_dht_ping(internal->dht, &node->addr, NULL);
+
        return;
 }
 
index 43fd1b2136a2bca391ed544c9351d3f4e220257f..e5204faf1b66d52e0e386a0a3a43e987ff3eb8fc 100644 (file)
@@ -5,6 +5,7 @@
 //#include "ks.h"
 #include "../src/dht/ks_dht.h"
 
+ks_dht_t* dht;
 ks_dhtrt_routetable_t* rt;
 ks_pool_t* pool;
 ks_thread_pool_t* tpool;
@@ -30,10 +31,10 @@ void test01()
        printf("*** testbuckets - test01 start\n"); fflush(stdout);
 
    ks_dhtrt_routetable_t* rt;
-   ks_dhtrt_initroute(&rt, pool, tpool);
+   ks_dhtrt_initroute(&rt, dht, pool, tpool);
    ks_dhtrt_deinitroute(&rt);
  
-   ks_dhtrt_initroute(&rt, pool, tpool);
+   ks_dhtrt_initroute(&rt, dht, pool, tpool);
    ks_dht_nodeid_t nodeid, homeid;
    memset(homeid.id,  0xdd, KS_DHT_NODEID_SIZE);
    homeid.id[19] = 0;
@@ -255,14 +256,18 @@ void test04()
 
    ks_status_t status;
 
-   for (int i=0,i2=0; i<10000; ++i) {
-     if (i%40 == 0) {
-           ++nodeid.id[0];
-           if(i2%40 == 0) {
-               ++nodeid.id[1];
+   for (int i=0,i2=0,i3=0; i<10000; ++i, ++i2,  ++i3) {
+     if (i%20 == 0) {
+           nodeid.id[0] =  nodeid.id[0] / 2;
+           if(i2%20 == 0) {
+               nodeid.id[1] = nodeid.id[1] / 2;
+               i2 = 0;
+                  if(i3%20 == 0) {
+                          nodeid.id[2] = nodeid.id[2] / 2;
+                                }
            }
            else {
-               ++nodeid.id[2];
+               ++nodeid.id[3];
            }
      }
      else {
@@ -531,6 +536,54 @@ void test50()
 }
 
 
+/* test process_table */
+void test51()
+{
+   printf("*** testbuckets - test51 start\n"); fflush(stdout);
+
+   ks_dht_node_t*  peer;
+   ks_dht_nodeid_t nodeid, nodeid2;
+   memset(nodeid.id,  0xef, KS_DHT_NODEID_SIZE);
+   memset(nodeid2.id,  0xef, KS_DHT_NODEID_SIZE);
+
+   char ipv6[] = "1234:1234:1234:1234";
+   char ipv4[] = "123.123.123.123";
+   unsigned short port = 7000;
+   enum ks_afflags_t both = ifboth;
+
+   ks_status_t status;
+
+   for (int i=0,i2=0; i<2; ++i, ++i2) {
+     if (i%20 == 0) {
+           nodeid.id[0] =  nodeid.id[0] / 2;
+           if(i2%20 == 0) {
+               i2 = 0;
+               nodeid.id[1] =  nodeid.id[1] / 2;
+           }
+           else {
+               ++nodeid.id[2];
+           }
+     }
+     else {
+           ++nodeid.id[1];
+     }
+     ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
+     ks_dhtrt_touch_node(rt, nodeid);
+   }
+
+   for(int ix=0; ix<50; ++ix) {
+               ks_dhtrt_process_table(rt);     
+        ks_sleep(1000 * 1000 * 120);
+        printf("*** pulse ks_dhtrt_process_table\n");
+        if ( ix%2 == 0) ks_dhtrt_dump(rt, 7);
+   }
+
+       printf("*** testbuckets - test51 complete\n"); fflush(stdout);
+
+   return;  
+}
+
 
 int main(int argc, char* argv[]) {
 
@@ -554,8 +607,10 @@ int main(int argc, char* argv[]) {
 
    ks_init();
    ks_global_set_default_logger(7);
+        ks_dht_create(&dht, NULL, NULL);
+
 
-   ks_thread_pool_create(&tpool, KS_DHT_TPOOL_MIN, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE);
+   ks_thread_pool_create(&tpool, 0, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE);
 
    ks_status_t status;
    char *str = NULL;
@@ -572,61 +627,67 @@ int main(int argc, char* argv[]) {
 
    printf("init/deinit routeable\n"); fflush(stdout);
 
-   ks_dhtrt_initroute(&rt, pool, tpool);
+   ks_dhtrt_initroute(&rt, dht, pool, tpool);
    ks_dhtrt_deinitroute(&rt);
 
 
    for(int tix=0; tix<argc; ++tix) {
 
          if (tests[tix] == 1) {
-               ks_dhtrt_initroute(&rt, pool, tpool);
+               ks_dhtrt_initroute(&rt, dht, pool, tpool);
                test01();
                ks_dhtrt_deinitroute(&rt);
         continue;
         }
 
         if (tests[tix] == 2) {
-               ks_dhtrt_initroute(&rt, pool, tpool);
+               ks_dhtrt_initroute(&rt, dht, pool, tpool);
                test02();
                ks_dhtrt_deinitroute(&rt);
         continue;
      }
 
      if (tests[tix] == 3) {  
-                ks_dhtrt_initroute(&rt, pool, tpool);
+                ks_dhtrt_initroute(&rt, dht, pool, tpool);
                test03();
                ks_dhtrt_deinitroute(&rt);
         continue; 
      }
 
      if (tests[tix] == 4) {
-               ks_dhtrt_initroute(&rt, pool, tpool);
+               ks_dhtrt_initroute(&rt, dht, pool, tpool);
                test04();
                ks_dhtrt_deinitroute(&rt);
         continue;
      }
 
      if (tests[tix] == 5) {
-               ks_dhtrt_initroute(&rt, pool, tpool);
+               ks_dhtrt_initroute(&rt, dht, pool, tpool);
                test05();
                ks_dhtrt_deinitroute(&rt);
         continue;
      }
 
      if (tests[tix] == 6) {
-               ks_dhtrt_initroute(&rt, pool, tpool);
+               ks_dhtrt_initroute(&rt, dht, pool, tpool);
                test06();
                ks_dhtrt_deinitroute(&rt);
         continue;
      }
 
      if (tests[tix] == 50) {
-        ks_dhtrt_initroute(&rt, pool, tpool);
+        ks_dhtrt_initroute(&rt, dht, pool, tpool);
         test50();
         ks_dhtrt_deinitroute(&rt);
         continue;
      }
 
+     if (tests[tix] == 51) {
+        ks_dhtrt_initroute(&rt, dht, pool, tpool);
+        test51();
+        ks_dhtrt_deinitroute(&rt);
+        continue;
+     }