#define KS_DHTRT_INACTIVETIME (15*60)
#define KS_DHTRT_MAXPING 3
#define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60)
-
+#define KS_DHTRT_RECYCLE_NODE_THRESHOLD 100
/* peer flags */
#define DHTPEER_DUBIOUS 0
uint8_t inuse;
uint8_t outstanding_pings;
uint8_t flags; /* active, suspect, expired */
+ uint8_t touched; /* did we ever get a touch */
} ks_dhtrt_bucket_entry_t;
typedef struct ks_dhtrt_bucket_s {
typedef struct ks_dhtrt_internal_s {
uint8_t localid[KS_DHT_NODEID_SIZE];
ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */
+ ks_thread_pool_t *tpool;
ks_rwl_t *lock; /* lock for safe traversal of the tree */
ks_time_t last_process_table;
ks_mutex_t *deleted_node_lock;
ks_dhtrt_deletednode_t *deleted_node;
+ ks_dhtrt_deletednode_t *free_nodes;
uint32_t deleted_count;
} ks_dhtrt_internal_t;
static
void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table);
+static
+ks_dht_node_t *ks_dhtrt_make_node(ks_dhtrt_routetable_t *table);
static
ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node);
static
/* # define KS_DHT_DEBUGPRINTFX_ very verbose */
/* # define KS_DHT_DEBUGLOCKPRINTF_ debug locking */
-KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool)
+KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool)
{
unsigned char initmask[KS_DHT_NODEID_SIZE];
memset(initmask, 0xff, sizeof(initmask));
ks_dhtrt_internal_t *internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t));
ks_rwl_create(&internal->lock, pool);
+ internal->tpool = tpool;
ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool);
table->internal = internal;
ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id);
if (bentry != 0) {
bentry->tyme = ks_time_now_sec();
+
+ if (bentry->touched) {
+ bentry->flags = DHTPEER_ACTIVE;
+ }
+
(*node) = bentry->gptr;
ks_rwl_read_unlock(internal->lock);
return KS_STATUS_SUCCESS;
}
ks_rwl_read_unlock(internal->lock);
- /* @todo - replace with reusable memory pool */
- ks_dht_node_t *tnode = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t));
+ ks_dht_node_t *tnode = ks_dhtrt_make_node(table);
tnode->table = table;
for (int i = 0; i < 5; ++i) {
ks_rwl_write_unlock(bucket->lock);
}
-
}
- ks_rwl_read_unlock(internal->lock); /* release write lock */
- /* at this point no subsequent find/query will return the node - so we can
- safely free it if we can grab the write lock
- Having held the write lock on the bucket we know no other thread
- is awaiting a read/write lock on the node
- */
+ ks_rwl_read_unlock(internal->lock); /* release write lock */
+ /* at this point no subsequent find/query will return the node */
- if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) { /* grab exclusive lock on node */
- ks_rwl_destroy(&(node->reflock));
- ks_pool_free(table->pool, &node);
- }
- else {
- ks_dhtrt_queue_node_fordelete(table, node);
- }
+ ks_dhtrt_queue_node_fordelete(table, node);
return s;
}
if (e != 0) {
e->tyme = ks_time_now_sec();
e->outstanding_pings = 0;
+ e->touched = 1;
if (e->flags == DHTPEER_EXPIRED) {
--header->bucket->expired_count;
ks_dhtrt_internal_t *internal = table->internal;
ks_time_t t0 = ks_time_now_sec();
+
if (t0 - internal->last_process_table < KS_DHTRT_PROCESSTABLE_INTERVAL) {
return;
}
+ ks_log(KS_LOG_DEBUG,"process_table in progress\n");
+
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = internal->buckets;
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100];
ks_log(KS_LOG_DEBUG,"process_table: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
- //fflush(stdout);
#endif
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf1[100];
ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
- //fflush(stdout);
#endif
ks_rwl_write_unlock(b->lock);
#ifdef KS_DHT_DEBUGPRINTF_
char buf2[100];
ks_log(KS_LOG_DEBUG,"process_table: unble to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf2));
- //fflush(stdout);
#endif
}
}
ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL;
- while(deleted) {
+ /* reclaim excess memory */
+ while(internal->deleted_count > KS_DHTRT_RECYCLE_NODE_THRESHOLD && deleted) {
ks_dht_node_t* node = deleted->node;
if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) {
deleted = deleted->next;
ks_pool_free(table->pool, &temp);
--internal->deleted_count;
+
if (prev != NULL) {
prev->next = deleted;
}
else {
internal->deleted_node = deleted;
- }
+ }
+
}
else {
prev = deleted;
deleted = prev->next;
}
-
}
-
ks_mutex_unlock(internal->deleted_node_lock);
}
/*
 * Queue a node for deferred deletion.
 *
 * The node is not freed immediately: another thread may still hold a
 * reference on it, so it is pushed onto the internal deleted-node list
 * for later reclamation by ks_dhtrt_process_deleted().  Wrapper entries
 * are recycled from the free_nodes list when one is available, falling
 * back to a pool allocation otherwise.
 */
void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t* table, ks_dht_node_t* node)
{
	ks_dhtrt_internal_t* internal = table->internal;

	ks_mutex_lock(internal->deleted_node_lock);

	/* reuse a spare wrapper if we have one */
	ks_dhtrt_deletednode_t* entry = internal->free_nodes;

	if (entry) {
		internal->free_nodes = entry->next;
	}
	else {
		entry = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t));
	}

	/* push onto the head of the deleted-node list */
	entry->node = node;
	entry->next = internal->deleted_node;
	internal->deleted_node = entry;
	++internal->deleted_count;

	ks_mutex_unlock(internal->deleted_node_lock);
}
+ks_dht_node_t* ks_dhtrt_make_node(ks_dhtrt_routetable_t* table)
+{
+ ks_dht_node_t *node = NULL;
+ ks_dhtrt_internal_t *internal = table->internal;
+ ks_mutex_lock(internal->deleted_node_lock);
+
+ /* to to reuse a deleted node */
+ if (internal->deleted_count) {
+ ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
+ node = deleted->node;
+ memset(node, 0, sizeof(ks_dht_node_t));
+ internal->deleted_node = deleted->next;
+ deleted->next = internal->free_nodes;
+ internal->free_nodes = deleted;
+ --internal->deleted_count;
+ }
+ ks_mutex_unlock(internal->deleted_node_lock);
+
+ if (!node) {
+ node = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t));
+ }
+
+ return node;
+}
void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry) {
++entry->outstanding_pings;