]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Exclude non-active nodes from dhtrt_find_node
authorcolm <colm@freeswitch1>
Mon, 19 Dec 2016 21:14:23 +0000 (16:14 -0500)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:37 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht_bucket.c
libs/libks/test/testbuckets.c

index 05828fe17c05f8bb437530c81a0e3b72cb15cc98..5577b49dc91f0cdcc08e444cda18573be2cfa106 100644 (file)
@@ -98,7 +98,7 @@ typedef struct ks_dhtrt_internal_s {
        ks_time_t              last_process_table;
     ks_mutex_t             *deleted_node_lock;
     ks_dhtrt_deletednode_t *deleted_node;
-    ks_dhtrt_deletednode_t *free_nodes;
+    ks_dhtrt_deletednode_t *free_node_ex;
     uint32_t               deleted_count;
 } ks_dhtrt_internal_t;
 
@@ -184,10 +184,14 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry);
 
 /* debugging */
 #define KS_DHT_DEBUGPRINTF_
-/* # define KS_DHT_DEBUGPRINTFX_  very verbose */
-/* # define KS_DHT_DEBUGLOCKPRINTF_  debug locking */
-
-KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool) 
+/* very verbose                   */
+/* # define KS_DHT_DEBUGPRINTFX_  */
+/* debug locking                  */
+/* # define KS_DHT_DEBUGLOCKPRINTF_  */
+
+KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
+                                           ks_pool_t *pool, 
+                                           ks_thread_pool_t* tpool) 
 {
        unsigned char initmask[KS_DHT_NODEID_SIZE];
        memset(initmask, 0xff, sizeof(initmask));
@@ -232,6 +236,7 @@ KS_DECLARE(ks_status_t)      ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
                                                                                           unsigned short port,
                                                                                           ks_dht_node_t **node) 
 {
+    ks_dht_node_t *tnode;
     ks_dhtrt_internal_t* internal = table->internal;
     ks_rwl_read_lock(internal->lock);      /* grab write lock and insert */
 
@@ -246,13 +251,15 @@ KS_DECLARE(ks_status_t)    ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
             bentry->flags = DHTPEER_ACTIVE;
         }
 
-               (*node) = bentry->gptr;
+               tnode = bentry->gptr;
+        ks_rwl_read_lock( tnode->reflock);
                ks_rwl_read_unlock(internal->lock);
+        (*node) = tnode;
                return KS_STATUS_SUCCESS;
     }
     ks_rwl_read_unlock(internal->lock);
 
-    ks_dht_node_t *tnode = ks_dhtrt_make_node(table);
+    tnode = ks_dhtrt_make_node(table);
        tnode->table = table;
 
        for (int i = 0; i < 5; ++i) {
@@ -274,7 +281,11 @@ KS_DECLARE(ks_status_t)     ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
     }
 
     ks_status_t s = ks_dhtrt_insert_node(table, tnode);
-       
+
+    if (tnode && s == KS_STATUS_SUCCESS) {
+               ks_rwl_read_lock( tnode->reflock);
+       }
+
        (*node) = tnode;
 
        return s;
@@ -466,7 +477,7 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_
 
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
             char buf[100];
-            ks_log(KS_LOG_DEBUG, "Insert node: read LOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
+            ks_log(KS_LOG_DEBUG, "Find node: read LOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
             //fflush(stdout);
 #endif
 
@@ -477,7 +488,7 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_
                                ks_rwl_read_lock(node->reflock);
                        }
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
-            ks_log(KS_LOG_DEBUG, "Insert node: read UNLOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
+            ks_log(KS_LOG_DEBUG, "Find node: read UNLOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
             //fflush(stdout);
 #endif
                        ks_rwl_read_unlock(bucket->lock);
@@ -857,10 +868,19 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
        ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
        ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL;
 
+#ifdef  KS_DHT_DEBUGPRINTF_
+    ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: internal->deleted_count %d\n", internal->deleted_count);
+#endif
+
+
     /* reclaim excess memory */
        while(internal->deleted_count > KS_DHTRT_RECYCLE_NODE_THRESHOLD && deleted) {
                ks_dht_node_t* node = deleted->node;
 
+#ifdef  KS_DHT_DEBUGPRINTFX__
+           ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock\n");
+#endif
+
                if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) {        
                ks_rwl_destroy(&(node->reflock));
                ks_pool_free(table->pool, &node);
@@ -868,7 +888,9 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
             deleted = deleted->next;
             ks_pool_free(table->pool, &temp);
             --internal->deleted_count;
-
+#ifdef  KS_DHT_DEBUGPRINTFX_
+                       ks_log(KS_LOG_DEBUG, "ALLOC process_deleted: internal->deleted_count %d\n", internal->deleted_count);                   
+#endif
                        if (prev != NULL) {
                                prev->next = deleted;
                        }
@@ -878,10 +900,18 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
      
                }
         else {
+#ifdef  KS_DHT_DEBUGPRINTFX__
+            ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock failed\n");
+#endif
             prev = deleted;
             deleted = prev->next;
         }
        }
+
+#ifdef  KS_DHT_DEBUGPRINTF_
+    ks_log(KS_LOG_DEBUG, "ALLOC process_deleted exit: internal->deleted_count %d\n", internal->deleted_count);
+#endif
+
        ks_mutex_unlock(internal->deleted_node_lock);
 }
 
@@ -913,7 +943,7 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
                                        memset(buffer, 0, 100);
                                        if (b->entries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer);
                                        else strcpy(buffer, "<free>");
-                                       ks_log(KS_LOG_DEBUG, "     slot %d: %s\n", ix, buffer);
+                                       ks_log(KS_LOG_DEBUG, "     slot %d: %d %s\n", ix, b->entries[ix].flags,  buffer);
                                }
 
                                ks_log(KS_LOG_DEBUG, "   --------------------------\n\n");
@@ -947,8 +977,8 @@ ks_dhtrt_bucket_header_t *ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt
 
 #ifdef KS_DHT_DEBUGPRINTF_
        char buffer[100];
-       ks_log(KS_LOG_DEBUG, "creating bucket header for mask: %s ", ks_dhtrt_printableid(mask, buffer));
-       if (parent) ks_log(KS_LOG_DEBUG, "from parent mask: %s ",  ks_dhtrt_printableid(parent->mask, buffer));
+       ks_log(KS_LOG_DEBUG, "creating bucket header for mask: %s\n", ks_dhtrt_printableid(mask, buffer));
+       if (parent) ks_log(KS_LOG_DEBUG, "  ... from parent mask: %s\n",  ks_dhtrt_printableid(parent->mask, buffer));
        printf("\n"); 
 #endif
        return header;
@@ -1049,7 +1079,7 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original,
        char buffer[100];
        ks_log(KS_LOG_DEBUG, "\nsplitting bucket orginal: %s\n", ks_dhtrt_printableid(original->mask, buffer));
        ks_log(KS_LOG_DEBUG, " into (left) mask: %s size: %d\n", ks_dhtrt_printableid(left->mask, buffer), left->bucket->count);
-       ks_log(KS_LOG_DEBUG, " and (right) mask: %s size: %d\n\n", ks_dhtrt_printableid(right->mask, buffer), right->bucket->count);
+       ks_log(KS_LOG_DEBUG, " and (right) mask: %s size: %d\n", ks_dhtrt_printableid(right->mask, buffer), right->bucket->count);
 #endif
        return;
 }
@@ -1138,7 +1168,7 @@ ks_dht_node_t *ks_dhtrt_find_nodeid(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t
        for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
 #ifdef KS_DHT_DEBUGPRINTFX_
                char bufferx[100];
-               if ( bucket->entries[ix].inuse == 1) {
+               if ( bucket->entries[ix].inuse == 1 && bucket->entries[ix].flags == DHTPEER_ACTIVE ) {
                        ks_log(KS_LOG_DEBUG, "bucket->entries[%d].id = %s inuse=%x\n", ix,
                                   ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
                                   bucket->entries[ix].inuse  );
@@ -1162,7 +1192,7 @@ ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
 
        for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
 #ifdef KS_DHT_DEBUGPRINTFX_
-               char bufferx[100];_
+               char bufferx[100];
                ks_log(KS_LOG_DEBUG, "bucket->entries[%d].id = %s inuse=%c\n", ix,
                           ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
                           bucket->entries[ix].inuse  );
@@ -1315,19 +1345,22 @@ void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t* table, ks_dht_node_t*
 {
     ks_dhtrt_internal_t* internal = table->internal;
        ks_mutex_lock(internal->deleted_node_lock);
-       ks_dhtrt_deletednode_t* deleted = internal->free_nodes;
+       ks_dhtrt_deletednode_t* deleted = internal->free_node_ex;   /* grab a free stub */
 
     if (deleted) {
-        internal->free_nodes = deleted->next;    
+        internal->free_node_ex = deleted->next;    
     }
     else {
         deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t));
     }
 
     deleted->node = node;
-       deleted->next = internal->deleted_node;
-       internal->deleted_node = deleted;
+       deleted->next = internal->deleted_node;  
+       internal->deleted_node = deleted;                         /* add to deleted queue */
        ++internal->deleted_count;
+#ifdef  KS_DHT_DEBUGPRINTFX_
+       ks_log(KS_LOG_DEBUG, "ALLOC: Queue for delete %d\n", internal->deleted_count);
+#endif
        ks_mutex_unlock(internal->deleted_node_lock);
 }
 
@@ -1340,12 +1373,15 @@ ks_dht_node_t* ks_dhtrt_make_node(ks_dhtrt_routetable_t* table)
     /* to to reuse a deleted node */
        if (internal->deleted_count) {
                ks_dhtrt_deletednode_t *deleted =  internal->deleted_node;
-        node = deleted->node;
+        node = deleted->node;                        /* take the node */
         memset(node, 0, sizeof(ks_dht_node_t));
+        deleted->node = 0;                           /* avoid accidents */     
         internal->deleted_node = deleted->next; 
-        deleted->next =  internal->free_nodes;
-        internal->free_nodes = deleted;  
-        --internal->deleted_count;      
+        deleted->next =  internal->free_node_ex;     /* save the stub for reuse */
+        --internal->deleted_count; 
+#ifdef  KS_DHT_DEBUGPRINTFX_
+        ks_log(KS_LOG_DEBUG, "ALLOC: Reusing a node struct %d\n", internal->deleted_count);
+#endif     
      }
      ks_mutex_unlock(internal->deleted_node_lock);
 
index 279354214170683093302fdb783a543582c05802..43fd1b2136a2bca391ed544c9351d3f4e220257f 100644 (file)
@@ -447,27 +447,108 @@ void test06()
 }
 
 
+/* test resue of node memory */
+void test50()
+{
+   printf("*** testbuckets - test50 start\n"); fflush(stdout);
+
+   ks_dht_node_t*  peer;
+   ks_dht_nodeid_t nodeid, nodeid2;
+   memset(nodeid.id,  0xef, KS_DHT_NODEID_SIZE);
+   memset(nodeid2.id,  0xef, KS_DHT_NODEID_SIZE);
+
+   char ipv6[] = "1234:1234:1234:1234";
+   char ipv4[] = "123.123.123.123";
+   unsigned short port = 7000;
+   enum ks_afflags_t both = ifboth;
+
+   ks_status_t status;
+
+   for (int i=0,i2=0; i<200; ++i, ++i2) {
+     if (i%20 == 0) {
+           nodeid.id[0] =  nodeid.id[0] / 2;
+           if(i2%20 == 0) {
+               i2 = 0;
+               nodeid.id[1] =  nodeid.id[1] / 2;
+           }
+           else {
+               ++nodeid.id[2];
+           }
+     }
+     else {
+           ++nodeid.id[1];
+     }
+     ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
+     ks_dhtrt_touch_node(rt, nodeid);
+   }
+
+   memset(nodeid.id,  0xef, KS_DHT_NODEID_SIZE);
+   for (int i=0,i2=0; i<200; ++i, ++i2) {
+     if (i%20 == 0) { 
+           nodeid.id[0] =  nodeid.id[0] / 2;
+           if(i2%20 == 0) {
+               i2 = 0; 
+               nodeid.id[1] =  nodeid.id[1] / 2;
+           }
+           else {
+               ++nodeid.id[2];
+           }
+     }
+     else {
+           ++nodeid.id[1];
+     }
+     ks_dht_node_t *n = ks_dhtrt_find_node(rt, nodeid);
+     if (n != NULL) {
+        ks_dhtrt_release_node(n);
+               ks_dhtrt_delete_node(rt, n);
+        }
+   }
+
+   ks_dhtrt_process_table(rt);
+
+   memset(nodeid.id,  0xef, KS_DHT_NODEID_SIZE);
+
+   for (int i=0,i2=0; i<200; ++i, ++i2) {
+     if (i%20 == 0) {
+           nodeid.id[0] =  nodeid.id[0] / 2;
+           if(i2%20 == 0) {
+               i2 = 0;
+               nodeid.id[1] =  nodeid.id[1] / 2;
+           }
+           else {
+               ++nodeid.id[2];
+           }
+     }
+     else {
+           ++nodeid.id[1];
+     }
+     ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
+     ks_dhtrt_touch_node(rt, nodeid);
+   }
+
+       printf("*** testbuckets - test50 start\n"); fflush(stdout);
+       return;
+}
+
+
+
 int main(int argc, char* argv[]) {
 
    printf("testdhtbuckets - start\n");
 
-   int tests[10];
+   int tests[100];
   
        if (argc == 0) {
+      tests[0] = 1;
       tests[1] = 1;
       tests[2] = 1;
       tests[3] = 1;
       tests[4] = 1;
-      tests[5] = 1;
-      tests[6] = 0;
-      tests[7] = 0;
-      tests[8] = 0;
-      tests[9] = 0;
        }
        else {
-               for(int tix=1; tix<10 && tix<argc; ++tix) {
+               for(int tix=1; tix<100 && tix<argc; ++tix) {
                        long i = strtol(argv[tix], NULL, 0);
-                       tests[i] = 1;
+                       tests[tix] = i;
                }
        }
 
@@ -494,41 +575,65 @@ int main(int argc, char* argv[]) {
    ks_dhtrt_initroute(&rt, pool, tpool);
    ks_dhtrt_deinitroute(&rt);
 
-   if (tests[1] == 1) {
-       ks_dhtrt_initroute(&rt, pool, tpool);
-       test01();
-       ks_dhtrt_deinitroute(&rt);
-   }
 
-   if (tests[2] == 1) {
-       ks_dhtrt_initroute(&rt, pool, tpool);
-       test02();
-       ks_dhtrt_deinitroute(&rt);
-   }
+   for(int tix=0; tix<argc; ++tix) {
+
+         if (tests[tix] == 1) {
+               ks_dhtrt_initroute(&rt, pool, tpool);
+               test01();
+               ks_dhtrt_deinitroute(&rt);
+        continue;
+        }
+
+        if (tests[tix] == 2) {
+               ks_dhtrt_initroute(&rt, pool, tpool);
+               test02();
+               ks_dhtrt_deinitroute(&rt);
+        continue;
+     }
+
+     if (tests[tix] == 3) {  
+                ks_dhtrt_initroute(&rt, pool, tpool);
+               test03();
+               ks_dhtrt_deinitroute(&rt);
+        continue; 
+     }
+
+     if (tests[tix] == 4) {
+               ks_dhtrt_initroute(&rt, pool, tpool);
+               test04();
+               ks_dhtrt_deinitroute(&rt);
+        continue;
+     }
+
+     if (tests[tix] == 5) {
+               ks_dhtrt_initroute(&rt, pool, tpool);
+               test05();
+               ks_dhtrt_deinitroute(&rt);
+        continue;
+     }
+
+     if (tests[tix] == 6) {
+               ks_dhtrt_initroute(&rt, pool, tpool);
+               test06();
+               ks_dhtrt_deinitroute(&rt);
+        continue;
+     }
+
+     if (tests[tix] == 50) {
+        ks_dhtrt_initroute(&rt, pool, tpool);
+        test50();
+        ks_dhtrt_deinitroute(&rt);
+        continue;
+     }
 
-   if (tests[3] == 1) {  
-    ks_dhtrt_initroute(&rt, pool, tpool);
-       test03();
-       ks_dhtrt_deinitroute(&rt);
-   }
 
-   if (tests[4] == 1) {
-       ks_dhtrt_initroute(&rt, pool, tpool);
-       test04();
-       ks_dhtrt_deinitroute(&rt);
-   }
 
-   if (tests[5] == 1) {
-       ks_dhtrt_initroute(&rt, pool, tpool);
-       test05();
-       ks_dhtrt_deinitroute(&rt);
-   }
 
-   if (tests[6] == 1) {
-       ks_dhtrt_initroute(&rt, pool, tpool);
-    test06();
-    ks_dhtrt_deinitroute(&rt);
    }
+
+
+
    return 0;
 
 }