#define KS_DHTRT_MAXPING 3
#define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60)
#define KS_DHTRT_PROCESSTABLE_SHORTINTERVAL (120)
-#define KS_DHTRT_RECYCLE_NODE_THRESHOLD 0
+#define KS_DHTRT_RECYCLE_NODE_THRESHOLD 100
/* peer flags */
#define DHTPEER_DUBIOUS 0
static
void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t *table, ks_dht_node_t* node);
static
-void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table);
+void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table, int8_t all);
static
ks_dht_node_t *ks_dhtrt_make_node(ks_dhtrt_routetable_t *table);
/* very verbose */
/* # define KS_DHT_DEBUGPRINTFX_ */
/* debug locking */
-#define KS_DHT_DEBUGLOCKPRINTF_
+/* #define KS_DHT_DEBUGLOCKPRINTF_ */
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
ks_dht_t *dht,
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table)
+KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **tableP)
{
/* @todo*/
+ ks_dhtrt_routetable_t* table = *tableP;
+
+ if (!table || !table->internal) {
+ return;
+ }
+
+ ks_dhtrt_internal_t* internal = table->internal;
+ ks_rwl_write_lock(internal->lock); /* grab write lock */
+ ks_dhtrt_process_deleted(table, 1);
+ table->internal = NULL; /* make sure no other threads get in */
+ ks_pool_t *pool = table->pool;
- ks_pool_t *pool = (*table)->pool;
- ks_pool_free(pool, &(*table));
+ ks_dhtrt_bucket_header_t *header = internal->buckets;
+ ks_dhtrt_bucket_header_t *last_header;
+ ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
+ int stackix=0;
+
+ while (header) {
+ stack[stackix++] = header;
+
+ if (header->bucket) {
+ ks_pool_free(pool, &header->bucket);
+ header->bucket = NULL;
+ }
+ last_header = header;
+ header = header->left;
+
+ if (header == 0 && stackix > 1) {
+ stackix -= 2;
+ header = stack[stackix];
+ header = header->right;
+#ifdef KS_DHT_DEBUGLOCKPRINTF_
+ char buf[100];
+ ks_log(KS_LOG_DEBUG,"deinit: freeing bucket header %s\n", ks_dhtrt_printableid(last_header->mask, buf));
+#endif
+ ks_pool_free(pool, &last_header);
+ }
+ }
+
+ ks_pool_free(pool, &internal);
+ ks_pool_free(pool, &table);
+ *tableP = NULL;
return;
}
unsigned short port,
ks_dht_node_t **node)
{
+ if (!table || !table->internal) {
+ return KS_STATUS_FAIL;
+ }
+
ks_dht_node_t *tnode;
ks_dhtrt_internal_t* internal = table->internal;
+
ks_rwl_read_lock(internal->lock); /* grab write lock and insert */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
{
+ if (!table || !table->internal) {
+ return KS_STATUS_FAIL;
+ }
+
ks_status_t s = KS_STATUS_FAIL;
ks_dhtrt_internal_t* internal = table->internal;
+
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
static
ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
{
+ if (!table || !table->internal) {
+ return KS_STATUS_FAIL;
+ }
+
ks_dhtrt_internal_t* internal = table->internal;
- ks_dhtrt_bucket_t *bucket = 0;
+ ks_dhtrt_bucket_t *bucket = 0;
+
int insanity = 0;
ks_rwl_write_lock(internal->lock);
KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
{
+ if (!table || !table->internal) {
+ return NULL;
+ }
- ks_dht_node_t* node = NULL;
-
+ ks_dht_node_t* node = NULL;
ks_dhtrt_internal_t* internal = table->internal;
+
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
{
+ if (!table || !table->internal) {
+ return KS_STATUS_FAIL;
+ }
+
ks_status_t s = KS_STATUS_FAIL;
- ks_dhtrt_internal_t* internal = table->internal;
+ ks_dhtrt_internal_t* internal = table->internal;
+
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
{
+ if (!table || !table->internal) {
+ return KS_STATUS_FAIL;
+ }
+
ks_status_t s = KS_STATUS_FAIL;
ks_dhtrt_internal_t *internal = table->internal;
+
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query)
{
- uint8_t count = 0;
- ks_dhtrt_internal_t *internal = table->internal;
+ if (!table || !table->internal) {
+		query->count = 0;
+		return KS_STATUS_FAIL;
+ }
+
+ uint8_t count = 0;
+ ks_dhtrt_internal_t *internal = table->internal;
+
ks_rwl_read_lock(internal->lock); /* grab read lock */
count = ks_dhtrt_findclosest_locked_nodes(table, query);
ks_rwl_read_unlock(internal->lock); /* release read lock */
/* inactive again it is considered inactive */
/* */
- ks_dhtrt_internal_t *internal = table->internal;
+ if (!table || !table->internal) {
+ return;
+ }
+
+ ks_dhtrt_internal_t *internal = table->internal;
int ping_count = 0;
ks_time_t t0 = ks_time_now_sec();
if ( e->flags != DHTPEER_EXPIRED &&
e->outstanding_pings >= KS_DHTRT_MAXPING ) {
#ifdef KS_DHT_DEBUGPRINTF_
+ char buf1[100];
ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n",
- ks_dhtrt_printableid(e->id, buf));
+ ks_dhtrt_printableid(e->id, buf1));
#endif
e->flags = DHTPEER_EXPIRED;
++b->expired_count;
} /* end for each bucket_entry */
#ifdef KS_DHT_DEBUGLOCKPRINTF_
- char buf1[100];
- ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
+ char buf[100];
+ ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
#endif
ks_rwl_write_unlock(b->lock);
} /* end of if trywrite_lock successful */
else {
#ifdef KS_DHT_DEBUGPRINTF_
- char buf2[100];
- ks_log(KS_LOG_DEBUG,"process_table: unble to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf2));
+ char buf1[100];
+			 ks_log(KS_LOG_DEBUG,"process_table: unable to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
#endif
}
}
}
ks_rwl_read_unlock(internal->lock); /* release read lock */
- ks_dhtrt_process_deleted(table);
+ ks_dhtrt_process_deleted(table, 0);
if (ping_count == 0) {
internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_INTERVAL;
return;
}
-void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
+void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table, int8_t all)
{
ks_dhtrt_internal_t* internal = table->internal;
ks_mutex_lock(internal->deleted_node_lock);
ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL;
-#ifdef KS_DHT_DEBUGPRINTF_
+#ifdef KS_DHT_DEBUGPRINTFX_
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: internal->deleted_count %d\n", internal->deleted_count);
#endif
/* reclaim excess memory */
- printf("%d %d %p\n", internal->deleted_count, KS_DHTRT_RECYCLE_NODE_THRESHOLD, (void*)deleted); fflush(stdout);
+ uint32_t threshold = KS_DHTRT_RECYCLE_NODE_THRESHOLD;
+
+ if (all) {
+ threshold = 1;
+ }
- while(internal->deleted_count > KS_DHTRT_RECYCLE_NODE_THRESHOLD && deleted) {
+ while(internal->deleted_count > threshold && deleted) {
ks_dht_node_t* node = deleted->node;
-#ifdef KS_DHT_DEBUGPRINTF_
+#ifdef KS_DHT_DEBUGPRINTFX_
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock\n");
#endif
}
else {
-#ifdef KS_DHT_DEBUGPRINTF_
+#ifdef KS_DHT_DEBUGPRINTFX_
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock failed\n");
#endif
prev = deleted;