ks_time_t last_process_table;
ks_mutex_t *deleted_node_lock;
ks_dhtrt_deletednode_t *deleted_node;
- ks_dhtrt_deletednode_t *free_nodes;
+ ks_dhtrt_deletednode_t *free_node_ex;
uint32_t deleted_count;
} ks_dhtrt_internal_t;
/* debugging */
#define KS_DHT_DEBUGPRINTF_
-/* # define KS_DHT_DEBUGPRINTFX_ very verbose */
-/* # define KS_DHT_DEBUGLOCKPRINTF_ debug locking */
-
-KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool)
+/* very verbose */
+/* # define KS_DHT_DEBUGPRINTFX_ */
+/* debug locking */
+/* # define KS_DHT_DEBUGLOCKPRINTF_ */
+
+KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
+ ks_pool_t *pool,
+ ks_thread_pool_t* tpool)
{
unsigned char initmask[KS_DHT_NODEID_SIZE];
memset(initmask, 0xff, sizeof(initmask));
unsigned short port,
ks_dht_node_t **node)
{
+ ks_dht_node_t *tnode;
ks_dhtrt_internal_t* internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab write lock and insert */
bentry->flags = DHTPEER_ACTIVE;
}
- (*node) = bentry->gptr;
+ tnode = bentry->gptr;
+ ks_rwl_read_lock( tnode->reflock);
ks_rwl_read_unlock(internal->lock);
+ (*node) = tnode;
return KS_STATUS_SUCCESS;
}
ks_rwl_read_unlock(internal->lock);
- ks_dht_node_t *tnode = ks_dhtrt_make_node(table);
+ tnode = ks_dhtrt_make_node(table);
tnode->table = table;
for (int i = 0; i < 5; ++i) {
}
ks_status_t s = ks_dhtrt_insert_node(table, tnode);
-
+
+ if (tnode && s == KS_STATUS_SUCCESS) {
+ ks_rwl_read_lock( tnode->reflock);
+ }
+
(*node) = tnode;
return s;
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100];
- ks_log(KS_LOG_DEBUG, "Insert node: read LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
+ ks_log(KS_LOG_DEBUG, "Find node: read LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
//fflush(stdout);
#endif
ks_rwl_read_lock(node->reflock);
}
#ifdef KS_DHT_DEBUGLOCKPRINTF_
- ks_log(KS_LOG_DEBUG, "Insert node: read UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
+ ks_log(KS_LOG_DEBUG, "Find node: read UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
//fflush(stdout);
#endif
ks_rwl_read_unlock(bucket->lock);
ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL;
+#ifdef KS_DHT_DEBUGPRINTF_
+ ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: internal->deleted_count %d\n", internal->deleted_count);
+#endif
+
+
/* reclaim excess memory */
while(internal->deleted_count > KS_DHTRT_RECYCLE_NODE_THRESHOLD && deleted) {
ks_dht_node_t* node = deleted->node;
+#ifdef KS_DHT_DEBUGPRINTFX__
+ ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock\n");
+#endif
+
if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) {
ks_rwl_destroy(&(node->reflock));
ks_pool_free(table->pool, &node);
deleted = deleted->next;
ks_pool_free(table->pool, &temp);
--internal->deleted_count;
-
+#ifdef KS_DHT_DEBUGPRINTFX_
+ ks_log(KS_LOG_DEBUG, "ALLOC process_deleted: internal->deleted_count %d\n", internal->deleted_count);
+#endif
if (prev != NULL) {
prev->next = deleted;
}
}
else {
+#ifdef KS_DHT_DEBUGPRINTFX__
+ ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock failed\n");
+#endif
prev = deleted;
deleted = prev->next;
}
}
+
+#ifdef KS_DHT_DEBUGPRINTF_
+ ks_log(KS_LOG_DEBUG, "ALLOC process_deleted exit: internal->deleted_count %d\n", internal->deleted_count);
+#endif
+
ks_mutex_unlock(internal->deleted_node_lock);
}
memset(buffer, 0, 100);
if (b->entries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer);
else strcpy(buffer, "<free>");
- ks_log(KS_LOG_DEBUG, " slot %d: %s\n", ix, buffer);
+ ks_log(KS_LOG_DEBUG, " slot %d: %d %s\n", ix, b->entries[ix].flags, buffer);
}
ks_log(KS_LOG_DEBUG, " --------------------------\n\n");
#ifdef KS_DHT_DEBUGPRINTF_
char buffer[100];
- ks_log(KS_LOG_DEBUG, "creating bucket header for mask: %s ", ks_dhtrt_printableid(mask, buffer));
- if (parent) ks_log(KS_LOG_DEBUG, "from parent mask: %s ", ks_dhtrt_printableid(parent->mask, buffer));
+ ks_log(KS_LOG_DEBUG, "creating bucket header for mask: %s\n", ks_dhtrt_printableid(mask, buffer));
+ if (parent) ks_log(KS_LOG_DEBUG, " ... from parent mask: %s\n", ks_dhtrt_printableid(parent->mask, buffer));
printf("\n");
#endif
return header;
char buffer[100];
ks_log(KS_LOG_DEBUG, "\nsplitting bucket orginal: %s\n", ks_dhtrt_printableid(original->mask, buffer));
ks_log(KS_LOG_DEBUG, " into (left) mask: %s size: %d\n", ks_dhtrt_printableid(left->mask, buffer), left->bucket->count);
- ks_log(KS_LOG_DEBUG, " and (right) mask: %s size: %d\n\n", ks_dhtrt_printableid(right->mask, buffer), right->bucket->count);
+ ks_log(KS_LOG_DEBUG, " and (right) mask: %s size: %d\n", ks_dhtrt_printableid(right->mask, buffer), right->bucket->count);
#endif
return;
}
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
#ifdef KS_DHT_DEBUGPRINTFX_
char bufferx[100];
- if ( bucket->entries[ix].inuse == 1) {
+ if ( bucket->entries[ix].inuse == 1 && bucket->entries[ix].flags == DHTPEER_ACTIVE ) {
ks_log(KS_LOG_DEBUG, "bucket->entries[%d].id = %s inuse=%x\n", ix,
ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
bucket->entries[ix].inuse );
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
#ifdef KS_DHT_DEBUGPRINTFX_
- char bufferx[100];_
+ char bufferx[100];
ks_log(KS_LOG_DEBUG, "bucket->entries[%d].id = %s inuse=%c\n", ix,
ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
bucket->entries[ix].inuse );
{
ks_dhtrt_internal_t* internal = table->internal;
ks_mutex_lock(internal->deleted_node_lock);
- ks_dhtrt_deletednode_t* deleted = internal->free_nodes;
+ ks_dhtrt_deletednode_t* deleted = internal->free_node_ex; /* grab a free stub */
if (deleted) {
- internal->free_nodes = deleted->next;
+ internal->free_node_ex = deleted->next;
}
else {
deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t));
}
deleted->node = node;
- deleted->next = internal->deleted_node;
- internal->deleted_node = deleted;
+ deleted->next = internal->deleted_node;
+ internal->deleted_node = deleted; /* add to deleted queue */
++internal->deleted_count;
+#ifdef KS_DHT_DEBUGPRINTFX_
+ ks_log(KS_LOG_DEBUG, "ALLOC: Queue for delete %d\n", internal->deleted_count);
+#endif
ks_mutex_unlock(internal->deleted_node_lock);
}
/* to to reuse a deleted node */
if (internal->deleted_count) {
ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
- node = deleted->node;
+ node = deleted->node; /* take the node */
memset(node, 0, sizeof(ks_dht_node_t));
+ deleted->node = 0; /* avoid accidents */
internal->deleted_node = deleted->next;
- deleted->next = internal->free_nodes;
- internal->free_nodes = deleted;
- --internal->deleted_count;
+ deleted->next = internal->free_node_ex; /* save the stub for reuse */
+ --internal->deleted_count;
+#ifdef KS_DHT_DEBUGPRINTFX_
+ ks_log(KS_LOG_DEBUG, "ALLOC: Reusing a node struct %d\n", internal->deleted_count);
+#endif
}
ks_mutex_unlock(internal->deleted_node_lock);
}
+/* test resue of node memory */
+void test50()
+{
+ printf("*** testbuckets - test50 start\n"); fflush(stdout);
+
+ ks_dht_node_t* peer;
+ ks_dht_nodeid_t nodeid, nodeid2;
+ memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE);
+ memset(nodeid2.id, 0xef, KS_DHT_NODEID_SIZE);
+
+ char ipv6[] = "1234:1234:1234:1234";
+ char ipv4[] = "123.123.123.123";
+ unsigned short port = 7000;
+ enum ks_afflags_t both = ifboth;
+
+ ks_status_t status;
+
+ for (int i=0,i2=0; i<200; ++i, ++i2) {
+ if (i%20 == 0) {
+ nodeid.id[0] = nodeid.id[0] / 2;
+ if(i2%20 == 0) {
+ i2 = 0;
+ nodeid.id[1] = nodeid.id[1] / 2;
+ }
+ else {
+ ++nodeid.id[2];
+ }
+ }
+ else {
+ ++nodeid.id[1];
+ }
+ ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
+ ks_dhtrt_touch_node(rt, nodeid);
+ }
+
+ memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE);
+ for (int i=0,i2=0; i<200; ++i, ++i2) {
+ if (i%20 == 0) {
+ nodeid.id[0] = nodeid.id[0] / 2;
+ if(i2%20 == 0) {
+ i2 = 0;
+ nodeid.id[1] = nodeid.id[1] / 2;
+ }
+ else {
+ ++nodeid.id[2];
+ }
+ }
+ else {
+ ++nodeid.id[1];
+ }
+ ks_dht_node_t *n = ks_dhtrt_find_node(rt, nodeid);
+ if (n != NULL) {
+ ks_dhtrt_release_node(n);
+ ks_dhtrt_delete_node(rt, n);
+ }
+ }
+
+ ks_dhtrt_process_table(rt);
+
+ memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE);
+
+ for (int i=0,i2=0; i<200; ++i, ++i2) {
+ if (i%20 == 0) {
+ nodeid.id[0] = nodeid.id[0] / 2;
+ if(i2%20 == 0) {
+ i2 = 0;
+ nodeid.id[1] = nodeid.id[1] / 2;
+ }
+ else {
+ ++nodeid.id[2];
+ }
+ }
+ else {
+ ++nodeid.id[1];
+ }
+ ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
+ ks_dhtrt_touch_node(rt, nodeid);
+ }
+
+ printf("*** testbuckets - test50 start\n"); fflush(stdout);
+ return;
+}
+
+
+
int main(int argc, char* argv[]) {
printf("testdhtbuckets - start\n");
- int tests[10];
+ int tests[100];
if (argc == 0) {
+ tests[0] = 1;
tests[1] = 1;
tests[2] = 1;
tests[3] = 1;
tests[4] = 1;
- tests[5] = 1;
- tests[6] = 0;
- tests[7] = 0;
- tests[8] = 0;
- tests[9] = 0;
}
else {
- for(int tix=1; tix<10 && tix<argc; ++tix) {
+ for(int tix=1; tix<100 && tix<argc; ++tix) {
long i = strtol(argv[tix], NULL, 0);
- tests[i] = 1;
+ tests[tix] = i;
}
}
ks_dhtrt_initroute(&rt, pool, tpool);
ks_dhtrt_deinitroute(&rt);
- if (tests[1] == 1) {
- ks_dhtrt_initroute(&rt, pool, tpool);
- test01();
- ks_dhtrt_deinitroute(&rt);
- }
- if (tests[2] == 1) {
- ks_dhtrt_initroute(&rt, pool, tpool);
- test02();
- ks_dhtrt_deinitroute(&rt);
- }
+ for(int tix=0; tix<argc; ++tix) {
+
+ if (tests[tix] == 1) {
+ ks_dhtrt_initroute(&rt, pool, tpool);
+ test01();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
+ }
+
+ if (tests[tix] == 2) {
+ ks_dhtrt_initroute(&rt, pool, tpool);
+ test02();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
+ }
+
+ if (tests[tix] == 3) {
+ ks_dhtrt_initroute(&rt, pool, tpool);
+ test03();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
+ }
+
+ if (tests[tix] == 4) {
+ ks_dhtrt_initroute(&rt, pool, tpool);
+ test04();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
+ }
+
+ if (tests[tix] == 5) {
+ ks_dhtrt_initroute(&rt, pool, tpool);
+ test05();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
+ }
+
+ if (tests[tix] == 6) {
+ ks_dhtrt_initroute(&rt, pool, tpool);
+ test06();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
+ }
+
+ if (tests[tix] == 50) {
+ ks_dhtrt_initroute(&rt, pool, tpool);
+ test50();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
+ }
- if (tests[3] == 1) {
- ks_dhtrt_initroute(&rt, pool, tpool);
- test03();
- ks_dhtrt_deinitroute(&rt);
- }
- if (tests[4] == 1) {
- ks_dhtrt_initroute(&rt, pool, tpool);
- test04();
- ks_dhtrt_deinitroute(&rt);
- }
- if (tests[5] == 1) {
- ks_dhtrt_initroute(&rt, pool, tpool);
- test05();
- ks_dhtrt_deinitroute(&rt);
- }
- if (tests[6] == 1) {
- ks_dhtrt_initroute(&rt, pool, tpool);
- test06();
- ks_dhtrt_deinitroute(&rt);
}
+
+
+
return 0;
}