]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Thread safe dht buckets
authorcolm <colm@freeswitch1>
Wed, 14 Dec 2016 01:47:55 +0000 (20:47 -0500)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:36 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht_bucket.c

index 3a68bceadc699e8f697a25e64a5999749c2687c5..53f8a2e2b2040c69fa741ab797af050c7d393541 100644 (file)
@@ -65,8 +65,6 @@ typedef struct ks_dhtrt_bucket_s {
        ks_dhtrt_bucket_entry_t  entries[KS_DHT_BUCKETSIZE];
        uint8_t           count;
        uint8_t           expired_count;
-       ks_rwl_t *        lock;
-       uint8_t           locked; 
 } ks_dhtrt_bucket_t; 
 
 
@@ -88,7 +86,6 @@ typedef struct ks_dhtrt_internal_s {
        uint8_t  localid[KS_DHT_NODEID_SIZE];
        ks_dhtrt_bucket_header_t *buckets;              /* root bucketheader */
        ks_rwl_t                           *lock;                 /* lock for safe traversal of the tree */ 
-       uint8_t                             locked;             
 } ks_dhtrt_internal_t;
 
 typedef struct ks_dhtrt_xort_s {
@@ -141,11 +138,14 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
 static
 ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node);
 static
-void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id);
+ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id);
 static
 char *ks_dhtrt_printableid(uint8_t *id, char *buffer);
 static
 unsigned char ks_dhtrt_isactive(ks_dhtrt_bucket_entry_t *entry);
+
+static
+uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query);
 static
 uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t *xort);
 static
@@ -176,7 +176,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_po
 
        ks_dhtrt_internal_t *internal =   ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t));
 
-       /*ks_rwl_create(&internal->lock, pool);*/
+       ks_rwl_create(&internal->lock, pool);
        table->internal = internal;
 
        /* initialize root bucket */
@@ -236,32 +236,42 @@ KS_DECLARE(ks_status_t)    ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
     tnode->type = type;
 
     if (( ks_addr_set(&tnode->addr, ip, port, tnode->family) != KS_STATUS_SUCCESS) ||
-        ( ks_dhtrt_insert_node(table, tnode) != KS_STATUS_SUCCESS)                 ||
-        ( ks_rwl_create(&tnode->reflock, table->pool) !=  KS_STATUS_SUCCESS))   {
+        ( ks_rwl_create(&tnode->reflock, table->pool) !=  KS_STATUS_SUCCESS))       {
         ks_pool_free(table->pool, tnode);
         return KS_STATUS_FAIL;
     }
+
+    ks_dhtrt_internal_t* internal = table->internal;
+    ks_rwl_write_lock(internal->lock);      /* grab write lock and insert */
+    ks_status_t s = ks_dhtrt_insert_node(table, tnode);
+    ks_rwl_write_unlock(internal->lock);    /* release write lock */
        
        (*node) = tnode;
 
-       return KS_STATUS_SUCCESS;
+       return s;
 }
 
 KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
 {
+    ks_status_t s =  KS_STATUS_FAIL;
+    ks_dhtrt_internal_t* internal = table->internal;
+    ks_rwl_write_lock(internal->lock);      /* grab write lock and delete */
        ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
 
        if (header != 0) {
                ks_dhtrt_bucket_t *bucket = header->bucket;
 
-               if (bucket != 0) {                       /* we were not able to find a bucket*/
-                       ks_dhtrt_delete_id(bucket, node->nodeid.id);
+               if (bucket != 0) {                       /* we found a bucket*/
+                       s = ks_dhtrt_delete_id(bucket, node->nodeid.id);
                }
+
        }
 
-    ks_rwl_destroy(&node->reflock);
+    ks_rwl_write_unlock(internal->lock);   /* release write lock */
+
+    ks_rwl_destroy(&(node->reflock));
        ks_pool_free(table->pool, node);
-       return KS_STATUS_SUCCESS;
+       return s;
 }
 
 static
@@ -365,40 +375,64 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_
 
 KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table,  ks_dht_nodeid_t nodeid) 
 {
+    ks_status_t s = KS_STATUS_FAIL;
+       ks_dhtrt_internal_t* internal = table->internal;
+       ks_rwl_read_lock(internal->lock);      /* grab read lock */
        ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
 
-       if (header == 0) return KS_STATUS_FAIL;
-       if (header->bucket == 0)  return KS_STATUS_FAIL;
+       if (header != 0 && header->bucket != 0) {
+               ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
 
-       ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
+               if (e != 0) { 
+                       e->tyme = ks_time_now();
+                       e->outstanding_pings = 0;
 
-       if (e != 0) { 
-               e->tyme = ks_time_now();
-               e->outstanding_pings = 0;
-               if (e->flags ==  DHTPEER_EXPIRED)  --header->bucket->expired_count;
-               e->flags = DHTPEER_ACTIVE;
-               return KS_STATUS_SUCCESS;
-       }
+                       if (e->flags ==  DHTPEER_EXPIRED) {
+                               --header->bucket->expired_count;
+                       }
 
-       return KS_STATUS_FAIL;
+                       e->flags = DHTPEER_ACTIVE;
+                   s = KS_STATUS_SUCCESS;
+               }
+
+       }
+       ks_rwl_read_lock(internal->lock);      /* release read lock */
+       return s;
 }
 
 KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table,     ks_dht_nodeid_t nodeid)
 {
+    ks_status_t s = KS_STATUS_FAIL;
+    ks_dhtrt_internal_t *internal = table->internal;
+       ks_rwl_read_lock(internal->lock);      /* grab read lock */
        ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
 
-       if (header == 0) return KS_STATUS_FAIL;
+       if (header != 0) {
 
-       ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
+               ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
+
+               if (e != 0) {
+                       e->flags = DHTPEER_EXPIRED;
+                       s = KS_STATUS_SUCCESS;
+               }
 
-       if (e != 0) {
-               e->flags = DHTPEER_EXPIRED;
-               return KS_STATUS_SUCCESS;
        }
-       return KS_STATUS_FAIL;
+       ks_rwl_read_unlock(internal->lock);      /* release read lock */
+       return s;
 }
 
-KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query) 
+KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query)
+{
+       uint8_t count = 0;
+       ks_dhtrt_internal_t *internal = table->internal;
+       ks_rwl_read_lock(internal->lock);      /* grab read lock */
+       count = ks_dhtrt_findclosest_locked_nodes(table, query);
+       ks_rwl_read_unlock(internal->lock);      /* release read lock */
+       return count;
+}
+
+static
+uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query) 
 {
        uint8_t max = query->max;
        uint8_t total = 0;
@@ -416,9 +450,7 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_
        printf(" starting at mask: %s\n",  ks_dhtrt_printableid(header->mask, buffer));
 #endif
 
-
        ks_dhtrt_sortedxors_t xort0;
-
        memset(&xort0, 0 , sizeof(xort0));
 
        ks_dhtrt_nodeid_t initid;
@@ -586,6 +618,9 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
        /*                                                                                                      */
 
        ks_dhtrt_internal_t *internal = table->internal;
+
+       ks_rwl_read_lock(internal->lock);      /* grab read lock */
+
        ks_dhtrt_bucket_header_t *header = internal->buckets;
        ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
        int stackix=0;
@@ -632,6 +667,7 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
                        header = header->right;
                }
        }
+    ks_rwl_read_unlock(internal->lock);      /* release read lock */
        return;
 }
 
@@ -645,6 +681,7 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
        ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
        int stackix = 0;
 
+    ks_rwl_read_lock(internal->lock);      /* grab read lock */
        while (header) {
                stack[stackix++] = header;
                /* walk and report left handsize */
@@ -655,7 +692,7 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
                        ks_dhtrt_bucket_t *b = header->bucket;
                        printf("   bucket holds %d entries\n", b->count);
                         
-                       if (level == 7) {
+                       if (b->count > 0 && level == 7) {
                                printf("   --------------------------\n");
 
                                for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
@@ -678,7 +715,7 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
                        header = header->right;
                }
        }                        
-
+    ks_rwl_read_unlock(internal->lock);      /* release read lock */
        return;
 }
 
@@ -774,12 +811,6 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original,
        int lix = 0;
        int rix = 0;
 
-       /* ****************** */
-       /* bucket write lock  */
-       /* ****************** */
-       /*ks_rwl_write_lock(source->lock);*/
-       source->locked=1;
-       
        for ( ; rix<KS_DHT_BUCKETSIZE; ++rix) {
                if (ks_dhtrt_ismasked(source->entries[rix].id, left->mask)) {
                        /* move it to the left */
@@ -796,11 +827,6 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original,
                        --source->count;
                }
        }
-       /* *********************** */
-       /*      end bucket write lock  */
-       /* *********************** */
-       source->locked=0;
-       /*ks_rwl_write_unlock(source->lock);*/
 
        /* give original bucket to the new left hand side header */
        right->bucket = source;
@@ -826,7 +852,7 @@ static
 ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
 {
        /* sanity checks */
-       if (!bucket || bucket->count >= KS_DHT_BUCKETSIZE) {
+       if (!bucket || bucket->count > KS_DHT_BUCKETSIZE) {
                assert(0);
        }
 
@@ -848,7 +874,7 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
                else if (!memcmp(bucket->entries[ix].id, node->nodeid.id, KS_DHT_NODEID_SIZE)) {
 #ifdef KS_DHT_DEBUGPRINTF_
                        char buffer[100];
-                       printf("duplicate peer %s found at %d ", ks_dhtrt_printableid(node->nodeid.id, buffer), ix);
+                       printf("duplicate peer %s found at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), ix);
 #endif
                        bucket->entries[ix].tyme = ks_time_now();
                        bucket->entries[ix].flags &= DHTPEER_ACTIVE;
@@ -856,12 +882,6 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
                }
        }
 
-       /* ****************** */
-       /* bucket write lock  */
-       /* ****************** */
-       /*ks_rwl_write_lock(bucket->lock);*/
-       bucket->locked = 1;
-
        if (free == KS_DHT_BUCKETSIZE && expiredix<KS_DHT_BUCKETSIZE ) {
                /* bump this one - but only if we have no other option */
                free =  expiredix;
@@ -876,23 +896,18 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
         bucket->entries[free].tyme = ks_time_now();
         bucket->entries[free].flags &= DHTPEER_ACTIVE;
 
-               ++bucket->count;
+               if (free !=  expiredix) {  /* are we are taking a free slot rather than replacing an expired node? */
+                       ++bucket->count;       /* yes: increment total count */
+               }
+
                memcpy(bucket->entries[free].id, node->nodeid.id, KS_DHT_NODEID_SIZE);
-               bucket->locked = 0;
-               /*ks_rwl_write_unlock(bucket->lock);*/
 #ifdef KS_DHT_DEBUGPRINTF_
                char buffer[100];
-               printf("Inserting node %s\n",  ks_dhtrt_printableid(node->nodeid.id, buffer));
+               printf("Inserting node %s at %d\n",  ks_dhtrt_printableid(node->nodeid.id, buffer), free);
 #endif 
                return KS_STATUS_SUCCESS;
        }
 
-       bucket->locked = 0;
-       /*ks_rwl_write_unlock(bucket->lock);*/
-       /* ********************** */
-       /* end bucket write lock  */
-       /* ********************** */
-
        return KS_STATUS_FAIL;
 }
         
@@ -923,7 +938,7 @@ ks_dht_node_t *ks_dhtrt_find_nodeid(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t
 }
 
 static
-void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
+ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
 {
 #ifdef KS_DHT_DEBUGPRINTF_
 
@@ -932,20 +947,22 @@ void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
 #endif
 
        for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
-#ifdef KS_DHT_DEBUGPRINTF_
+#ifdef KS_DHT_DEBUGPRINTFX_
+         char bufferx[100];_
                printf("\nbucket->entries[%d].id = %s inuse=%c\n", ix,
-                          ks_dhtrt_printableid(bucket->entries[ix].id, buffer),
+                          ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
                           bucket->entries[ix].inuse  );
 #endif
                if ( bucket->entries[ix].inuse == 1       &&
                         (!memcmp(id, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) {
                        bucket->entries[ix].inuse = 0;
-                       bucket->entries[ix].gptr = 0;
+                       bucket->entries[ix].gptr  = 0;
                        bucket->entries[ix].flags = 0;
-                       return;
+            --bucket->count;
+                       return KS_STATUS_SUCCESS;
                }
        }
-       return;
+       return KS_STATUS_FAIL;
 }