]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Bucket synchronization fixes & Updated tests
authorcolm <colm@freeswitch1>
Fri, 16 Dec 2016 00:43:33 +0000 (19:43 -0500)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:36 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht_bucket.c
libs/libks/test/testbuckets.c

index 4958a7622e5e0459582e0729b8c4dbaa450496e2..e4815d0a50191a4ba85f41c172068a9f547cdaa7 100644 (file)
 
 /* change for testing */
 #define KS_DHT_BUCKETSIZE 20
-#define KS_DHTRT_INACTIVETIME  (5*60)   
+#define KS_DHTRT_INACTIVETIME  (15*60)  
 #define KS_DHTRT_MAXPING  3
+#define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60)  
+
 
 /* peer flags */
 #define DHTPEER_ACTIVE 1
@@ -91,8 +93,10 @@ typedef struct ks_dhtrt_internal_s {
        uint8_t  localid[KS_DHT_NODEID_SIZE];
        ks_dhtrt_bucket_header_t *buckets;              /* root bucketheader */
        ks_rwl_t                           *lock;                   /* lock for safe traversal of the tree */
+       ks_time_t              last_process_table;
     ks_mutex_t             *deleted_node_lock;
     ks_dhtrt_deletednode_t *deleted_node;
+    uint32_t               deleted_count;
 } ks_dhtrt_internal_t;
 
 typedef struct ks_dhtrt_xort_s {
@@ -176,7 +180,7 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry);
 /* debugging */
 #define KS_DHT_DEBUGPRINTF_
 /* # define KS_DHT_DEBUGPRINTFX_  very verbose */
-
+/* # define KS_DHT_DEBUGLOCKPRINTF_  debug locking */
 
 KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool) 
 {
@@ -230,7 +234,8 @@ KS_DECLARE(ks_status_t)      ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
 
     ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id);
     if (bentry != 0) {
-               bentry->type = ks_time_now_sec();
+               bentry->tyme = ks_time_now_sec();
+        bentry->type = type;
                (*node) = bentry->gptr;
                ks_rwl_read_unlock(internal->lock);
                return KS_STATUS_SUCCESS;
@@ -255,6 +260,7 @@ KS_DECLARE(ks_status_t)      ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
     if (( ks_addr_set(&tnode->addr, ip, port, tnode->family) != KS_STATUS_SUCCESS) ||
         ( ks_rwl_create(&tnode->reflock, table->pool) !=  KS_STATUS_SUCCESS))       {
         ks_pool_free(table->pool, tnode);
+               ks_rwl_read_unlock(internal->lock);
         return KS_STATUS_FAIL;
     }
 
@@ -276,8 +282,18 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh
                ks_dhtrt_bucket_t *bucket = header->bucket;
 
                if (bucket != 0) {                       /* we found a bucket*/
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+                       char buf[100];
+                       printf("delete node: LOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
+                       fflush(stdout);
+#endif
                        ks_rwl_write_lock(bucket->lock);
                        s = ks_dhtrt_delete_id(bucket, node->nodeid.id);
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+                       printf("delete node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
+                       fflush(stdout);
+#endif
+
                        ks_rwl_write_unlock(bucket->lock);
                }
 
@@ -317,6 +333,11 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
        ks_rwl_write_unlock(internal->lock);       
        return  KS_STATUS_FAIL;  /* we were not able to find a bucket*/
     }
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+        char buf[100];
+        printf("insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
+        fflush(stdout);
+#endif
 
        ks_rwl_write_lock(bucket->lock);
        
@@ -327,6 +348,10 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
                if (bucket->expired_count) {
                        ks_status_t s = ks_dhtrt_insert_id(bucket, node);
                        if (s == KS_STATUS_SUCCESS) {
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+                                printf("insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
+                                fflush(stdout);
+#endif
                                 ks_rwl_write_unlock(bucket->lock);
                  ks_rwl_write_unlock(internal->lock);
                                 return KS_STATUS_SUCCESS;
@@ -341,8 +366,12 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
 
                if ( !(header->flags & BHF_LEFT) )      {       /* only the left handside node can be split */
 #ifdef KS_DHT_DEBUGPRINTF_
-                       char buffer[100];
-                       printf(" nodeid %s was not inserted\n",  ks_dhtrt_printableid(node->nodeid.id, buffer));
+                       char bufx[100];
+                       printf(" nodeid %s was not inserted\n",  ks_dhtrt_printableid(node->nodeid.id, bufx));
+#endif
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+                       printf("insert node: UNLOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
+                       fflush(stdout);
 #endif
                ks_rwl_write_unlock(bucket->lock);
             ks_rwl_write_unlock(internal->lock);
@@ -356,8 +385,12 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
 
                if (newmask[KS_DHT_NODEID_SIZE-1] == 0) {  /* no more bits to shift - is this possible */
 #ifdef KS_DHT_DEBUGPRINTF_
-                       char buffer[100];
-                       printf(" nodeid %s was not inserted\n",  ks_dhtrt_printableid(node->nodeid.id, buffer));
+                       char bufx[100];
+                       printf(" nodeid %s was not inserted\n",  ks_dhtrt_printableid(node->nodeid.id, bufx));
+#endif
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+            printf("insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
+            fflush(stdout);
 #endif
                        ks_rwl_write_unlock(bucket->lock);
             ks_rwl_write_unlock(internal->lock);
@@ -381,6 +414,12 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
                /* which bucket do care about */
                if (ks_dhtrt_ismasked(node->nodeid.id, newleft->mask)) {
                        bucket = newleft->bucket;
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+            printf("insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->right->mask, buf));
+            printf("insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(newleft->mask, buf));
+            fflush(stdout);
+#endif
+
             ks_rwl_write_lock(bucket->lock);                   /* lock new bucket */
             ks_rwl_write_unlock(header->right->bucket->lock);   /* unlock old bucket */
                        header = newleft;
@@ -398,15 +437,13 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
        printf("into bucket %s\n",      ks_dhtrt_printableid(header->mask, buffer));
 #endif
 
-       /* by this point we have a viable & locked bucket
-       so downgrade the internal lock to read.  safe as we hold the bucket write lock
-       preventing it being sptlit under us.
-    */
-    ks_rwl_write_unlock(internal->lock);    
-    ks_rwl_read_lock(internal->lock);
-
        ks_status_t s = ks_dhtrt_insert_id(bucket, node);
-    ks_rwl_read_unlock(internal->lock);
+    ks_rwl_write_unlock(internal->lock);
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+    printf("insert node: UNLOCKING bucket %s\n",
+                       ks_dhtrt_printableid(header->mask, buf));
+    fflush(stdout);
+#endif
     ks_rwl_write_unlock(bucket->lock);
     return s;
 }
@@ -427,13 +464,22 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_
 
                if (bucket != 0) {                       /* probably a logic error ?*/
 
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+            char buf[100];
+            printf("insert node: read LOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
+            fflush(stdout);
+#endif
+
                        ks_rwl_read_lock(bucket->lock);
                        node = ks_dhtrt_find_nodeid(bucket, nodeid.id);
     
                        if (node != NULL) {
                                ks_rwl_read_lock(node->reflock);
                        }
-
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+            printf("insert node: read UNLOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
+            fflush(stdout);
+#endif
                        ks_rwl_read_unlock(bucket->lock);
                }
 
@@ -453,10 +499,16 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table,  ks_dh
 
        if (header != 0 && header->bucket != 0) {
                ks_rwl_write_lock(header->bucket->lock);
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+               char buf[100];
+               printf("insert node: write bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
+               fflush(stdout);
+#endif
+
                ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
 
                if (e != 0) { 
-                       e->tyme = ks_time_now();
+                       e->tyme = ks_time_now_sec();
                        e->outstanding_pings = 0;
 
                        if (e->flags ==  DHTPEER_EXPIRED) {
@@ -466,6 +518,10 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table,  ks_dh
                        e->flags = DHTPEER_ACTIVE;
                    s = KS_STATUS_SUCCESS;
                }
+#ifdef  KS_DHT_DEBUGLOCKPRINTF_
+               printf("insert node: UNLOCKING bucket %s\n",  ks_dhtrt_printableid(header->mask, buf));
+               fflush(stdout);
+#endif
                ks_rwl_write_unlock(header->bucket->lock);
        }
        ks_rwl_read_unlock(internal->lock);      /* release read lock */
@@ -698,12 +754,16 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
 
        ks_dhtrt_internal_t *internal = table->internal;
 
+    ks_time_t t0 = ks_time_now_sec();
+    if (t0 - internal->last_process_table < KS_DHTRT_PROCESSTABLE_INTERVAL) {
+               return;  
+    } 
+
        ks_rwl_read_lock(internal->lock);      /* grab read lock */
 
        ks_dhtrt_bucket_header_t *header = internal->buckets;
        ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
        int stackix=0;
-       ks_time_t t0 = ks_time_now(); 
 
        while (header) {
                stack[stackix++] = header;
@@ -715,13 +775,11 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
                        if (ks_rwl_try_write_lock(b->lock) == KS_STATUS_SUCCESS) {
 
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
-        char buf[100];
-        printf("process_table: LOCKING bucket %s\n",
-               ks_dhtrt_printableid(header->mask, buf));
-        fflush(stdout);
+                               char buf[100];
+                               printf("process_table: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
+                               fflush(stdout);
 #endif
 
-
                                for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
                                        ks_dhtrt_bucket_entry_t *e =  &b->entries[ix];
 
@@ -757,8 +815,7 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
 
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
         char buf1[100];
-        printf("process_table: UNLOCKING bucket %s\n",
-               ks_dhtrt_printableid(header->mask, buf1));
+        printf("process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
         fflush(stdout);
 #endif
 
@@ -767,12 +824,10 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
                        }   /* end of if trywrite_lock successful */
             else {
 #ifdef  KS_DHT_DEBUGPRINTF_
-        char buf2[100];
-        printf("process_table: unble to LOCK bucket %s\n",
-               ks_dhtrt_printableid(header->mask, buf2));
-        fflush(stdout);
+                               char buf2[100];
+                               printf("process_table: unble to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf2));
+                               fflush(stdout);
 #endif
-
             }
                }
 
@@ -808,6 +863,7 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
             temp = deleted;
             deleted = deleted->next;
             ks_pool_free(table->pool, temp);
+            --internal->deleted_count;
                        if (prev != NULL) {
                                prev->next = deleted;
                        }
@@ -936,8 +992,7 @@ ks_dhtrt_bucket_entry_t *ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header_t *hea
        if (bucket == 0)  return NULL;
 
        for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
-#ifdef KS_DHT_DEBUGPRINTF_
-#endif
+
                if ( bucket->entries[ix].inuse == 1       &&
                         (!memcmp(nodeid, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) {
                        return &(bucket->entries[ix]);
@@ -963,7 +1018,9 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original,
        int rix = 0;
 
        for ( ; rix<KS_DHT_BUCKETSIZE; ++rix) {
+
                if (ks_dhtrt_ismasked(source->entries[rix].id, left->mask)) {
+
                        /* move it to the left */
                        memcpy(dest->entries[lix].id, source->entries[rix].id, KS_DHT_NODEID_SIZE);
                        dest->entries[lix].gptr   = source->entries[rix].gptr;
@@ -1014,20 +1071,24 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
        uint8_t ix = 0;
 
        for (; ix<KS_DHT_BUCKETSIZE; ++ix)      {
+
                if (bucket->entries[ix].inuse == 0) {
+
                        if (free == KS_DHT_BUCKETSIZE) {
                                free = ix; /* use this one       */
                        }
+
                }
                else if (free == KS_DHT_BUCKETSIZE && bucket->entries[ix].flags == DHTPEER_EXPIRED) {
                        expiredix = ix;
                }
+
                else if (!memcmp(bucket->entries[ix].id, node->nodeid.id, KS_DHT_NODEID_SIZE)) {
 #ifdef KS_DHT_DEBUGPRINTF_
                        char buffer[100];
                        printf("duplicate peer %s found at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), ix);
 #endif
-                       bucket->entries[ix].tyme = ks_time_now();
+                       bucket->entries[ix].tyme = ks_time_now_sec();
                        bucket->entries[ix].flags &= DHTPEER_ACTIVE;
                        return KS_STATUS_SUCCESS;  /* already exists */
                }
@@ -1044,7 +1105,7 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
                bucket->entries[free].gptr = node;
         bucket->entries[free].type = node->type;
         bucket->entries[free].family = node->family;  
-        bucket->entries[free].tyme = ks_time_now();
+        bucket->entries[free].tyme = ks_time_now_sec();
         bucket->entries[free].flags &= DHTPEER_ACTIVE;
 
                if (free !=  expiredix) {  /* are we are taking a free slot rather than replacing an expired node? */
@@ -1092,14 +1153,13 @@ static
 ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
 {
 #ifdef KS_DHT_DEBUGPRINTF_
-
        char buffer[100];
        printf("\ndeleting node for: %s\n",      ks_dhtrt_printableid(id, buffer));
 #endif
 
        for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
 #ifdef KS_DHT_DEBUGPRINTFX_
-         char bufferx[100];_
+               char bufferx[100];_
                printf("\nbucket->entries[%d].id = %s inuse=%c\n", ix,
                           ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
                           bucket->entries[ix].inuse  );
@@ -1156,6 +1216,7 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id,
     
 
        for (uint8_t ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
+
            if ( bucket->entries[ix].inuse == 1                              &&
              (family == ifboth || bucket->entries[ix].family == family)  &&
              (bucket->entries[ix].type & type)                           &&
@@ -1210,6 +1271,7 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t
 {
        ks_dhtrt_sortedxors_t *current = xort;
        uint8_t loaded = 0;
+
        while (current) {
 #ifdef KS_DHT_DEBUGPRINTF_
                char buf[100];
@@ -1217,6 +1279,7 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t
                           ks_dhtrt_printableid(current->bheader->mask,buf), current->count);
 #endif
                int xorix = current->startix; 
+
                for (uint8_t ix = 0;
                                ix< current->count && loaded < query->max && xorix !=  KS_DHT_BUCKETSIZE;
                                ++ix )   {
@@ -1225,6 +1288,7 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t
             xorix =  current->xort[xorix].nextix;
                        ++loaded;
                }
+
 #ifdef  KS_DHT_DEBUGLOCKPRINTF_
         char buf1[100];
         printf("load_query: UNLOCKING bucket %s\n",
@@ -1249,6 +1313,7 @@ void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t* table, ks_dht_node_t*
     ks_mutex_lock(internal->deleted_node_lock);
     deleted->next = internal->deleted_node;
     internal->deleted_node = deleted;
+    ++internal->deleted_count;
     ks_mutex_unlock(internal->deleted_node_lock);
 }
 
@@ -1300,34 +1365,6 @@ void ks_dhtrt_shiftleft(uint8_t *id) {
        return;
 }
 
-/* Determine whether id1 or id2 is closer to ref */
-
-/*
-  @todo: remove ?  simple memcpy seems to do the job ?
-
-  static int
-  ks_dhtrt_xorcmp(const uint8_t *id1, const uint8_t *id2, const uint8_t *ref);
-
-  static int ks_dhtrt_xorcmp(const uint8_t *id1, const uint8_t *id2, const uint8_t *ref)
-  {
-  int i;
-  for (i = 0; i < KS_DHT_NODEID_SIZE; i++) {
-  uint8_t xor1, xor2;
-  if (id1[i] == id2[i]) {
-  continue;
-  }
-  xor1 = id1[i] ^ ref[i];
-  xor2 = id2[i] ^ ref[i];
-  if (xor1 < xor2) {
-  return -1;             / * id1 is closer * /
-  }
-  return 1;                              / * id2 is closer * /
-  }
-  return 0;            / * id2 and id2 are identical ! * /
-  }
-*/
-
-
 /* create an xor value from two ids */
 static void ks_dhtrt_xor(const uint8_t *id1, const uint8_t *id2, uint8_t *xor)
 {
index 0468a2cb0a6d57c087a54d9cb5ae809a23919f8e..43094bdba0717d173e97810e40ef27289d418c80 100644 (file)
@@ -1,5 +1,6 @@
 #pragma GCC diagnostic ignored "-Wunused-but-set-variable"
 #pragma GCC diagnostic ignored "-Wunused-variable"
+#pragma GCC diagnostic ignored "-Wunused-function"
 
 //#include "ks.h"
 #include "../src/dht/ks_dht.h"
@@ -7,6 +8,8 @@
 ks_dhtrt_routetable_t* rt;
 ks_pool_t* pool;
 
+static ks_thread_t *threads[10];
+
 
 int doquery(ks_dhtrt_routetable_t* rt, uint8_t* id, enum ks_dht_nodetype_t type, enum ks_afflags_t family)
 {
@@ -22,7 +25,8 @@ int doquery(ks_dhtrt_routetable_t* rt, uint8_t* id, enum ks_dht_nodetype_t type,
 
 void test01()
 {
- printf("testbuckets - test01 start\n"); fflush(stdout);
+       printf("*** testbuckets - test01 start\n"); fflush(stdout);
+
    ks_dhtrt_routetable_t* rt;
    ks_dhtrt_initroute(&rt, pool);
    ks_dhtrt_deinitroute(&rt);
@@ -66,11 +70,13 @@ void test01()
        exit(104);
    }
 
-   printf("*** testbuckets - test01 complete\n"); fflush(stdout);
+   printf("*** testbuckets - test01 complete\n\n\n"); fflush(stdout);
 }
 
 void test02()
 {
+       printf("*** testbuckets - test02 start\n"); fflush(stdout);
+
    ks_dht_node_t*  peer;
    ks_dht_nodeid_t nodeid;
    memset(nodeid.id,  0xef, KS_DHT_NODEID_SIZE);
@@ -126,6 +132,8 @@ void test02()
    qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, ifv4);
    printf("\n*** AF_INET count expected 4, actual %d\n", qcount); fflush(stdout);
 
+   printf("*** testbuckets - test02 finished\n"); fflush(stdout);
+
    return;
 }
 
@@ -133,6 +141,8 @@ void test02()
 
 void test03()
 {
+ printf("*** testbuckets - test03 start\n"); fflush(stdout);
+
    ks_dht_node_t*  peer;
    ks_dht_nodeid_t nodeid;
    memset(nodeid.id,  0xef, KS_DHT_NODEID_SIZE);
@@ -177,33 +187,36 @@ void test03()
 
 
    int qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, both);
-   printf("\n** local query count expected 3, actual %d\n", qcount); fflush(stdout);
+   printf("\n** local query count expected 2, actual %d\n", qcount); fflush(stdout);
    qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, both);
-   printf("\n*** remote query count expected 6, actual %d\n", qcount); fflush(stdout);
+   printf("\n*** remote query count expected 20, actual %d\n", qcount); fflush(stdout);
    qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, both);
-   printf("\n*** both query count expected 9, actual %d\n", qcount); fflush(stdout);
+   printf("\n*** both query count expected 20, actual %d\n", qcount); fflush(stdout);
 
    qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, ifv4);
-   printf("\n*** local AF_INET  query count expected 1, actual %d\n", qcount); fflush(stdout);
+   printf("\n*** local AF_INET  query count expected 2, actual %d\n", qcount); fflush(stdout);
    qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, ifv6);
-   printf("\n*** local AF_INET6 query count expected 2, actual %d\n", qcount); fflush(stdout);
+   printf("\n*** local AF_INET6 query count expected 0, actual %d\n", qcount); fflush(stdout);
 
    qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, ifv6);
-   printf("\n*** AF_INET6 count expected 5, actual %d\n", qcount); fflush(stdout);
+   printf("\n*** AF_INET6 count expected 20, actual %d\n", qcount); fflush(stdout);
 
    qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, ifv4);
-   printf("\n** remote AF_INET  query count expected 3, actual %d\n", qcount); fflush(stdout);
+   printf("\n*** remote AF_INET  query count expected 20, actual %d\n", qcount); fflush(stdout);
    qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, ifv6);
-   printf("\n*** remote AF_INET6 query count expected 3, actual %d\n", qcount); fflush(stdout);
+   printf("\n*** remote AF_INET6 query count expected 20, actual %d\n", qcount); fflush(stdout);
 
    qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, ifv4);
-   printf("\n*** AF_INET count expected 4, actual %d\n", qcount); fflush(stdout);
+   printf("\n*** AF_INET count expected 20, actual %d\n", qcount); fflush(stdout);
 
+   printf("*** testbuckets - test03 finished\n\n\n"); fflush(stdout);
    return;
 }
 
 void test04()
 {
+   printf("*** testbuckets - test04 start\n"); fflush(stdout);
+
    ks_dht_node_t*  peer;
    ks_dht_nodeid_t nodeid;
    memset(nodeid.id,  0xef, KS_DHT_NODEID_SIZE);
@@ -242,16 +255,193 @@ void test04()
 
     printf("*** query on 10k nodes in %d ms\n", tx);
  
+    printf("*** testbuckets - test04 finished\n\n\n"); fflush(stdout);
 
    return;
 }
 
+/* test read/write node locking */
+void test05()
+{
+ printf("*** testbuckets - test05 start\n"); fflush(stdout);
+
+   ks_dht_node_t*  peer, *peer1, *peer2;
+   ks_dht_nodeid_t nodeid;
+   ks_status_t s;
+
+   memset(nodeid.id,  0xef, KS_DHT_NODEID_SIZE);
+
+   char ipv6[] = "1234:1234:1234:1234";
+   char ipv4[] = "123.123.123.123";
+   unsigned short port = 7001;
+
+   ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
+   
+   peer1 = ks_dhtrt_find_node(rt, nodeid);
+   printf("test05 - first find compelete\n"); fflush(stdout);
+
+   peer2 =  ks_dhtrt_find_node(rt, nodeid);
+   printf("test05 - second find compelete\n");   fflush(stdout);
+
+   ks_dhtrt_delete_node(rt, peer);
+   printf("test05 - delete compelete\n");   fflush(stdout);
 
+   s = ks_dhtrt_release_node(peer1);
+   if (s == KS_STATUS_FAIL) printf("release 1 failed\n"); fflush(stdout);
 
-int main(int argx, char* argv[]) {
+
+
+   s = ks_dhtrt_release_node(peer2);
+   if (s == KS_STATUS_FAIL) printf("release 1 failed\n"); 
+
+   printf("*** testbuckets - test05 finished\n\n\n"); fflush(stdout);
+
+   return;
+}
+
+static int gindex = 1;
+static ks_mutex_t *glock;
+static int gstop = 0;
+
+static int test06loops = 1000;
+static int test06nodes = 200;  /* max at 255 */ 
+
+static void *test06ex1(ks_thread_t *thread, void *data)
+{
+   while(!gstop) {
+      ks_dhtrt_process_table(rt);
+      ks_sleep(100);
+   }
+   return NULL;
+}
+
+
+static void *test06ex2(ks_thread_t *thread, void *data)
+{
+   ks_dht_nodeid_t nodeid;
+   ks_dhtrt_querynodes_t query;
+
+
+   while(!gstop) {
+
+       memset(&query, 0, sizeof(query));
+       memset(query.nodeid.id,  0xef, KS_DHT_NODEID_SIZE);
+       query.max = 30;
+       query.family = ifv4;
+       query.type = KS_DHT_REMOTE;
+
+
+       ks_dhtrt_findclosest_nodes(rt, &query);
+       ks_sleep(10000);
+   
+       for(int i=0; i<query.count; ++i) {
+           ks_dhtrt_release_node(query.nodes[i]);
+           ks_sleep(10000);
+       } 
+       ks_sleep(2000000);
+
+   }
+   return NULL;
+}
+
+static void *test06ex(ks_thread_t *thread, void *data)
+{
+   ks_dht_node_t*  peer;
+   ks_dht_nodeid_t nodeid;
+   char ipv6[] = "1234:1234:1234:1234";
+   char ipv4[] = "123.123.123.123";
+   unsigned short port = 7000;
+
+   memset(nodeid.id,  0xef, KS_DHT_NODEID_SIZE);
+
+   int *pi = data;
+   int i = *pi;
+   ks_mutex_lock(glock);
+   nodeid.id[0] = ++gindex;
+   ks_mutex_unlock(glock);
+
+   printf("starting thread with i of %d\n", gindex); fflush(stdout);
+
+   for(int loop=0; loop<test06loops; ++loop) {
+
+       for (int i=0; i<test06nodes; ++i) {
+                ++nodeid.id[19];
+               ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
+        ks_sleep(1000);
+       }
+
+       for (int i=0; i<test06nodes; ++i) {
+                peer = ks_dhtrt_find_node(rt, nodeid);
+                if (peer) {
+                        ks_dhtrt_delete_node(rt, peer);
+             ks_sleep(400);
+               }
+               --nodeid.id[19];
+       }
+
+   }
+
+   return 0;
+   
+}
+
+void test06()
+{
+    int i;
+    ks_mutex_create(&glock, KS_MUTEX_FLAG_DEFAULT, pool);
+
+    ks_thread_t* t0;
+    ks_thread_create(&t0, test06ex1, NULL, pool); 
+
+    ks_thread_t* t1;
+    ks_thread_create(&t1, test06ex2, NULL, pool); 
+
+    for(i = 0; i < 10; i++) {
+        ks_thread_create(&threads[i], test06ex, &i, pool);
+    }
+
+    printf("all threads started\n"); fflush(stdout);
+
+    for(i = 0; i < 10; i++) {
+        ks_thread_join(threads[i]);
+    }
+    gstop = 1;
+
+    ks_thread_join(t1); 
+
+    ks_thread_join(t0); 
+
+    printf("all threads completed\n"); fflush(stdout);
+   ks_dhtrt_dump(rt, 7);
+
+    return;
+}
+
+
+int main(int argc, char* argv[]) {
 
    printf("testdhtbuckets - start\n");
 
+   int tests[10];
+  
+       if (argc == 0) {
+      tests[1] = 1;
+      tests[2] = 1;
+      tests[3] = 1;
+      tests[4] = 1;
+      tests[5] = 1;
+      tests[6] = 0;
+      tests[7] = 0;
+      tests[8] = 0;
+      tests[9] = 0;
+       }
+       else {
+               for(int tix=1; tix<10 && tix<argc; ++tix) {
+                       long i = strtol(argv[tix], NULL, 0);
+                       tests[i] = 1;
+               }
+       }
+
    ks_init();
 
    ks_status_t status;
@@ -272,12 +462,41 @@ int main(int argx, char* argv[]) {
    ks_dhtrt_initroute(&rt, pool);
    ks_dhtrt_deinitroute(&rt);
 
-   ks_dhtrt_initroute(&rt, pool);
-   test01();
-   test02();
-   test03();
-   test04();
+   if (tests[1] == 1) {
+       ks_dhtrt_initroute(&rt, pool);
+       test01();
+       ks_dhtrt_deinitroute(&rt);
+   }
+
+   if (tests[2] == 1) {
+       ks_dhtrt_initroute(&rt, pool);
+       test02();
+       ks_dhtrt_deinitroute(&rt);
+   }
+
+   if (tests[3] == 1) {  
+    ks_dhtrt_initroute(&rt, pool);
+       test03();
+       ks_dhtrt_deinitroute(&rt);
+   }
+
+   if (tests[4] == 1) {
+       ks_dhtrt_initroute(&rt, pool);
+       test04();
+       ks_dhtrt_deinitroute(&rt);
+   }
 
+   if (tests[5] == 1) {
+       ks_dhtrt_initroute(&rt, pool);
+       test05();
+       ks_dhtrt_deinitroute(&rt);
+   }
+
+   if (tests[6] == 1) {
+       ks_dhtrt_initroute(&rt, pool);
+    test06();
+    ks_dhtrt_deinitroute(&rt);
+   }
    return 0;
 
 }