]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: add threadpool to init_routetable
authorcolm <colm@freeswitch1>
Sat, 17 Dec 2016 02:06:38 +0000 (21:06 -0500)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:37 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_bucket.c

index 0d53283b028768f3f2fb26920ae6718f90ada84c..a1b4c65fc353eea86cf2e3ba7adcfcb2f6553b4b 100644 (file)
@@ -533,7 +533,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
         * If the route table for the family doesn't exist yet, initialize a new route table and create a local node for the endpoint.
         */
        if (ep->addr.family == AF_INET) {
-               if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool)) != KS_STATUS_SUCCESS) goto done;
+               if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done;
                if ((ret = ks_dhtrt_create_node(dht->rt_ipv4,
                                                                                ep->nodeid,
                                                                                KS_DHT_LOCAL,
@@ -541,7 +541,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
                                                                                ep->addr.port,
                                                                                &ep->node)) != KS_STATUS_SUCCESS) goto done;
        } else {
-               if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool)) != KS_STATUS_SUCCESS) goto done;
+               if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done;
                if ((ret = ks_dhtrt_create_node(dht->rt_ipv6,
                                                                                ep->nodeid,
                                                                                KS_DHT_LOCAL,
index 59311de80112344f5367ee9cd8d003ddb6bdc08e..c9f60ef1819b2340c93248a41a3fa3658ee8a8bd 100644 (file)
@@ -368,7 +368,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message,
  * route table methods
  *
  */
-KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool);
+KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool);
 KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table);
 
 KS_DECLARE(ks_status_t)        ks_dhtrt_create_node(ks_dhtrt_routetable_t* table,
index 8d7ef6defceeeafd8ba7db2fc1b0cbc1cccb1e03..05828fe17c05f8bb437530c81a0e3b72cb15cc98 100644 (file)
@@ -42,7 +42,7 @@
 #define KS_DHTRT_INACTIVETIME  (15*60)  
 #define KS_DHTRT_MAXPING  3
 #define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60)  
-
+#define KS_DHTRT_RECYCLE_NODE_THRESHOLD  100
 
 /* peer flags */
 #define DHTPEER_DUBIOUS 0
@@ -61,6 +61,7 @@ typedef struct ks_dhtrt_bucket_entry_s {
        uint8_t    inuse;
        uint8_t    outstanding_pings;
        uint8_t    flags;                                         /* active, suspect, expired */
+    uint8_t    touched;                   /* did we ever get a touch */
 } ks_dhtrt_bucket_entry_t;
 
 typedef struct ks_dhtrt_bucket_s {
@@ -92,10 +93,12 @@ typedef struct ks_dhtrt_deletednode_s {
 typedef struct ks_dhtrt_internal_s {
        uint8_t  localid[KS_DHT_NODEID_SIZE];
        ks_dhtrt_bucket_header_t *buckets;              /* root bucketheader */
+    ks_thread_pool_t       *tpool;
        ks_rwl_t                           *lock;                   /* lock for safe traversal of the tree */
        ks_time_t              last_process_table;
     ks_mutex_t             *deleted_node_lock;
     ks_dhtrt_deletednode_t *deleted_node;
+    ks_dhtrt_deletednode_t *free_nodes;
     uint32_t               deleted_count;
 } ks_dhtrt_internal_t;
 
@@ -148,6 +151,8 @@ void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t *table, ks_dht_node_t*
 static
 void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table); 
 
+static
+ks_dht_node_t *ks_dhtrt_make_node(ks_dhtrt_routetable_t *table);
 static
 ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node);
 static
@@ -182,7 +187,7 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry);
 /* # define KS_DHT_DEBUGPRINTFX_  very verbose */
 /* # define KS_DHT_DEBUGLOCKPRINTF_  debug locking */
 
-KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool) 
+KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool
 {
        unsigned char initmask[KS_DHT_NODEID_SIZE];
        memset(initmask, 0xff, sizeof(initmask));
@@ -192,6 +197,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_po
        ks_dhtrt_internal_t *internal =   ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t));
 
        ks_rwl_create(&internal->lock, pool);
+    internal->tpool = tpool;
     ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool);
        table->internal = internal;
 
@@ -235,14 +241,18 @@ KS_DECLARE(ks_status_t)    ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
     ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id);
     if (bentry != 0) {
                bentry->tyme = ks_time_now_sec();
+        
+        if (bentry->touched) {
+            bentry->flags = DHTPEER_ACTIVE;
+        }
+
                (*node) = bentry->gptr;
                ks_rwl_read_unlock(internal->lock);
                return KS_STATUS_SUCCESS;
     }
     ks_rwl_read_unlock(internal->lock);
 
-       /* @todo - replace with reusable memory pool */
-    ks_dht_node_t *tnode = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t));
+    ks_dht_node_t *tnode = ks_dhtrt_make_node(table);
        tnode->table = table;
 
        for (int i = 0; i < 5; ++i) {
@@ -297,23 +307,12 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh
 
                        ks_rwl_write_unlock(bucket->lock);
                }
-
        }
 
-    ks_rwl_read_unlock(internal->lock);   /* release write lock */
-    /* at this point no subsequent find/query will return the node - so we can
-       safely free it if we can grab the write lock
-       Having held the write lock on the bucket we know no other thread 
-       is awaiting a read/write lock on the node
-    */                      
+       ks_rwl_read_unlock(internal->lock);   /* release write lock */
+       /* at this point no subsequent find/query will return the node */
 
-    if (ks_rwl_try_write_lock(node->reflock) ==  KS_STATUS_SUCCESS) {      /* grab exclusive lock on node */
-               ks_rwl_destroy(&(node->reflock));
-               ks_pool_free(table->pool, &node);
-    }
-    else {
-        ks_dhtrt_queue_node_fordelete(table, node);
-       }
+       ks_dhtrt_queue_node_fordelete(table, node);
        return s;
 }
 
@@ -511,6 +510,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table,  ks_dh
                if (e != 0) { 
                        e->tyme = ks_time_now_sec();
                        e->outstanding_pings = 0;
+            e->touched = 1;
 
                        if (e->flags == DHTPEER_EXPIRED) {
                                --header->bucket->expired_count;
@@ -758,10 +758,13 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
        ks_dhtrt_internal_t *internal = table->internal;
 
     ks_time_t t0 = ks_time_now_sec();
+
     if (t0 - internal->last_process_table < KS_DHTRT_PROCESSTABLE_INTERVAL) {
                return;  
     } 
 
+    ks_log(KS_LOG_DEBUG,"process_table in progress\n");
+
        ks_rwl_read_lock(internal->lock);      /* grab read lock */
 
        ks_dhtrt_bucket_header_t *header = internal->buckets;
@@ -780,7 +783,6 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
                                char buf[100];
                                ks_log(KS_LOG_DEBUG,"process_table: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
-                               //fflush(stdout);
 #endif
 
                                for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
@@ -819,7 +821,6 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
         char buf1[100];
         ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
-        //fflush(stdout);
 #endif
 
                                ks_rwl_write_unlock(b->lock);
@@ -829,7 +830,6 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
 #ifdef  KS_DHT_DEBUGPRINTF_
                                char buf2[100];
                                ks_log(KS_LOG_DEBUG,"process_table: unble to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf2));
-                               //fflush(stdout);
 #endif
             }
                }
@@ -857,7 +857,8 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
        ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
        ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL;
 
-       while(deleted) {
+    /* reclaim excess memory */
+       while(internal->deleted_count > KS_DHTRT_RECYCLE_NODE_THRESHOLD && deleted) {
                ks_dht_node_t* node = deleted->node;
 
                if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) {        
@@ -867,20 +868,20 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
             deleted = deleted->next;
             ks_pool_free(table->pool, &temp);
             --internal->deleted_count;
+
                        if (prev != NULL) {
                                prev->next = deleted;
                        }
             else {
                 internal->deleted_node = deleted;
-            }        
+            }   
+     
                }
         else {
             prev = deleted;
             deleted = prev->next;
         }
-      
        }
-
        ks_mutex_unlock(internal->deleted_node_lock);
 }
 
@@ -1313,15 +1314,47 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t
 void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t* table, ks_dht_node_t* node)
 {
     ks_dhtrt_internal_t* internal = table->internal;
-    ks_dhtrt_deletednode_t* deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t));
-    deleted->node = node; 
-    ks_mutex_lock(internal->deleted_node_lock);
-    deleted->next = internal->deleted_node;
-    internal->deleted_node = deleted;
-    ++internal->deleted_count;
-    ks_mutex_unlock(internal->deleted_node_lock);
+       ks_mutex_lock(internal->deleted_node_lock);
+       ks_dhtrt_deletednode_t* deleted = internal->free_nodes;
+
+    if (deleted) {
+        internal->free_nodes = deleted->next;    
+    }
+    else {
+        deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t));
+    }
+
+    deleted->node = node;
+       deleted->next = internal->deleted_node;
+       internal->deleted_node = deleted;
+       ++internal->deleted_count;
+       ks_mutex_unlock(internal->deleted_node_lock);
 }
 
+ks_dht_node_t* ks_dhtrt_make_node(ks_dhtrt_routetable_t* table)
+{
+    ks_dht_node_t *node = NULL;
+       ks_dhtrt_internal_t *internal = table->internal;
+       ks_mutex_lock(internal->deleted_node_lock);
+
+    /* to to reuse a deleted node */
+       if (internal->deleted_count) {
+               ks_dhtrt_deletednode_t *deleted =  internal->deleted_node;
+        node = deleted->node;
+        memset(node, 0, sizeof(ks_dht_node_t));
+        internal->deleted_node = deleted->next; 
+        deleted->next =  internal->free_nodes;
+        internal->free_nodes = deleted;  
+        --internal->deleted_count;      
+     }
+     ks_mutex_unlock(internal->deleted_node_lock);
+
+     if (!node) {
+        node = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t));
+     }
+
+     return node;
+}
 
 void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry) {
        ++entry->outstanding_pings;