]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Implement serialization, deserialization & repopulation for dht table
authorcolm <colm@freeswitch1>
Fri, 6 Jan 2017 22:25:52 +0000 (17:25 -0500)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:38 +0000 (14:59 -0600)
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_bucket.c
libs/libks/test/testbuckets.c

index a4ca006c58a1b57b07def9831dfe2cbad9bb1692..8d183a4c4a789ad16c55bfcaba57016cd8cb464d 100644 (file)
@@ -570,6 +570,10 @@ KS_DECLARE(ks_status_t)        ks_dhtrt_release_querynodes(ks_dhtrt_querynodes_t
 
 KS_DECLARE(void)               ks_dhtrt_process_table(ks_dhtrt_routetable_t* table);
 
+KS_DECLARE(uint32_t)          ks_dhtrt_serialize(ks_dhtrt_routetable_t* table, void** ptr);
+KS_DECLARE(ks_status_t)        ks_dhtrt_deserialize(ks_dhtrt_routetable_t* table, void* ptr);
+
+
 /* debugging aids */
 KS_DECLARE(void)               ks_dhtrt_dump(ks_dhtrt_routetable_t* table, int level);
 
index 72063c8630ced1822df9f9b066974f2966afa78a..50e7526b5a1f7808b16c121d5307e7cd9bda0b9e 100644 (file)
@@ -106,6 +106,8 @@ typedef struct ks_dhtrt_internal_s {
        ks_dhtrt_deletednode_t *deleted_node;
        ks_dhtrt_deletednode_t *free_node_ex;
        uint32_t               deleted_count;
+    uint32_t               bucket_count;
+    uint32_t               header_count;
 } ks_dhtrt_internal_t;
 
 typedef struct ks_dhtrt_xort_s {
@@ -203,7 +205,6 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
                                                                                        ks_dht_t *dht,
                                                                                        ks_pool_t *pool) 
 {
-(void)ks_dhtrt_find_relatedbucketheader;
 
        unsigned char initmask[KS_DHT_NODEID_SIZE];
        memset(initmask, 0xff, sizeof(initmask));
@@ -224,6 +225,8 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
        initial_header->flags = BHF_LEFT;        /* fake left to allow splitting */ 
        internal->buckets = initial_header;
        initial_header->bucket =  ks_dhtrt_create_bucket(pool);
+       internal->header_count = 1;
+       internal->bucket_count = 1;
        table->pool = pool;
 
        *tableP = table;
@@ -281,6 +284,8 @@ KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **tableP)
        return;
 }
 
+
+
 KS_DECLARE(ks_status_t)         ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, 
                                                                                           ks_dht_nodeid_t nodeid,
                                                                                           enum ks_dht_nodetype_t type, 
@@ -500,6 +505,8 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
         newleft->left1bit = newright;
 
                ks_dhtrt_split_bucket(header, newleft, newright);
+               internal->header_count += 2;
+               ++internal->bucket_count;
 
                /* ok now we need to try again to see if the bucket has capacity */
                /* which bucket do care about */
@@ -1796,6 +1803,194 @@ static char *ks_dhtrt_printableid(uint8_t *id, char *buffer)
 }
 
 
+/*
+ *
+ * serialization and deserialization
+ * ---------------------------------
+*/
+
+typedef struct ks_dhtrt_serialized_bucket_s 
+{
+       uint16_t            count;
+       char               eye[4];
+       ks_dhtrt_nodeid_t  id;
+} ks_dhtrt_serialized_bucket_t;
+
+typedef struct ks_dhtrt_serialized_routetable_s
+{
+       uint32_t           size;
+       uint8_t            version;
+       uint8_t            count;
+       char               eye[4];
+} ks_dhtrt_serialized_routetable_t;
+
+#define DHTRT_SERIALIZATION_VERSION 1
+
+static void ks_dhtrt_serialize_node(ks_dht_node_t *source, ks_dht_node_t *dest) 
+{
+       memcpy(dest, source, sizeof(ks_dht_node_t));
+       memset(&dest->table, 0, sizeof(void*));
+       memset(&dest->reflock, 0, sizeof(void*));      
+}
+
+static void ks_dhtrt_serialize_bucket(ks_dhtrt_routetable_t *table, 
+                                                                               ks_dhtrt_serialized_routetable_t *stable,
+                                                                               ks_dhtrt_bucket_header_t* header, 
+                                                                               unsigned char* buffer) 
+{
+       uint8_t tzero = 0;
+       ks_dhtrt_serialized_bucket_t *s = (ks_dhtrt_serialized_bucket_t*)buffer;
+
+       memcpy(s->eye, "HEAD", 4);
+       memcpy(s->id, header->mask, KS_DHT_NODEID_SIZE);
+       buffer += sizeof(ks_dhtrt_serialized_bucket_t);
+       stable->size  += sizeof(ks_dhtrt_serialized_bucket_t);
+
+       if (header->bucket != 0) {
+               ks_dhtrt_bucket_t* bucket = header->bucket;
+
+               memcpy(&s->count, &bucket->count, sizeof(uint8_t));
+
+               for (int i=0; i< KS_DHT_BUCKETSIZE; ++i) {
+                       if (bucket->entries[i].inuse == 1) {
+                               ks_dhtrt_serialize_node(bucket->entries[i].gptr, (ks_dht_node_t*)buffer);
+                               buffer += sizeof(ks_dht_node_t);
+                               stable->size  += sizeof(ks_dht_node_t);
+                       }
+               }
+       }
+    else {
+               memcpy(&s->count, &tzero, sizeof(uint8_t));
+       }
+}
+
+static void ks_dhtrt_serialize_table(ks_dhtrt_routetable_t *table, 
+                                                                               ks_dhtrt_serialized_routetable_t *stable,  
+                                                                               unsigned char *buffer)
+{
+       ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
+       int stackix=0;
+
+       ks_dhtrt_internal_t *internal = table->internal;
+       ks_dhtrt_bucket_header_t *header = internal->buckets;
+
+       while (header) {
+               stack[stackix++] = header;
+
+               ++stable->count;
+
+               ks_dhtrt_serialize_bucket(table, stable, header, buffer);
+        buffer = (unsigned char*)stable + stable->size;       
+               header = header->left;
+
+               if (header == 0 && stackix > 1) {
+                       stackix -= 2;
+                       header =  stack[stackix];
+                       header = header->right;
+               }
+       }
+    return;
+}
+
+KS_DECLARE(uint32_t) ks_dhtrt_serialize(ks_dhtrt_routetable_t *table, void **ptr)
+{
+       ks_dhtrt_internal_t *internal = table->internal;    
+       ks_rwl_write_lock(internal->lock);      /* grab write lock */
+
+       uint32_t buffer_size = 3200 * sizeof(ks_dht_node_t);
+       buffer_size +=  internal->header_count * sizeof(ks_dhtrt_serialized_bucket_t);
+       buffer_size +=   sizeof(ks_dhtrt_serialized_routetable_t);
+       unsigned char *buffer = (*ptr) = ks_pool_alloc(table->pool, buffer_size);
+
+       ks_dhtrt_serialized_routetable_t *stable = (ks_dhtrt_serialized_routetable_t*)buffer;
+       stable->size = sizeof(ks_dhtrt_serialized_routetable_t);
+       stable->version = DHTRT_SERIALIZATION_VERSION;
+       memcpy(stable->eye, "DHRT", 4);
+    
+       buffer +=  sizeof(ks_dhtrt_serialized_routetable_t);
+
+       ks_dhtrt_serialize_table(table, stable, buffer); 
+       ks_rwl_write_unlock(internal->lock);      /* write unlock */
+       return stable->size;  
+}
+
+
+static void ks_dhtrt_deserialize_node(ks_dhtrt_routetable_t *table, 
+                                                                               ks_dht_node_t *source, 
+                                                                               ks_dht_node_t *dest) 
+{
+       memcpy(dest, source, sizeof(ks_dht_node_t));
+       dest->table = table;
+       ks_rwl_create(&dest->reflock, table->pool);
+}
+
+
+KS_DECLARE(ks_status_t) ks_dhtrt_deserialize(ks_dhtrt_routetable_t *table, void* buffer)
+{
+       ks_dhtrt_internal_t *internal = table->internal;
+       ks_rwl_write_lock(internal->lock);      /* grab write lock */
+       unsigned char *ptr = (unsigned char*)buffer; 
+
+       ks_dhtrt_serialized_routetable_t *stable = (ks_dhtrt_serialized_routetable_t*)buffer;
+       ptr += sizeof(ks_dhtrt_serialized_routetable_t);
+
+       /* unpack and chain the buckets */
+       for (int i=0; i<stable->count; ++i) {
+
+               ks_dhtrt_serialized_bucket_t *s = (ks_dhtrt_serialized_bucket_t*)ptr;
+
+               if (memcmp(s->eye, "HEAD", 4)) {
+                       assert(0);
+                       ks_rwl_write_unlock(internal->lock);      /* write unlock */
+                       return KS_STATUS_FAIL;
+               }
+
+               ptr += sizeof(ks_dhtrt_serialized_bucket_t);
+
+               /* currently adding the nodes individually   
+         * need a better way to do this that is compatible with the pending
+                * changes for supernode support
+        */
+
+               char buf[51];
+               ks_log(KS_LOG_DEBUG, "deserialize bucket [%s] count %d\n", ks_dhtrt_printableid(s->id, buf), s->count);
+
+               int mid = s->count >>1;    
+               ks_dht_node_t *fnode = NULL;
+               ks_dht_node_t *node = NULL;
+
+               for(int i0=0; i0<s->count; ++i0) {
+                       /* recreate the node */
+                       ks_dht_node_t *node = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t));  
+                       if (i0 == mid) fnode = node;
+                       ks_dhtrt_deserialize_node(table, (ks_dht_node_t*)ptr, node);
+                       ptr +=  sizeof(ks_dht_node_t);
+                       ks_dhtrt_insert_node(table, node, 0); 
+               }
+
+               /* 
+               *       now the bucket is complete - now trigger a find.
+               *       This staggers the series of finds.  We only do this for populated tables here. 
+               *       Once the table is loaded, process_table will as normal start the ping/find process to
+               *       update and populate the table.
+               */
+
+               if (s->count > 0) {
+                       if (fnode) {
+                               ks_dhtrt_find(table, internal, &fnode->nodeid);
+                       }
+                       else if (node) {
+                               ks_dhtrt_find(table, internal, &node->nodeid);
+                       }
+               } 
+       }
+
+       ks_rwl_write_unlock(internal->lock);      /* write unlock */
+       return KS_STATUS_SUCCESS;
+}
+
+
 /* For Emacs:
  * Local Variables:
  * mode:c
index 18890f251a95769fb7b8f78eb09d65683d8a65a2..fd1a07c043d9ea2252eab560ef5d358c792edddc 100644 (file)
@@ -620,8 +620,6 @@ void test09()
      char ipv4[] = "123.123.123.123";
     unsigned short port = 7000;
 
-    /* build a delete queue */
-
     int cix=0;
 
     for(int i0=0, i1=0; i0<150; ++i0, ++i1) {
@@ -655,6 +653,88 @@ void test09()
 
 
 
+typedef struct ks_dhtrt_serialized_routetable_s
+{
+    uint32_t           size;
+    uint8_t            version;
+    uint8_t            count;
+    char               eye[4];
+} ks_dhtrt_serialized_routetable_t;
+
+
+void test10()
+{
+     printf("**** testbuckets - test10 start\n"); fflush(stdout);
+
+     ks_dht_node_t  *peer;
+     memset(g_nodeid1.id,  0xef, KS_DHT_NODEID_SIZE);
+     memset(g_nodeid2.id,  0xef, KS_DHT_NODEID_SIZE);
+
+     char ipv6[] = "1234:1234:1234:1234";
+     char ipv4[] = "123.123.123.123";
+    unsigned short port = 7000;
+
+    int cix=0;
+
+    for(int i0=0, i1=0; i0<2500; ++i0, ++i1) {
+        if (i0%20 == 0) {
+            g_nodeid2.id[cix]>>=1;
+            //ks_dhtrt_dump(rt, 7);
+            if ( g_nodeid2.id[cix] == 0) ++cix;
+            g_nodeid2.id[19] = 0;
+        }
+        else {
+            ++g_nodeid2.id[19];
+        }
+        ks_dhtrt_create_node(rt, g_nodeid2, KS_DHT_REMOTE, ipv4, port, KS_DHTRT_CREATE_DEFAULT, &peer);
+        ks_dhtrt_touch_node(rt, g_nodeid2);
+        ks_dhtrt_release_node(peer);
+     }
+
+    /* this should expire all nodes after 15 minutes and 3 pings */
+       void *buffer = NULL;
+    uint32_t size = ks_dhtrt_serialize(rt, &buffer);
+
+    
+    if (size > 0) {
+        ks_dhtrt_serialized_routetable_t* p =  (ks_dhtrt_serialized_routetable_t*)buffer;
+        printf("\n\ntest10: version %d   bucket count %d   size %d\n\n", p->version, p->count, p->size);
+        ks_dhtrt_dump(rt, 7);
+    }
+    else {
+        printf("test10: error on serialize\n");
+        return;
+       }
+
+
+    ks_dhtrt_routetable_t* rt2;
+    ks_dhtrt_initroute(&rt2, dht, pool);
+    ks_dhtrt_deserialize(rt2, buffer);
+    ks_dhtrt_dump(rt2, 7);
+
+    ks_dht_nodeid_t  id;
+    memset(id.id, 0xef, 20);
+    id.id[0] = 0x0e;
+    id.id[19] = 0x05;
+
+    ks_dhtrt_touch_node(rt2, id);
+    ks_dht_node_t* n = ks_dhtrt_find_node(rt2, id);
+
+    if (n == NULL) {
+               printf("test10: failed  Unable to find reloaded node \n");
+        exit(200);
+       }
+    
+    ks_dhtrt_deinitroute(&rt2);
+
+    printf("test10: complete\n");
+
+    return;
+
+}
+
+
 
 
 
@@ -1152,6 +1232,13 @@ int main(int argc, char *argv[]) {
              continue;
          }
 
+         if (tests[tix] == 10) {
+             ks_dhtrt_initroute(&rt, dht, pool);
+             test10();
+             ks_dhtrt_deinitroute(&rt);
+             continue;
+         }
+
 
 
                if (tests[tix] == 30) {