ks_time_t tyme;
uint8_t id[KS_DHT_NODEID_SIZE];
ks_dht_node_t *gptr; /* ptr to peer */
- enum ks_dht_nodetype_t type;
- enum ks_afflags_t family;
uint8_t inuse;
uint8_t outstanding_pings;
uint8_t flags; /* active, suspect, expired */
uint8_t localid[KS_DHT_NODEID_SIZE];
ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */
ks_dht_t *dht;
- ks_thread_pool_t *tpool;
ks_rwl_t *lock; /* lock for safe traversal of the tree */
ks_time_t last_process_table;
ks_time_t next_process_table_delta;
ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id);
static
char *ks_dhtrt_printableid(uint8_t *id, char *buffer);
-static
-unsigned char ks_dhtrt_isactive(ks_dhtrt_bucket_entry_t *entry);
static
uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query);
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
ks_dht_t *dht,
- ks_pool_t *pool,
- ks_thread_pool_t* tpool)
+ ks_pool_t *pool)
{
(void)ks_dhtrt_find_relatedbucketheader;
ks_dhtrt_internal_t *internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t));
ks_rwl_create(&internal->lock, pool);
- internal->tpool = tpool;
internal->dht = dht;
internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_INTERVAL;
ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool);
assert(header != NULL); /* should always find a header */
ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id);
+
if (bentry != 0) {
bentry->tyme = ks_time_now_sec();
tnode = ks_dhtrt_make_node(table);
tnode->table = table;
+ enum ks_afflags_t family;
+
for (int i = 0; i < 5; ++i) {
if (ip[i] == ':') {
- tnode->family = AF_INET6; break;
+ family = AF_INET6; break;
} else if (ip[i] == '.') {
- tnode->family = AF_INET; break;
+ family = AF_INET; break;
}
}
memcpy(tnode->nodeid.id, nodeid.id, KS_DHT_NODEID_SIZE);
tnode->type = type;
- if (( ks_addr_set(&tnode->addr, ip, port, tnode->family) != KS_STATUS_SUCCESS) ||
+ if (( ks_addr_set(&tnode->addr, ip, port, family) != KS_STATUS_SUCCESS) ||
( ks_rwl_create(&tnode->reflock, table->pool) != KS_STATUS_SUCCESS)) {
ks_pool_free(table->pool, &tnode);
ks_rwl_read_unlock(internal->lock);
ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL;
-#ifdef KS_DHT_DEBUGPRINTFX_
+#ifdef KS_DHT_DEBUGPRINTF_
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: internal->deleted_count %d\n", internal->deleted_count);
#endif
uint32_t threshold = KS_DHTRT_RECYCLE_NODE_THRESHOLD;
if (all) {
- threshold = 1;
+ threshold = 0;
}
while(internal->deleted_count > threshold && deleted) {
ks_dht_node_t* node = deleted->node;
#ifdef KS_DHT_DEBUGPRINTFX_
- ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock\n");
+ ks_log(KS_LOG_DEBUG, "ALLOC process_deleted : try write lock\n");
#endif
if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) {
deleted = deleted->next;
ks_pool_free(table->pool, &temp);
--internal->deleted_count;
-#ifdef KS_DHT_DEBUGPRINTF_
- ks_log(KS_LOG_DEBUG, "ALLOC process_deleted: internal->deleted_count %d\n", internal->deleted_count);
+#ifdef KS_DHT_DEBUGPRINTFX__
+ ks_log(KS_LOG_DEBUG, "ALLOC process_deleted: internal->deleted_count reduced to %d\n", internal->deleted_count);
#endif
if (prev != NULL) {
prev->next = deleted;
}
else {
-#ifdef KS_DHT_DEBUGPRINTFX_
- ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock failed\n");
+#ifdef KS_DHT_DEBUGPRINTF_
+ ks_log(KS_LOG_DEBUG, "ALLOC process_deleted : try write lock failed\n");
#endif
prev = deleted;
deleted = prev->next;
b->entries[ix].flags,
b->entries[ix].outstanding_pings,
n->type,
- n->family,
+ n->addr.family,
buffer);
}
else {
/* move it to the left */
memcpy(dest->entries[lix].id, source->entries[rix].id, KS_DHT_NODEID_SIZE);
dest->entries[lix].gptr = source->entries[rix].gptr;
- dest->entries[lix].family = source->entries[rix].family;
- dest->entries[lix].type = source->entries[rix].type;
dest->entries[lix].inuse = 1;
++lix;
++dest->count;
if ( free<KS_DHT_BUCKETSIZE ) {
bucket->entries[free].inuse = 1;
bucket->entries[free].gptr = node;
- bucket->entries[free].type = node->type;
- bucket->entries[free].family = node->family;
bucket->entries[free].tyme = ks_time_now_sec();
bucket->entries[free].flags = DHTPEER_DUBIOUS;
for (uint8_t ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
- if ( bucket->entries[ix].inuse == 1 && /* in use */
- bucket->entries[ix].flags == DHTPEER_ACTIVE && /* not dubious or expired */
- (family == ifboth || bucket->entries[ix].family == family) && /* match if family */
- (bucket->entries[ix].type & type) && /* match type */
- ks_dhtrt_isactive( &(bucket->entries[ix])) ) {
+ if ( bucket->entries[ix].inuse == 1 && /* in use */
+ bucket->entries[ix].flags == DHTPEER_ACTIVE && /* not dubious or expired */
+ (family == ifboth || bucket->entries[ix].gptr->addr.family == family) && /* match if family */
+ (bucket->entries[ix].gptr->type & type) ) { /* match type */
/* calculate xor value */
ks_dhtrt_xor(bucket->entries[ix].id, id, xorvalue );
deleted->next = internal->deleted_node;
internal->deleted_node = deleted; /* add to deleted queue */
++internal->deleted_count;
-#ifdef KS_DHT_DEBUGPRINTFX_
+#ifdef KS_DHT_DEBUGPRINTF_
ks_log(KS_LOG_DEBUG, "ALLOC: Queue for delete %d\n", internal->deleted_count);
#endif
ks_mutex_unlock(internal->deleted_node_lock);
printf("**** testbuckets - test01 start\n"); fflush(stdout);
ks_dhtrt_routetable_t *rt;
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
+ ks_dhtrt_initroute(&rt, dht, pool);
ks_dhtrt_deinitroute(&rt);
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
+ ks_dhtrt_initroute(&rt, dht, pool);
ks_dht_nodeid_t nodeid, homeid;
memset(homeid.id, 0xdd, KS_DHT_NODEID_SIZE);
homeid.id[19] = 0;
for (int i=0; i<200; ++i) {
if (i%10 == 0) {
- ++nodeid.id[0];
- nodeid.id[1] = 0;
+ ++nodeid.id[0];
+ nodeid.id[1] = 0;
}
else {
- ++nodeid.id[1];
+ ++nodeid.id[1];
}
ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
- if (s0 == KS_STATUS_SUCCESS) {
- ks_dhtrt_touch_node(rt, nodeid);
- ++ipv4_remote;
+ if (s0 == KS_STATUS_SUCCESS) {
+ ks_dhtrt_touch_node(rt, nodeid);
+ ++ipv4_remote;
}
}
for (int i=0; i<2; ++i) {
- if (i%10 == 0) {
- ++nodeid.id[0];
- nodeid.id[1] = 0;
+ if (i%10 == 0) {
+ ++nodeid.id[0];
+ nodeid.id[1] = 0;
}
else {
- ++nodeid.id[1];
+ ++nodeid.id[1];
}
ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
if (s0 == KS_STATUS_SUCCESS) {
- ks_dhtrt_touch_node(rt, nodeid);
- ++ipv4_local;
+ ks_dhtrt_touch_node(rt, nodeid);
+ ++ipv4_local;
}
}
for (int i=0; i<201; ++i) {
if (i%10 == 0) {
- ++nodeid.id[0];
- nodeid.id[1] = 0;
+ ++nodeid.id[0];
+ nodeid.id[1] = 0;
}
else {
- ++nodeid.id[1];
+ ++nodeid.id[1];
}
ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
for (int i=0,i2=0,i3=0; i<10000; ++i, ++i2, ++i3) {
if (i%20 == 0) {
- nodeid.id[0] = nodeid.id[0] / 2;
- if (i2%20 == 0) {
- nodeid.id[1] = nodeid.id[1] / 2;
- i2 = 0;
- if (i3%20 == 0) {
- nodeid.id[2] = nodeid.id[2] / 2;
- }
- }
- else {
- ++nodeid.id[3];
- }
+ nodeid.id[0] = nodeid.id[0] / 2;
+ if (i2%20 == 0) {
+ nodeid.id[1] = nodeid.id[1] / 2;
+ i2 = 0;
+ if (i3%20 == 0) {
+ nodeid.id[2] = nodeid.id[2] / 2;
+ }
+ }
+ else {
+ ++nodeid.id[3];
+ }
}
else {
- ++nodeid.id[1];
+ ++nodeid.id[1];
}
ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
ks_dhtrt_touch_node(rt, g_nodeid1);
ks_dht_node_t *peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=2
- peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=3
- peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=4
+ peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=3
+ peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=4
ks_dhtrt_release_node(peer2); //lock=3
ks_dhtrt_sharelock_node(peer2); //lock=4
return;
}
+void test07()
+{
+ printf("**** testbuckets - test07 start\n"); fflush(stdout);
+
+ ks_dht_node_t *peer;
+ memset(g_nodeid1.id, 0xef, KS_DHT_NODEID_SIZE);
+ memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE);
+
+ char ipv6[] = "1234:1234:1234:1234";
+ char ipv4[] = "123.123.123.123";
+ unsigned short port = 7000;
+
+ /* build a delete queue */
+
+ for(int i0=0, i1=0; i0<150; ++i0, ++i1) {
+ if (i0%20 == 0) {
+ g_nodeid2.id[0]>>=1;
+ }
+ else {
+ ++ g_nodeid2.id[19];
+ }
+ ks_dhtrt_create_node(rt, g_nodeid2, KS_DHT_REMOTE, ipv4, port, &peer);
+ ks_dhtrt_touch_node(rt, g_nodeid2);
+ ks_dhtrt_release_node(peer);
+ }
+
+ memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE);
+ for (int i0=0, i1=0; i0<150; ++i0, ++i1) {
+ if (i0%20 == 0) {
+ g_nodeid2.id[0]>>=1;
+ }
+ else {
+ ++ g_nodeid2.id[19];
+ }
+ ks_dht_node_t* n = ks_dhtrt_find_node(rt, g_nodeid2);
+ ks_dhtrt_release_node(n);
+ ks_dhtrt_delete_node(rt, n);
+ }
+
+ ks_dhtrt_process_table(rt);
+
+ printf("**** test07 should delete 100 nodes, leaving 50\n"); fflush(stdout);
+ printf("**** testbuckets - test07 ended\n"); fflush(stdout);
+}
static int gindex = 1;
ks_sleep(10000);
for (int i=0; i<query.count; ++i) {
- ks_dhtrt_release_node(query.nodes[i]);
- ks_sleep(10000);
+ ks_dhtrt_release_node(query.nodes[i]);
+ ks_sleep(10000);
}
ks_sleep(2000000);
for (int loop=0; loop<test60loops; ++loop) {
for (int i=0; i<test60nodes; ++i) {
- ++nodeid.id[19];
- ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
- ks_sleep(1000);
- ks_dhtrt_touch_node(rt, nodeid);
+ ++nodeid.id[19];
+ ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
+ ks_sleep(1000);
+ ks_dhtrt_touch_node(rt, nodeid);
}
for (int i=0; i<test60nodes; ++i) {
- peer = ks_dhtrt_find_node(rt, nodeid);
- if (peer) {
- ks_dhtrt_delete_node(rt, peer);
- ks_sleep(400);
- }
- --nodeid.id[19];
+ peer = ks_dhtrt_find_node(rt, nodeid);
+ if (peer) {
+ ks_dhtrt_delete_node(rt, peer);
+ ks_sleep(400);
+ }
+ --nodeid.id[19];
}
}
for (int i=0; i<200; ++i) {
if (i%10 == 0) {
- ++nodeid.id[0];
- nodeid.id[1] = 0;
+ ++nodeid.id[0];
+ nodeid.id[1] = 0;
}
else {
- ++nodeid.id[1];
+ ++nodeid.id[1];
}
ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
if (s0 == KS_STATUS_SUCCESS) {
- ks_dhtrt_touch_node(rt, nodeid);
- ++ipv4_remote;
+ ks_dhtrt_touch_node(rt, nodeid);
+ ++ipv4_remote;
}
}
for (int i=0; i<2; ++i) {
if (i%10 == 0) {
- ++nodeid.id[0];
- nodeid.id[1] = 0;
+ ++nodeid.id[0];
+ nodeid.id[1] = 0;
}
else {
- ++nodeid.id[1];
+ ++nodeid.id[1];
}
ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
if (s0 == KS_STATUS_SUCCESS) {
- ks_dhtrt_touch_node(rt, nodeid);
- ++ipv4_local;
+ ks_dhtrt_touch_node(rt, nodeid);
+ ++ipv4_local;
}
}
for (int i=0; i<201; ++i) {
if (i%10 == 0) {
- ++nodeid.id[0];
- nodeid.id[1] = 0;
+ ++nodeid.id[0];
+ nodeid.id[1] = 0;
}
else {
- ++nodeid.id[1];
+ ++nodeid.id[1];
}
ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
for (int i=0,i2=0; i<200; ++i, ++i2) {
if (i%20 == 0) {
- nodeid.id[0] = nodeid.id[0] / 2;
- if (i2%20 == 0) {
- i2 = 0;
- nodeid.id[1] = nodeid.id[1] / 2;
- }
- else {
- ++nodeid.id[2];
- }
+ nodeid.id[0] = nodeid.id[0] / 2;
+ if (i2%20 == 0) {
+ i2 = 0;
+ nodeid.id[1] = nodeid.id[1] / 2;
+ }
+ else {
+ ++nodeid.id[2];
+ }
}
else {
- ++nodeid.id[1];
+ ++nodeid.id[1];
}
ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE);
for (int i=0,i2=0; i<200; ++i, ++i2) {
- if (i%20 == 0) {
- nodeid.id[0] = nodeid.id[0] / 2;
- if (i2%20 == 0) {
- i2 = 0;
- nodeid.id[1] = nodeid.id[1] / 2;
- }
- else {
- ++nodeid.id[2];
- }
+ if (i%20 == 0) {
+ nodeid.id[0] = nodeid.id[0] / 2;
+ if (i2%20 == 0) {
+ i2 = 0;
+ nodeid.id[1] = nodeid.id[1] / 2;
+ }
+ else {
+ ++nodeid.id[2];
+ }
}
else {
- ++nodeid.id[1];
+ ++nodeid.id[1];
}
ks_dht_node_t *n = ks_dhtrt_find_node(rt, nodeid);
if (n != NULL) {
- ks_dhtrt_release_node(n);
- ks_dhtrt_delete_node(rt, n);
+ ks_dhtrt_release_node(n);
+ ks_dhtrt_delete_node(rt, n);
}
}
for (int i=0,i2=0; i<200; ++i, ++i2) {
if (i%20 == 0) {
- nodeid.id[0] = nodeid.id[0] / 2;
- if (i2%20 == 0) {
- i2 = 0;
- nodeid.id[1] = nodeid.id[1] / 2;
- }
- else {
- ++nodeid.id[2];
- }
+ nodeid.id[0] = nodeid.id[0] / 2;
+ if (i2%20 == 0) {
+ i2 = 0;
+ nodeid.id[1] = nodeid.id[1] / 2;
+ }
+ else {
+ ++nodeid.id[2];
+ }
}
else {
- ++nodeid.id[1];
+ ++nodeid.id[1];
}
ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
for (int i=0,i2=0; i<2; ++i, ++i2) {
if (i%20 == 0) {
- nodeid.id[0] = nodeid.id[0] / 2;
- if (i2%20 == 0) {
- i2 = 0;
- nodeid.id[1] = nodeid.id[1] / 2;
- }
- else {
- ++nodeid.id[2];
- }
+ nodeid.id[0] = nodeid.id[0] / 2;
+ if (i2%20 == 0) {
+ i2 = 0;
+ nodeid.id[1] = nodeid.id[1] / 2;
+ }
+ else {
+ ++nodeid.id[2];
+ }
}
else {
- ++nodeid.id[1];
+ ++nodeid.id[1];
}
ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
int tests[100];
if (argc == 0) {
- tests[0] = 1;
- tests[1] = 2;
- tests[2] = 3;
- tests[3] = 4;
- tests[4] = 5;
+ tests[0] = 1;
+ tests[1] = 2;
+ tests[2] = 3;
+ tests[3] = 4;
+ tests[4] = 5;
}
else {
for(int tix=1; tix<100 && tix<argc; ++tix) {
- long i = strtol(argv[tix], NULL, 0);
- tests[tix] = i;
+ long i = strtol(argv[tix], NULL, 0);
+ tests[tix] = i;
}
}
printf("init/deinit routeable\n"); fflush(stdout);
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
+ ks_dhtrt_initroute(&rt, dht, pool);
ks_dhtrt_deinitroute(&rt);
for (int tix=0; tix<argc; ++tix) {
if (tests[tix] == 1) {
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
- test01();
- ks_dhtrt_deinitroute(&rt);
- continue;
+ ks_dhtrt_initroute(&rt, dht, pool);
+ test01();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
}
if (tests[tix] == 2) {
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
- test02();
- ks_dhtrt_deinitroute(&rt);
- continue;
+ ks_dhtrt_initroute(&rt, dht, pool);
+ test02();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
}
if (tests[tix] == 3) {
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
- test03();
- ks_dhtrt_deinitroute(&rt);
- continue;
+ ks_dhtrt_initroute(&rt, dht, pool);
+ test03();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
}
if (tests[tix] == 4) {
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
- test04();
- ks_dhtrt_deinitroute(&rt);
- continue;
+ ks_dhtrt_initroute(&rt, dht, pool);
+ test04();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
}
if (tests[tix] == 5) {
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
- test05();
- ks_dhtrt_deinitroute(&rt);
- continue;
+ ks_dhtrt_initroute(&rt, dht, pool);
+ test05();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
}
if (tests[tix] == 6) {
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
- test06();
- ks_dhtrt_deinitroute(&rt);
- continue;
+ ks_dhtrt_initroute(&rt, dht, pool);
+ test06();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
}
+ if (tests[tix] == 7) {
+ ks_dhtrt_initroute(&rt, dht, pool);
+ test07();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
+ }
+
if (tests[tix] == 30) {
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
- test30();
- ks_dhtrt_deinitroute(&rt);
- continue;
+ ks_dhtrt_initroute(&rt, dht, pool);
+ test30();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
}
if (tests[tix] == 50) {
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
- test50();
- ks_dhtrt_deinitroute(&rt);
- continue;
+ ks_dhtrt_initroute(&rt, dht, pool);
+ test50();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
}
if (tests[tix] == 51) {
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
- test51();
- ks_dhtrt_deinitroute(&rt);
- continue;
+ ks_dhtrt_initroute(&rt, dht, pool);
+ test51();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
}
if (tests[tix] == 60) {
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
- test60();
- ks_dhtrt_deinitroute(&rt);
- continue;
+ ks_dhtrt_initroute(&rt, dht, pool);
+ test60();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
}
if (tests[tix] == 99) {
- ks_dhtrt_initroute(&rt, dht, pool, tpool);
- //testnodelocking();
- ks_dhtrt_deinitroute(&rt);
- continue;
+ ks_dhtrt_initroute(&rt, dht, pool);
+ //testnodelocking();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
}