]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Mark suspect and expired nodes. Add to makefile
authorcolm <colm@freeswitch1>
Wed, 7 Dec 2016 04:20:13 +0000 (23:20 -0500)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:34 +0000 (14:59 -0600)
libs/libks/Makefile.am
libs/libks/src/dht/ks_dht_bucket.c

index 341925e6d46238b7587453381b49c6f0c2b2964c..d976ad117042bb110b2c4f30a5a837d3aea948bd 100644 (file)
@@ -13,7 +13,7 @@ libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/k
 libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c
 libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp
 libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c
-libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_nodeid.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c
+libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_nodeid.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c src/dht/ks_dht_bucket.c
 libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c 
 #aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h
 
@@ -29,7 +29,7 @@ library_include_HEADERS += src/include/ks_dso.h src/include/ks_dht.h src/include
 library_include_HEADERS += src/include/ks_printf.h src/include/ks_hash.h src/include/ks_ssl.h src/include/kws.h
 library_include_HEADERS += src/utp/utp_internal.h src/utp/utp.h src/utp/utp_types.h src/utp/utp_callbacks.h src/utp/utp_templates.h
 library_include_HEADERS += src/utp/utp_hash.h src/utp/utp_packedsockaddr.h src/utp/utp_utils.h src/include/ks_utp.h
-library_include_HEADERS += src/dht/ks_dht.h src/dht/ks_dht-int.h
+library_include_HEADERS += src/dht/ks_dht.h src/dht/ks_dht-int.h src/dht/ks_dht_bucket.h
 
 tests: libks.la
        $(MAKE) -C test tests
index 1b2a5f49f9333ac27c467f07699af59fee402f8d..aea4f1dcec5e5c8de0acd360ad8c301cd6c6da19 100644 (file)
 
 /* change for testing */
 #define KS_DHT_BUCKETSIZE 20
+#define KS_DHTRT_INACTIVETIME  (5*60)    
+#define KS_DHTRT_MAXPING  3
 
 /* peer flags */
-#define DHTPEER_ACTIVE  0x01
-#define DHTPEER_SUSPECT 0x02
-#define DHTPEER_EXPIRED 0x04
+#define DHTPEER_ACTIVE  1
+#define DHTPEER_SUSPECT 2
+#define DHTPEER_EXPIRED 3
 
 /* internal structures */
+typedef struct ks_dhtrt_rw_lock_s {
+   ks_pool_t* pool;   
+   ks_mutex_t*  mutex;
+   ks_cond_t*   rcond;
+   volatile uint16_t     read_count;     
+   ks_cond_t*   wcond;
+   volatile uint16_t     write_count;    /* hopefully never more than 1 ! */
+} ks_dhtrt_rw_lock;
+
 typedef struct ks_dhtrt_bucket_entry_s {
     ks_time_t    tyme;
     unsigned char id[KS_DHT_IDSIZE];
     ks_dhtrt_node* gptr;                    /* ptr to peer */      
     uint8_t    inuse;
+    uint8_t    outstanding_pings;
     uint8_t    flags;                     /* active, suspect, expired */
-    struct ks_dhtrt_bucket_entry_s* left;
-    struct ks_dhtrt_bucket_entry_s* right;
-    struct ks_dhtrt_bucket_entry_s* prev;
 } ks_dhtrt_bucket_entry;
 
 typedef struct ks_dhtrt_bucket_s {
     ks_dhtrt_bucket_entry  entries[KS_DHT_BUCKETSIZE];
-    ks_dhtrt_bucket_entry* first;     /* sorted order - first*/
-    ks_dhtrt_bucket_entry* last;      /* sorted order - last*/
-    ks_dhtrt_bucket_entry* avail;     /* available chain    */ 
-    unsigned short       count;
+    uint8_t       count;
+    uint8_t       expired_count;
 } ks_dhtrt_bucket; 
 
 
@@ -73,13 +80,14 @@ typedef struct ks_dhtrt_bucket_header {
     struct ks_dhtrt_bucket_header* left;
     struct ks_dhtrt_bucket_header* right;
     ks_dhtrt_bucket*   bucket;
-    unsigned char    mask[KS_DHT_IDSIZE];
-    unsigned char    flags;
+    ks_time_t        tyme;                 /* last processed time */
+    unsigned char    mask[KS_DHT_IDSIZE];  /* node id mask        */
+    unsigned char    flags;           
 } ks_dhtrt_bucket_header;
 
 
 typedef struct ks_dhtrt_internal_s {
-   ks_dhtrt_bucket_header* buckets;
+   ks_dhtrt_bucket_header* buckets;       /* root bucketheader */
    /*   */
   
 } ks_dhtrt_internal;
@@ -150,6 +158,29 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid,
                                                                                        unsigned char* hixor,
                                                                                        unsigned int max);
 
+static
+void ks_dhtrt_ping(ks_dhtrt_bucket_entry* entry);
+
+static
+ks_status_t ks_dhtrt_initrwlock( ks_dhtrt_rw_lock* lock);
+static
+void ks_dhtrt_deinitrwlock( ks_dhtrt_rw_lock* lock);
+
+static
+void ks_dhtrt_getreadlock( ks_dhtrt_rw_lock* lock);
+static
+ks_status_t ks_dhtrt_tryreadlock( ks_dhtrt_rw_lock* lock);
+static
+void ks_dhtrt_releasereadlock( ks_dhtrt_rw_lock* lock);
+static
+void ks_dhtrt_getwritelock( ks_dhtrt_rw_lock* lock);
+static
+ks_status_t ks_dhtrt_trywritelock( ks_dhtrt_rw_lock* lock);
+static
+void ks_dhtrt_releasewritelock( ks_dhtrt_rw_lock* lock);
+
+
+
 /* debugging */
 #define KS_DHT_DEBUGPRINTF_
 
@@ -241,6 +272,11 @@ KS_DECLARE(ks_status_t) ks_dhtrt_insert_node(ks_dhtrt_routetable* table, ks_dhtr
         if (insanity > 3200) assert(insanity < 3200);
 
            /* first - seek a stale entry to eject */
+        if (bucket->expired_count) {
+           ks_status_t s = ks_dhtrt_insert_id(bucket, peer);
+           if (s == KS_STATUS_SUCCESS) return KS_STATUS_SUCCESS;
+        }
+
            /* 
                   todo: attempting a ping at at this point would require us
                   to suspend this process ... tricky...assume right now we will go ahead and
@@ -312,9 +348,12 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable* table,  ks_dhtr
 {
    ks_dhtrt_bucket_header* header = ks_dhtrt_find_bucketheader(table, nodeid);
    if (header == 0) return KS_STATUS_FAIL;
+   if (header->bucket == 0)  return KS_STATUS_FAIL;
    ks_dhtrt_bucket_entry* e = ks_dhtrt_find_bucketentry(header, nodeid);
-   if (e != 0) {
+   if (e != 0) { 
       e->tyme = ks_time_now();
+      e->outstanding_pings = 0;
+      if (e->flags ==  DHTPEER_EXPIRED)  --header->bucket->expired_count;
       e->flags = DHTPEER_ACTIVE;
       return KS_STATUS_SUCCESS;
    }
@@ -497,14 +536,32 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable* table)
     ks_dhtrt_bucket_header* header = internal->buckets;
     ks_dhtrt_bucket_header* stack[KS_DHT_IDSIZE * 8];
     int stackix=0;
+    ks_time_t t0 = ks_time_now(); 
 
     while(header) {
          stack[stackix++] = header;
          if (header->bucket) {
-             /*ks_dhtrt_bucket* b = header->bucket;*/
+             ks_dhtrt_bucket* b = header->bucket;
              for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
-
-             }
+                ks_dhtrt_bucket_entry* e =  &b->entries[ix];
+                  if (e->inuse == 1) {
+                  /* more than n pings outstanding? */
+                  if (e->outstanding_pings >= KS_DHTRT_MAXPING) {
+                     e->flags =  DHTPEER_EXPIRED; 
+                     ++b->expired_count;
+                     continue;
+                  }
+                  if (e->flags == DHTPEER_SUSPECT) {
+                     ks_dhtrt_ping(e); 
+                     continue;
+                  }
+                  ks_time_t tdiff = t0 - e->tyme;
+                  if (tdiff > KS_DHTRT_INACTIVETIME) {
+                     e->flags = DHTPEER_SUSPECT;
+                     ks_dhtrt_ping(e);
+                  }
+                }
+             }   /* end for each bucket_entry */
          }
          header = header->left;
          if (header == 0 && stackix > 1) {
@@ -513,8 +570,7 @@ KS_DECLARE(void)  ks_dhtrt_process_table(ks_dhtrt_routetable* table)
              header = header->right;
          }
     }
-
-
+    return;
 }
 
 
@@ -570,6 +626,16 @@ void colm() {
  ks_dhtrt_xorcmp(0, 0, 0);
  ks_dhtrt_split_bucket(0, 0, 0);
  ks_dhtrt_shiftleft(0);
+ ks_dhtrt_initrwlock( 0);
+ ks_dhtrt_deinitrwlock( 0);
+
+ ks_dhtrt_getreadlock( 0);
+ ks_dhtrt_getwritelock( 0);
+ ks_dhtrt_tryreadlock( 0);
+ ks_dhtrt_trywritelock( 0);
+ ks_dhtrt_releasereadlock( 0);
+ ks_dhtrt_releasewritelock( 0);
+
 }
 
 
@@ -697,6 +763,7 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer)
            assert(0);
        }
     uint8_t free = KS_DHT_BUCKETSIZE;
+    uint8_t expiredix = KS_DHT_BUCKETSIZE;
        
        /* find free .. but also check that it is not already here! */
     uint8_t ix = 0;
@@ -705,6 +772,9 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer)
             if (free == KS_DHT_BUCKETSIZE) {
                free = ix; /* use this one   */
             }
+        }
+        else if (free == KS_DHT_BUCKETSIZE && bucket->entries[ix].flags == DHTPEER_EXPIRED) {
+            expiredix = ix;
         }
                else if (!memcmp(bucket->entries[ix].id, peer->id, KS_DHT_IDSIZE)) {
 #ifdef  KS_DHT_DEBUGPRINTF_
@@ -716,6 +786,12 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer)
             return KS_STATUS_SUCCESS;  /* already exists */
         }
        }
+
+    if (free == KS_DHT_BUCKETSIZE && expiredix<KS_DHT_BUCKETSIZE ) {
+        /* bump this one - but only if we have no other option */
+        free =  expiredix;
+        --bucket->expired_count;
+    }
        
        if ( free<KS_DHT_BUCKETSIZE ) {
                bucket->entries[free].inuse = 1;
@@ -732,7 +808,6 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer)
         return KS_STATUS_SUCCESS;
        }
        
-       assert(0);   /* should not reach this point */
     return KS_STATUS_FAIL;
 }
         
@@ -884,6 +959,20 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes* query, ks_dhtrt_sortedxors* xor
         return loaded;
 }
 
+void ks_dhtrt_ping(ks_dhtrt_bucket_entry* entry) {
+    ++entry->outstanding_pings;
+    /* @todo */
+    /* set the appropriate command in the node and queue if for processing */
+     /*ks_dht_node_t* node = entry->gptr; */
+#ifdef  KS_DHT_DEBUGPRINTF_
+         char buf[100];
+         printf("  ping queued for nodeid %s count %d\n",
+                   ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings);
+#endif
+    return;
+}
+
+
 /*
    strictly for shifting the bucketheader mask 
    so format must be a right filled mask (hex: ..ffffffff)
@@ -960,6 +1049,81 @@ static int ks_dhtrt_ismasked(const unsigned char *id, const unsigned char *mask)
     return 1;
 }
 
+static
+ks_status_t ks_dhtrt_initrwlock( ks_dhtrt_rw_lock* lock) 
+{
+    ks_status_t s  = ks_mutex_create(&lock->mutex, 0, lock->pool);
+    if (s != KS_STATUS_SUCCESS) return s;
+    s  = ks_cond_create_ex(&lock->rcond, lock->pool, lock->mutex);
+    if (s != KS_STATUS_SUCCESS) return s;
+    s  = ks_cond_create_ex(&lock->wcond, lock->pool, lock->mutex);
+    return s;
+}
+
+static
+void ks_dhtrt_deinitrwlock( ks_dhtrt_rw_lock* lock) 
+{
+    ks_cond_destroy(&lock->rcond);
+    ks_cond_destroy(&lock->wcond);
+    ks_mutex_destroy(&lock->mutex);
+    memset(lock, 0, sizeof(ks_dhtrt_rw_lock));
+}
+
+static
+void ks_dhtrt_getreadlock( ks_dhtrt_rw_lock* lock)
+{
+    ks_mutex_lock(lock->mutex);
+    while (lock->write_count > 0) {
+       ks_cond_wait(lock->rcond);
+    }
+    ++lock->read_count;
+    ks_mutex_unlock(lock->mutex);
+}
+
+static
+ks_status_t ks_dhtrt_tryreadlock( ks_dhtrt_rw_lock* lock)
+{
+     return KS_STATUS_FAIL;
+}
+
+static
+void ks_dhtrt_releasereadlock( ks_dhtrt_rw_lock* lock) 
+{
+     ks_mutex_lock(lock->mutex);
+     --lock->read_count;
+     if (lock->read_count == 0)  
+          ks_cond_signal(lock->wcond);
+     ks_mutex_unlock(lock->mutex);
+}
+
+static
+void ks_dhtrt_getwritelock( ks_dhtrt_rw_lock* lock) 
+{
+     ks_mutex_lock(lock->mutex);
+     while (lock->read_count > 0) {
+       ks_cond_wait(lock->wcond);
+     }
+     ++lock->write_count;
+     ks_mutex_unlock(lock->mutex);
+}
+
+static
+ks_status_t ks_dhtrt_trywritelock( ks_dhtrt_rw_lock* lock)
+{
+  return KS_STATUS_FAIL;
+}
+
+static
+void ks_dhtrt_releasewritelock( ks_dhtrt_rw_lock* lock)
+{
+    ks_mutex_lock(lock->mutex);
+    --lock->write_count;
+    assert(lock->write_count==0);
+    ks_cond_broadcast(lock->rcond);   
+    ks_mutex_unlock(lock->mutex);
+}
+
+
 static char* ks_dhtrt_printableid(const unsigned char* id, char* buffer)
 {
    char* t = buffer;