/* change for testing */
#define KS_DHT_BUCKETSIZE 20
-#define KS_DHTRT_INACTIVETIME (15*60)
+#define KS_DHTRT_INACTIVETIME (10*60)
+#define KS_DHTRT_EXPIREDTIME (15*60)
#define KS_DHTRT_MAXPING 3
#define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60)
+#define KS_DHTRT_PROCESSTABLE_SHORTINTERVAL (120)
#define KS_DHTRT_RECYCLE_NODE_THRESHOLD 100
/* peer flags */
#define DHTPEER_EXPIRED 1
#define DHTPEER_ACTIVE 2
+
typedef uint8_t ks_dhtrt_nodeid_t[KS_DHT_NODEID_SIZE];
/* internal structures */
struct ks_dhtrt_bucket_header_s * left;
struct ks_dhtrt_bucket_header_s * right;
ks_dhtrt_bucket_t * bucket;
- ks_dhtrt_bucket_t * bucketv6;
ks_time_t tyme; /* last processed time */
unsigned char mask[KS_DHT_NODEID_SIZE]; /* node id mask */
unsigned char flags;
typedef struct ks_dhtrt_internal_s {
uint8_t localid[KS_DHT_NODEID_SIZE];
ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */
+ ks_dht_t *dht;
ks_thread_pool_t *tpool;
ks_rwl_t *lock; /* lock for safe traversal of the tree */
ks_time_t last_process_table;
+ ks_time_t next_process_table_delta;
ks_mutex_t *deleted_node_lock;
ks_dhtrt_deletednode_t *deleted_node;
ks_dhtrt_deletednode_t *free_node_ex;
static
ks_dhtrt_bucket_header_t *ks_dhtrt_find_bucketheader(ks_dhtrt_routetable_t *table, ks_dhtrt_nodeid_t id);
static
+ks_dhtrt_bucket_header_t *ks_dhtrt_find_relatedbucketheader(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t id);
+static
ks_dhtrt_bucket_entry_t *ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t id);
static
unsigned int max);
static
-void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry);
+void ks_dhtrt_ping(ks_dhtrt_internal_t *table, ks_dhtrt_bucket_entry_t *entry);
/* very verbose */
/* # define KS_DHT_DEBUGPRINTFX_ */
/* debug locking */
-/* # define KS_DHT_DEBUGLOCKPRINTF_ */
+#define KS_DHT_DEBUGLOCKPRINTF_
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
- ks_pool_t *pool,
- ks_thread_pool_t* tpool)
+ ks_dht_t *dht,
+ ks_pool_t *pool,
+ ks_thread_pool_t* tpool)
{
+(void)ks_dhtrt_find_relatedbucketheader;
+
unsigned char initmask[KS_DHT_NODEID_SIZE];
memset(initmask, 0xff, sizeof(initmask));
ks_rwl_create(&internal->lock, pool);
internal->tpool = tpool;
+ internal->dht = dht;
+ internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_INTERVAL;
ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool);
table->internal = internal;
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100];
ks_log(KS_LOG_DEBUG, "Delete node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
- //printf("delete node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
- //fflush(stdout);
#endif
ks_rwl_write_lock(bucket->lock);
s = ks_dhtrt_delete_id(bucket, node->nodeid.id);
#ifdef KS_DHT_DEBUGLOCKPRINTF_
ks_log(KS_LOG_DEBUG, "Delete node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
- //printf("delete node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
- //fflush(stdout);
#endif
ks_rwl_write_unlock(bucket->lock);
/* */
ks_dhtrt_internal_t *internal = table->internal;
+ int ping_count = 0;
ks_time_t t0 = ks_time_now_sec();
- if (t0 - internal->last_process_table < KS_DHTRT_PROCESSTABLE_INTERVAL) {
+ /*
+ printf("process_table: %" PRId64 " %" PRId64 "\n", t0 - internal->last_process_table, internal->next_process_table_delta);
+ */
+
+ if (t0 - internal->last_process_table < internal->next_process_table_delta) {
return;
}
+ internal->last_process_table = t0;
+
ks_log(KS_LOG_DEBUG,"process_table in progress\n");
ks_rwl_read_lock(internal->lock); /* grab read lock */
/* more than n pings outstanding? */
- if (e->outstanding_pings >= KS_DHTRT_MAXPING) {
+ if (e->flags == DHTPEER_DUBIOUS) {
+ continue;
+ }
+
+ if ( e->flags != DHTPEER_EXPIRED &&
+ e->outstanding_pings >= KS_DHTRT_MAXPING ) {
+#ifdef KS_DHT_DEBUGPRINTF_
+ ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n",
+ ks_dhtrt_printableid(e->id, buf));
+#endif
e->flags = DHTPEER_EXPIRED;
++b->expired_count;
continue;
}
- if (e->flags == DHTPEER_DUBIOUS) {
- ks_dhtrt_ping(e);
- continue;
+ /* if there are any outstanding pings - send another */
+ if (e->outstanding_pings > 0) {
+ ks_dhtrt_ping(internal, e);
+ ++ping_count;
+ continue;
}
ks_time_t tdiff = t0 - e->tyme;
- if (tdiff > KS_DHTRT_INACTIVETIME) {
- e->flags = DHTPEER_DUBIOUS;
- ks_dhtrt_ping(e);
+ if (tdiff > KS_DHTRT_EXPIREDTIME) {
+ e->flags = DHTPEER_DUBIOUS; /* mark as dubious */
+ ks_dhtrt_ping(internal, e); /* final effort to activate */
+ continue;
+ }
+
+ if (tdiff > KS_DHTRT_INACTIVETIME) { /* inactive for suspicious length */
+ ks_dhtrt_ping(internal, e); /* kick */
+ ++ping_count;
+ continue;
}
} /* end if not local */
} /* end for each bucket_entry */
#ifdef KS_DHT_DEBUGLOCKPRINTF_
- char buf1[100];
- ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
+ char buf1[100];
+ ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
#endif
ks_rwl_write_unlock(b->lock);
ks_dhtrt_process_deleted(table);
+ if (ping_count == 0) {
+ internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_INTERVAL;
+ }
+ else {
+ internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_SHORTINTERVAL;
+ }
+ ks_log(KS_LOG_DEBUG,"process_table complete\n");
+
return;
}
memset(buffer, 0, 100);
if (b->entries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer);
else strcpy(buffer, "<free>");
- ks_log(KS_LOG_DEBUG, " slot %d: %d %s\n", ix, b->entries[ix].flags, buffer);
+ ks_log(KS_LOG_DEBUG, " slot %d: %d %d %s\n", ix,
+ b->entries[ix].flags,
+ b->entries[ix].outstanding_pings,
+ buffer);
}
ks_log(KS_LOG_DEBUG, " --------------------------\n\n");
char buffer[100];
ks_log(KS_LOG_DEBUG, "creating bucket header for mask: %s\n", ks_dhtrt_printableid(mask, buffer));
if (parent) ks_log(KS_LOG_DEBUG, " ... from parent mask: %s\n", ks_dhtrt_printableid(parent->mask, buffer));
- printf("\n");
#endif
return header;
}
return NULL;
}
+static
+ks_dhtrt_bucket_header_t *ks_dhtrt_find_relatedbucketheader(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t id)
+{
+ /*
+ using the passed bucket header as a starting point find the right bucket.
+ This is a shortcut used in query to shorten the search path for queries extending beyond a single bucket.
+ */
+
+ while (header) {
+ if ( header->bucket ) {
+ return header;
+ }
+
+ /* left hand side is more restrictive (closer) so should be tried first */
+ if (header->left != 0 && (ks_dhtrt_ismasked(id, header->left->mask))) {
+ header = header->left;
+ } else {
+ header = header->right;
+ }
+ }
+
+ return NULL;
+}
+
+
+
static
ks_dhtrt_bucket_entry_t *ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t nodeid)
{
return node;
}
-void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry) {
+void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry) {
++entry->outstanding_pings;
- /* @todo */
- /* set the appropriate command in the node and queue if for processing */
- /*ks_dht_node_t *node = entry->gptr; */
- /* ++entry->outstanding_pings; */
#ifdef KS_DHT_DEBUGPRINTF_
char buf[100];
- printf(" ping queued for nodeid %s count %d\n",
+ ks_log(KS_LOG_DEBUG, "Ping queued for nodeid %s count %d\n",
ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings);
#endif
+ ks_dht_node_t* node = entry->gptr;
+ ks_dht_ping(internal->dht, &node->addr, NULL);
+
return;
}
//#include "ks.h"
#include "../src/dht/ks_dht.h"
+ks_dht_t* dht;
ks_dhtrt_routetable_t* rt;
ks_pool_t* pool;
ks_thread_pool_t* tpool;
printf("*** testbuckets - test01 start\n"); fflush(stdout);
ks_dhtrt_routetable_t* rt;
- ks_dhtrt_initroute(&rt, pool, tpool);
+ ks_dhtrt_initroute(&rt, dht, pool, tpool);
ks_dhtrt_deinitroute(&rt);
- ks_dhtrt_initroute(&rt, pool, tpool);
+ ks_dhtrt_initroute(&rt, dht, pool, tpool);
ks_dht_nodeid_t nodeid, homeid;
memset(homeid.id, 0xdd, KS_DHT_NODEID_SIZE);
homeid.id[19] = 0;
ks_status_t status;
- for (int i=0,i2=0; i<10000; ++i) {
- if (i%40 == 0) {
- ++nodeid.id[0];
- if(i2%40 == 0) {
- ++nodeid.id[1];
+ for (int i=0,i2=0,i3=0; i<10000; ++i, ++i2, ++i3) {
+ if (i%20 == 0) {
+ nodeid.id[0] = nodeid.id[0] / 2;
+ if(i2%20 == 0) {
+ nodeid.id[1] = nodeid.id[1] / 2;
+ i2 = 0;
+ if(i3%20 == 0) {
+ nodeid.id[2] = nodeid.id[2] / 2;
+ }
}
else {
- ++nodeid.id[2];
+ ++nodeid.id[3];
}
}
else {
}
+/* test process_table */
+void test51()
+{
+ printf("*** testbuckets - test51 start\n"); fflush(stdout);
+
+ ks_dht_node_t* peer;
+ ks_dht_nodeid_t nodeid, nodeid2;
+ memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE);
+ memset(nodeid2.id, 0xef, KS_DHT_NODEID_SIZE);
+
+ char ipv6[] = "1234:1234:1234:1234";
+ char ipv4[] = "123.123.123.123";
+ unsigned short port = 7000;
+ enum ks_afflags_t both = ifboth;
+
+ ks_status_t status;
+
+ for (int i=0,i2=0; i<2; ++i, ++i2) {
+ if (i%20 == 0) {
+ nodeid.id[0] = nodeid.id[0] / 2;
+ if(i2%20 == 0) {
+ i2 = 0;
+ nodeid.id[1] = nodeid.id[1] / 2;
+ }
+ else {
+ ++nodeid.id[2];
+ }
+ }
+ else {
+ ++nodeid.id[1];
+ }
+ ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
+ ks_dhtrt_touch_node(rt, nodeid);
+ }
+
+ for(int ix=0; ix<50; ++ix) {
+ ks_dhtrt_process_table(rt);
+ ks_sleep(1000 * 1000 * 120);
+ printf("*** pulse ks_dhtrt_process_table\n");
+ if ( ix%2 == 0) ks_dhtrt_dump(rt, 7);
+ }
+
+
+ printf("*** testbuckets - test51 complete\n"); fflush(stdout);
+
+ return;
+}
+
int main(int argc, char* argv[]) {
ks_init();
ks_global_set_default_logger(7);
+ ks_dht_create(&dht, NULL, NULL);
+
- ks_thread_pool_create(&tpool, KS_DHT_TPOOL_MIN, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE);
+ ks_thread_pool_create(&tpool, 0, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE);
ks_status_t status;
char *str = NULL;
printf("init/deinit routeable\n"); fflush(stdout);
- ks_dhtrt_initroute(&rt, pool, tpool);
+ ks_dhtrt_initroute(&rt, dht, pool, tpool);
ks_dhtrt_deinitroute(&rt);
for(int tix=0; tix<argc; ++tix) {
if (tests[tix] == 1) {
- ks_dhtrt_initroute(&rt, pool, tpool);
+ ks_dhtrt_initroute(&rt, dht, pool, tpool);
test01();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 2) {
- ks_dhtrt_initroute(&rt, pool, tpool);
+ ks_dhtrt_initroute(&rt, dht, pool, tpool);
test02();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 3) {
- ks_dhtrt_initroute(&rt, pool, tpool);
+ ks_dhtrt_initroute(&rt, dht, pool, tpool);
test03();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 4) {
- ks_dhtrt_initroute(&rt, pool, tpool);
+ ks_dhtrt_initroute(&rt, dht, pool, tpool);
test04();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 5) {
- ks_dhtrt_initroute(&rt, pool, tpool);
+ ks_dhtrt_initroute(&rt, dht, pool, tpool);
test05();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 6) {
- ks_dhtrt_initroute(&rt, pool, tpool);
+ ks_dhtrt_initroute(&rt, dht, pool, tpool);
test06();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 50) {
- ks_dhtrt_initroute(&rt, pool, tpool);
+ ks_dhtrt_initroute(&rt, dht, pool, tpool);
test50();
ks_dhtrt_deinitroute(&rt);
continue;
}
+ if (tests[tix] == 51) {
+ ks_dhtrt_initroute(&rt, dht, pool, tpool);
+ test51();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
+ }