]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Thread safe dht buckets continued
authorcolm <colm@freeswitch1>
Thu, 15 Dec 2016 02:07:04 +0000 (21:07 -0500)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:36 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht_bucket.c

index 53f8a2e2b2040c69fa741ab797af050c7d393541..796b0ce8250a84292e425150fba95f39b954471f 100644 (file)
@@ -65,6 +65,7 @@ typedef struct ks_dhtrt_bucket_s {
        ks_dhtrt_bucket_entry_t  entries[KS_DHT_BUCKETSIZE];
        uint8_t           count;
        uint8_t           expired_count;
+    ks_rwl_t     *lock;           /* lock for safe traversal of the entry array */    
 } ks_dhtrt_bucket_t; 
 
 
@@ -81,11 +82,17 @@ typedef struct ks_dhtrt_bucket_header_s {
        unsigned char    flags;                   
 } ks_dhtrt_bucket_header_t;
 
+typedef struct ks_dhtrt_deletednode_s {
+    ks_dht_node_t*  node;
+    struct ks_dhtrt_deletednode_s *next;
+} ks_dhtrt_deletednode_t;
 
 typedef struct ks_dhtrt_internal_s {
        uint8_t  localid[KS_DHT_NODEID_SIZE];
        ks_dhtrt_bucket_header_t *buckets;              /* root bucketheader */
-       ks_rwl_t                           *lock;                 /* lock for safe traversal of the tree */ 
+       ks_rwl_t                           *lock;                   /* lock for safe traversal of the tree */
+    ks_mutex_t             *deleted_node_lock;
+    ks_dhtrt_deletednode_t *deleted_node;
 } ks_dhtrt_internal_t;
 
 typedef struct ks_dhtrt_xort_s {
@@ -132,6 +139,10 @@ static
 void ks_dhtrt_xor(const uint8_t *id1, const uint8_t *id2, uint8_t *xor);
 static  
 int ks_dhtrt_ismasked(const uint8_t *id1, const uint8_t *mask);
+static
+void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t *table, ks_dht_node_t* node);
+static
+void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table); 
 
 static
 ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node);
@@ -177,6 +188,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_po
        ks_dhtrt_internal_t *internal =   ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t));
 
        ks_rwl_create(&internal->lock, pool);
+    ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool);
        table->internal = internal;
 
        /* initialize root bucket */
@@ -210,15 +222,20 @@ KS_DECLARE(ks_status_t)    ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
                                                                                           unsigned short port,
                                                                                           ks_dht_node_t **node) 
 {
+    ks_dhtrt_internal_t* internal = table->internal;
+    ks_rwl_read_lock(internal->lock);      /* grab write lock and insert */
+
     ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
     assert(header != NULL);             /* should always find a header */
 
     ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id);
     if (bentry != 0) {
-       bentry->type = ks_time_now_sec();
-       (*node) = bentry->gptr;
-       return KS_STATUS_SUCCESS;
+               bentry->type = ks_time_now_sec();
+               (*node) = bentry->gptr;
+               ks_rwl_read_unlock(internal->lock);
+               return KS_STATUS_SUCCESS;
     }
+    ks_rwl_read_unlock(internal->lock);
 
        /* @todo - replace with reusable memory pool */
     ks_dht_node_t *tnode = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t));
@@ -241,10 +258,7 @@ KS_DECLARE(ks_status_t)     ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
         return KS_STATUS_FAIL;
     }
 
-    ks_dhtrt_internal_t* internal = table->internal;
-    ks_rwl_write_lock(internal->lock);      /* grab write lock and insert */
     ks_status_t s = ks_dhtrt_insert_node(table, tnode);
-    ks_rwl_write_unlock(internal->lock);    /* release write lock */
        
        (*node) = tnode;
 
@@ -255,39 +269,56 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh
 {
     ks_status_t s =  KS_STATUS_FAIL;
     ks_dhtrt_internal_t* internal = table->internal;
-    ks_rwl_write_lock(internal->lock);      /* grab write lock and delete */
+    ks_rwl_read_lock(internal->lock);      /* grab read lock */
        ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
 
        if (header != 0) {
                ks_dhtrt_bucket_t *bucket = header->bucket;
 
                if (bucket != 0) {                       /* we found a bucket*/
+                       ks_rwl_write_lock(bucket->lock);
                        s = ks_dhtrt_delete_id(bucket, node->nodeid.id);
+                       ks_rwl_write_unlock(bucket->lock);
                }
 
        }
 
-    ks_rwl_write_unlock(internal->lock);   /* release write lock */
+    ks_rwl_read_unlock(internal->lock);   /* release write lock */
+    /* at this point no subsequent find/query will return the node - so we can
+       safely free it if we can grab the write lock
+       Having held the write lock on the bucket we know no other thread 
+       is awaiting a read/write lock on the node
+    */                      
 
-    ks_rwl_destroy(&(node->reflock));
-       ks_pool_free(table->pool, node);
+    if (ks_rwl_try_write_lock(node->reflock) ==  KS_STATUS_SUCCESS) {      /* grab exclusive lock on node */
+               ks_rwl_destroy(&(node->reflock));
+               ks_pool_free(table->pool, node);
+    }
+    else {
+        ks_dhtrt_queue_node_fordelete(table, node);
+       }
        return s;
 }
 
 static
 ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
 {
+    ks_dhtrt_internal_t* internal = table->internal;
        ks_dhtrt_bucket_t *bucket = 0;
        int insanity = 0;
 
+    ks_rwl_write_lock(internal->lock);
        ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id); 
     assert(header != NULL);             /* should always find a header */ 
 
        bucket = header->bucket;
 
        if (bucket == 0) {
+       ks_rwl_write_unlock(internal->lock);       
        return  KS_STATUS_FAIL;  /* we were not able to find a bucket*/
     }
+
+       ks_rwl_write_lock(bucket->lock);
        
        while (bucket->count == KS_DHT_BUCKETSIZE) {
                if (insanity > 3200) assert(insanity < 3200);
@@ -295,8 +326,11 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
                /* first - seek a stale entry to eject */
                if (bucket->expired_count) {
                        ks_status_t s = ks_dhtrt_insert_id(bucket, node);
-
-                       if (s == KS_STATUS_SUCCESS) return KS_STATUS_SUCCESS;
+                       if (s == KS_STATUS_SUCCESS) {
+                                ks_rwl_write_unlock(bucket->lock);
+                 ks_rwl_write_unlock(internal->lock);
+                                return KS_STATUS_SUCCESS;
+                       }
                }
 
                /* 
@@ -310,6 +344,8 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
                        char buffer[100];
                        printf(" nodeid %s was not inserted\n",  ks_dhtrt_printableid(node->nodeid.id, buffer));
 #endif
+               ks_rwl_write_unlock(bucket->lock);
+            ks_rwl_write_unlock(internal->lock);
                        return KS_STATUS_FAIL;
                }
                        
@@ -323,6 +359,8 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
                        char buffer[100];
                        printf(" nodeid %s was not inserted\n",  ks_dhtrt_printableid(node->nodeid.id, buffer));
 #endif
+                       ks_rwl_write_unlock(bucket->lock);
+            ks_rwl_write_unlock(internal->lock);
                        return KS_STATUS_FAIL;
                }
 
@@ -343,9 +381,12 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
                /* which bucket do care about */
                if (ks_dhtrt_ismasked(node->nodeid.id, newleft->mask)) {
                        bucket = newleft->bucket;
+            ks_rwl_write_lock(bucket->lock);                   /* lock new bucket */
+            ks_rwl_write_unlock(header->right->bucket->lock);   /* unlock old bucket */
                        header = newleft;
                } else {
                        bucket = newright->bucket;
+            /* note: we still hold a lock on the bucket */
                        header = newright;
                }
                ++insanity;
@@ -357,20 +398,49 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
        printf("into bucket %s\n",      ks_dhtrt_printableid(header->mask, buffer));
 #endif
 
-       /* by this point we have a viable bucket */
-       return ks_dhtrt_insert_id(bucket, node);
+       /* by this point we have a viable & locked bucket
+       so downgrade the internal lock to read.  safe as we hold the bucket write lock
+       preventing it being sptlit under us.
+    */
+    ks_rwl_write_unlock(internal->lock);    
+    ks_rwl_read_lock(internal->lock);
+
+       ks_status_t s = ks_dhtrt_insert_id(bucket, node);
+    ks_rwl_read_unlock(internal->lock);
+    ks_rwl_write_unlock(bucket->lock);
+    return s;
 }
 
-KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid) {
+KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid) 
+{
+
+       ks_dht_node_t* node = NULL;
+
+    ks_dhtrt_internal_t* internal = table->internal;
+    ks_rwl_read_lock(internal->lock);      /* grab read lock */
+
        ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
 
-       if (header == 0) return NULL;
+       if (header != 0) {
 
-       ks_dhtrt_bucket_t *bucket = header->bucket;
+               ks_dhtrt_bucket_t *bucket = header->bucket;
+
+               if (bucket != 0) {                       /* probably a logic error ?*/
+
+                       ks_rwl_read_lock(bucket->lock);
+                       ks_dht_node_t* node = ks_dhtrt_find_nodeid(bucket, nodeid.id);
+    
+                       if (node != NULL) {
+                               ks_rwl_read_lock(node->reflock);
+                       }
+
+                       ks_rwl_read_unlock(bucket->lock);
+               }
 
-       if (bucket == 0) return NULL;    /* probably a logic error ?*/
+       }
 
-       return ks_dhtrt_find_nodeid(bucket, nodeid.id);
+    ks_rwl_read_unlock(internal->lock);
+       return node;  
 }
 
 KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table,  ks_dht_nodeid_t nodeid) 
@@ -378,9 +448,11 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table,  ks_dh
     ks_status_t s = KS_STATUS_FAIL;
        ks_dhtrt_internal_t* internal = table->internal;
        ks_rwl_read_lock(internal->lock);      /* grab read lock */
+
        ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
 
        if (header != 0 && header->bucket != 0) {
+               ks_rwl_write_lock(header->bucket->lock);
                ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
 
                if (e != 0) { 
@@ -394,9 +466,9 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table,  ks_dh
                        e->flags = DHTPEER_ACTIVE;
                    s = KS_STATUS_SUCCESS;
                }
-
+               ks_rwl_write_unlock(header->bucket->lock);
        }
-       ks_rwl_read_lock(internal->lock);      /* release read lock */
+       ks_rwl_read_unlock(internal->lock);      /* release read lock */
        return s;
 }
 
@@ -407,15 +479,15 @@ KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table,        ks_dh
        ks_rwl_read_lock(internal->lock);      /* grab read lock */
        ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
 
-       if (header != 0) {
-
+       if (header != 0 && header->bucket != 0) {
+        ks_rwl_write_lock(header->bucket->lock);   
                ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
 
                if (e != 0) {
                        e->flags = DHTPEER_EXPIRED;
                        s = KS_STATUS_SUCCESS;
                }
-
+               ks_rwl_write_unlock(header->bucket->lock);
        }
        ks_rwl_read_unlock(internal->lock);      /* release read lock */
        return s;
@@ -536,7 +608,6 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
 
                        if (lheader) {            
                                xortn = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t));
-                               memset(xortn, 0, sizeof(ks_dhtrt_sortedxors_t));
 
                                if (tofree == 0)   tofree = xortn;
 
@@ -558,7 +629,6 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
 
                        if (rheader) {
                                xortn1 = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t));
-                               memset(xortn1, 0, sizeof(ks_dhtrt_sortedxors_t));
                                prev->next = xortn1;
                                prev = xortn1;
                                cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family,
@@ -598,7 +668,7 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
 KS_DECLARE(ks_status_t) ks_dhtrt_release_node(ks_dht_node_t* node)
 {
     return KS_STATUS_SUCCESS; 
-    /* return ks_rwl_read_unlock(node->reflock);*/
+    return ks_rwl_read_unlock(node->reflock);
 }
 
 
@@ -630,33 +700,66 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
                stack[stackix++] = header;
 
                if (header->bucket) {
+
                        ks_dhtrt_bucket_t *b = header->bucket;
 
-                       for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
-                               ks_dhtrt_bucket_entry_t *e =  &b->entries[ix];
+                       if (ks_rwl_try_write_lock(b->lock) == KS_STATUS_SUCCESS) {
 
-                               if (e->inuse == 1) {
-                                       /* more than n pings outstanding? */
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+        char buf[100];
+        printf("process_table: LOCKING bucket %s\n",
+               ks_dhtrt_printableid(header->mask, buf));
+        fflush(stdout);
+#endif
 
-                                       if (e->outstanding_pings >= KS_DHTRT_MAXPING) {
-                                               e->flags =      DHTPEER_EXPIRED; 
-                                               ++b->expired_count;
-                                               continue;
-                                       }
 
-                                       if (e->flags == DHTPEER_SUSPECT) {
-                                               ks_dhtrt_ping(e); 
-                                               continue;
-                                       }
+                               for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
+                                       ks_dhtrt_bucket_entry_t *e =  &b->entries[ix];
 
-                                       ks_time_t tdiff = t0 - e->tyme;
+                                       if (e->inuse == 1) {
+                                               /* more than n pings outstanding? */
 
-                                       if (tdiff > KS_DHTRT_INACTIVETIME) {
-                                               e->flags = DHTPEER_SUSPECT;
-                                               ks_dhtrt_ping(e);
-                                       }
-                               }
-                       }       /* end for each bucket_entry */
+                                               if (e->outstanding_pings >= KS_DHTRT_MAXPING) {
+                                                       e->flags =      DHTPEER_EXPIRED; 
+                                                       ++b->expired_count;
+                                                       continue;
+                                               }
+
+                                               if (e->flags == DHTPEER_SUSPECT) {
+                                                       ks_dhtrt_ping(e); 
+                                                       continue;
+                                               }
+
+                                               ks_time_t tdiff = t0 - e->tyme;
+
+                                               if (tdiff > KS_DHTRT_INACTIVETIME) {
+                                                       e->flags = DHTPEER_SUSPECT;
+                                                       ks_dhtrt_ping(e);
+                                               }
+
+                                       }  /* end if e->inuse */
+
+                               }       /* end for each bucket_entry */
+
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+        char buf1[100];
+        printf("process_table: UNLOCKING bucket %s\n",
+               ks_dhtrt_printableid(header->mask, buf1));
+        fflush(stdout);
+#endif
+
+                               ks_rwl_write_unlock(b->lock);
+
+                       }   /* end of if trywrite_lock successful */
+            else {
+#ifdef  KS_DHT_DEBUGPRINTF_
+        char buf2[100];
+        printf("process_table: unble to LOCK bucket %s\n",
+               ks_dhtrt_printableid(header->mask, buf2));
+        fflush(stdout);
+#endif
+
+            }
                }
 
                header = header->left;
@@ -668,9 +771,46 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
                }
        }
     ks_rwl_read_unlock(internal->lock);      /* release read lock */
+
+    ks_dhtrt_process_deleted(table);
+
        return;
 }
 
+void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
+{
+       ks_dhtrt_internal_t* internal = table->internal;
+       ks_mutex_lock(internal->deleted_node_lock);
+
+       ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
+       ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL;
+
+       while(deleted) {
+               ks_dht_node_t* node = deleted->node;
+
+               if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) {        
+               ks_rwl_destroy(&(node->reflock));
+               ks_pool_free(table->pool, node);
+            temp = deleted;
+            deleted = deleted->next;
+            ks_pool_free(table->pool, temp);
+                       if (prev != NULL) {
+                               prev->next = deleted;
+                       }
+            else {
+                internal->deleted_node = deleted;
+            }        
+               }
+        else {
+            prev = deleted;
+            deleted = prev->next;
+        }
+      
+       }
+
+       ks_mutex_unlock(internal->deleted_node_lock);
+}
+
 
 KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
        /* dump buffer headers */
@@ -728,7 +868,6 @@ ks_dhtrt_bucket_header_t *ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt
 {
        ks_dhtrt_bucket_header_t *header = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_header_t));
 
-       memset(header, 0, sizeof(ks_dhtrt_bucket_header_t));
        memcpy(header->mask, mask, sizeof(header->mask));  
        header->parent = parent;   
 
@@ -745,9 +884,7 @@ static
 ks_dhtrt_bucket_t *ks_dhtrt_create_bucket(ks_pool_t *pool)
 {
        ks_dhtrt_bucket_t *bucket = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_t));
-
-       memset(bucket, 0, sizeof(ks_dhtrt_bucket_t));
-       /*ks_rwl_create(&bucket->lock, pool);*/
+       ks_rwl_create(&bucket->lock, pool);
        return bucket;
 }
 
@@ -995,6 +1132,15 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id,
                
        }
 
+    ks_rwl_read_lock(bucket->lock);    /* get a read lock : released in load_query when the results are copied */
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+        char buf[100];
+        printf("closestbucketnodes: LOCKING bucket %s\n",
+               ks_dhtrt_printableid(header->mask, buf));
+        fflush(stdout);
+#endif
+    
+
        for (uint8_t ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
            if ( bucket->entries[ix].inuse == 1                              &&
              (family == ifboth || bucket->entries[ix].family == family)  &&
@@ -1064,14 +1210,35 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t
                        query->nodes[ix] = current->bheader->bucket->entries[z].gptr;
             xorix =  current->xort[xorix].nextix;
                        ++loaded;
-               }                       
+               }
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+        char buf1[100];
+        printf("load_query: UNLOCKING bucket %s\n",
+               ks_dhtrt_printableid(current->bheader->mask, buf1));
+        fflush(stdout);
+#endif
+           ks_rwl_read_unlock(current->bheader->bucket->lock); /* release the read lock from findclosest_bucketnodes */
+                       
                if (loaded >= query->max) break;
                current = current->next;
        }
        query->count = loaded;
+
        return loaded;
 }
 
+void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t* table, ks_dht_node_t* node)
+{
+    ks_dhtrt_internal_t* internal = table->internal;
+    ks_dhtrt_deletednode_t* deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t));
+    deleted->node = node; 
+    ks_mutex_lock(internal->deleted_node_lock);
+    deleted->next = internal->deleted_node;
+    internal->deleted_node = deleted;
+    ks_mutex_unlock(internal->deleted_node_lock);
+}
+
+
 void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry) {
        ++entry->outstanding_pings;
        /* @todo */