ks_dhtrt_bucket_entry_t entries[KS_DHT_BUCKETSIZE];
uint8_t count;
uint8_t expired_count;
+ ks_rwl_t *lock; /* lock for safe traversal of the entry array */
} ks_dhtrt_bucket_t;
unsigned char flags;
} ks_dhtrt_bucket_header_t;
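+/* Entry in the deferred-delete list: a node removed from a bucket may
+   still be read-locked by another thread, so it is queued here and freed
+   later by ks_dhtrt_process_deleted() once its reflock can be taken
+   exclusively. */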
+typedef struct ks_dhtrt_deletednode_s {
+ ks_dht_node_t* node;
+ struct ks_dhtrt_deletednode_s *next;
+} ks_dhtrt_deletednode_t;
typedef struct ks_dhtrt_internal_s {
uint8_t localid[KS_DHT_NODEID_SIZE];
ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */
- ks_rwl_t *lock; /* lock for safe traversal of the tree */
+ ks_rwl_t *lock; /* lock for safe traversal of the tree */
+ ks_mutex_t *deleted_node_lock;
+ ks_dhtrt_deletednode_t *deleted_node;
} ks_dhtrt_internal_t;
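+/* Lock ordering: the internal tree lock is taken before any bucket lock,
+   and a bucket lock before a node's reflock; deleted_node_lock guards
+   only the deferred-delete list. */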
typedef struct ks_dhtrt_xort_s {
void ks_dhtrt_xor(const uint8_t *id1, const uint8_t *id2, uint8_t *xor);
static
int ks_dhtrt_ismasked(const uint8_t *id1, const uint8_t *mask);
+static
+void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t *table, ks_dht_node_t* node);
+static
+void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table);
static
ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node);
ks_dhtrt_internal_t *internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t));
ks_rwl_create(&internal->lock, pool);
+ ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool);
table->internal = internal;
/* initialize root bucket */
unsigned short port,
ks_dht_node_t **node)
{
+ ks_dhtrt_internal_t* internal = table->internal;
+ ks_rwl_read_lock(internal->lock); /* grab read lock for the lookup; the insert path takes the write lock later */
+
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
assert(header != NULL); /* should always find a header */
ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id);
if (bentry != 0) {
- bentry->type = ks_time_now_sec();
- (*node) = bentry->gptr;
- return KS_STATUS_SUCCESS;
+ bentry->tyme = ks_time_now_sec();
+ (*node) = bentry->gptr;
+ ks_rwl_read_unlock(internal->lock);
+ return KS_STATUS_SUCCESS;
}
+ ks_rwl_read_unlock(internal->lock);
/* @todo - replace with reusable memory pool */
ks_dht_node_t *tnode = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t));
return KS_STATUS_FAIL;
}
- ks_dhtrt_internal_t* internal = table->internal;
- ks_rwl_write_lock(internal->lock); /* grab write lock and insert */
ks_status_t s = ks_dhtrt_insert_node(table, tnode);
- ks_rwl_write_unlock(internal->lock); /* release write lock */
(*node) = tnode;
{
ks_status_t s = KS_STATUS_FAIL;
ks_dhtrt_internal_t* internal = table->internal;
- ks_rwl_write_lock(internal->lock); /* grab write lock and delete */
+ ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
if (header != 0) {
ks_dhtrt_bucket_t *bucket = header->bucket;
if (bucket != 0) { /* we found a bucket*/
+ ks_rwl_write_lock(bucket->lock);
s = ks_dhtrt_delete_id(bucket, node->nodeid.id);
+ ks_rwl_write_unlock(bucket->lock);
}
}
- ks_rwl_write_unlock(internal->lock); /* release write lock */
+ ks_rwl_read_unlock(internal->lock); /* release read lock */
+ /* At this point no subsequent find/query will return the node, so we can
+    safely free it if we can take its write lock. Because we held the
+    bucket's write lock, no other thread can be waiting on the node's
+    reflock: any reader either finished or already holds it (in which
+    case the trylock below fails and the node is queued instead).
+ */
- ks_rwl_destroy(&(node->reflock));
- ks_pool_free(table->pool, node);
+ if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) { /* grab exclusive lock on node */
+ ks_rwl_destroy(&(node->reflock));
+ ks_pool_free(table->pool, node);
+ } else {
+ ks_dhtrt_queue_node_fordelete(table, node);
+ }
return s;
}
static
ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
{
+ ks_dhtrt_internal_t* internal = table->internal;
ks_dhtrt_bucket_t *bucket = 0;
int insanity = 0;
+ ks_rwl_write_lock(internal->lock);
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
assert(header != NULL); /* should always find a header */
bucket = header->bucket;
if (bucket == 0) {
+ ks_rwl_write_unlock(internal->lock);
return KS_STATUS_FAIL; /* we were not able to find a bucket*/
}
+
+ ks_rwl_write_lock(bucket->lock);
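+ /* hold the bucket write lock for the whole insertion; the bucket may be
+    split below and its entries must not move under us */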
while (bucket->count == KS_DHT_BUCKETSIZE) {
if (insanity > 3200) assert(insanity < 3200);
/* first - seek a stale entry to eject */
if (bucket->expired_count) {
ks_status_t s = ks_dhtrt_insert_id(bucket, node);
-
- if (s == KS_STATUS_SUCCESS) return KS_STATUS_SUCCESS;
+ if (s == KS_STATUS_SUCCESS) {
+ ks_rwl_write_unlock(bucket->lock);
+ ks_rwl_write_unlock(internal->lock);
+ return KS_STATUS_SUCCESS;
+ }
}
/*
char buffer[100];
printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, buffer));
#endif
+ ks_rwl_write_unlock(bucket->lock);
+ ks_rwl_write_unlock(internal->lock);
return KS_STATUS_FAIL;
}
char buffer[100];
printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, buffer));
#endif
+ ks_rwl_write_unlock(bucket->lock);
+ ks_rwl_write_unlock(internal->lock);
return KS_STATUS_FAIL;
}
/* which bucket do we care about? */
if (ks_dhtrt_ismasked(node->nodeid.id, newleft->mask)) {
bucket = newleft->bucket;
+ ks_rwl_write_lock(bucket->lock); /* lock new bucket */
+ ks_rwl_write_unlock(header->right->bucket->lock); /* unlock old bucket */
header = newleft;
} else {
bucket = newright->bucket;
+ /* note: we still hold a lock on the bucket */
header = newright;
}
++insanity;
printf("into bucket %s\n", ks_dhtrt_printableid(header->mask, buffer));
#endif
- /* by this point we have a viable bucket */
- return ks_dhtrt_insert_id(bucket, node);
+ /* By this point we have a viable and locked bucket, so downgrade the
+    internal lock from write to read. This is safe because we still hold
+    the bucket's write lock, which prevents the bucket from being split
+    under us.
+ */
+ ks_rwl_write_unlock(internal->lock);
+ ks_rwl_read_lock(internal->lock);
+
+ ks_status_t s = ks_dhtrt_insert_id(bucket, node);
+ ks_rwl_read_unlock(internal->lock);
+ ks_rwl_write_unlock(bucket->lock);
+ return s;
}
-KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid) {
+KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
+{
+
+ ks_dht_node_t* node = NULL;
+
+ ks_dhtrt_internal_t* internal = table->internal;
+ ks_rwl_read_lock(internal->lock); /* grab read lock */
+
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
- if (header == 0) return NULL;
+ if (header != 0) {
- ks_dhtrt_bucket_t *bucket = header->bucket;
+ ks_dhtrt_bucket_t *bucket = header->bucket;
+
+ if (bucket != 0) { /* a NULL bucket here would indicate a logic error */
+
+ ks_rwl_read_lock(bucket->lock);
+ node = ks_dhtrt_find_nodeid(bucket, nodeid.id);
+
+ if (node != NULL) {
+ ks_rwl_read_lock(node->reflock);
+ }
+
+ ks_rwl_read_unlock(bucket->lock);
+ }
- if (bucket == 0) return NULL; /* probably a logic error ?*/
+ }
- return ks_dhtrt_find_nodeid(bucket, nodeid.id);
+ ks_rwl_read_unlock(internal->lock);
+ return node;
}
KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
ks_status_t s = KS_STATUS_FAIL;
ks_dhtrt_internal_t* internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab read lock */
+
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
if (header != 0 && header->bucket != 0) {
+ ks_rwl_write_lock(header->bucket->lock);
ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
if (e != 0) {
e->flags = DHTPEER_ACTIVE;
s = KS_STATUS_SUCCESS;
}
-
+ ks_rwl_write_unlock(header->bucket->lock);
}
- ks_rwl_read_lock(internal->lock); /* release read lock */
+ ks_rwl_read_unlock(internal->lock); /* release read lock */
return s;
}
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
- if (header != 0) {
-
+ if (header != 0 && header->bucket != 0) {
+ ks_rwl_write_lock(header->bucket->lock);
ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
if (e != 0) {
e->flags = DHTPEER_EXPIRED;
s = KS_STATUS_SUCCESS;
}
-
+ ks_rwl_write_unlock(header->bucket->lock);
}
ks_rwl_read_unlock(internal->lock); /* release read lock */
return s;
if (lheader) {
xortn = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t));
- memset(xortn, 0, sizeof(ks_dhtrt_sortedxors_t));
if (tofree == 0) tofree = xortn;
if (rheader) {
xortn1 = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t));
- memset(xortn1, 0, sizeof(ks_dhtrt_sortedxors_t));
prev->next = xortn1;
prev = xortn1;
cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family,
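+/* Releases the read lock taken on a node by ks_dhtrt_find_node();
+   callers must release every node handed to them. */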
KS_DECLARE(ks_status_t) ks_dhtrt_release_node(ks_dht_node_t* node)
{
- return KS_STATUS_SUCCESS;
- /* return ks_rwl_read_unlock(node->reflock);*/
+ return ks_rwl_read_unlock(node->reflock);
}
stack[stackix++] = header;
if (header->bucket) {
+
ks_dhtrt_bucket_t *b = header->bucket;
- for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
- ks_dhtrt_bucket_entry_t *e = &b->entries[ix];
+ if (ks_rwl_try_write_lock(b->lock) == KS_STATUS_SUCCESS) {
- if (e->inuse == 1) {
- /* more than n pings outstanding? */
+#ifdef KS_DHT_DEBUGLOCKPRINTF_
+ char buf[100];
+ printf("process_table: LOCKING bucket %s\n",
+ ks_dhtrt_printableid(header->mask, buf));
+ fflush(stdout);
+#endif
- if (e->outstanding_pings >= KS_DHTRT_MAXPING) {
- e->flags = DHTPEER_EXPIRED;
- ++b->expired_count;
- continue;
- }
- if (e->flags == DHTPEER_SUSPECT) {
- ks_dhtrt_ping(e);
- continue;
- }
+ for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
+ ks_dhtrt_bucket_entry_t *e = &b->entries[ix];
- ks_time_t tdiff = t0 - e->tyme;
+ if (e->inuse == 1) {
+ /* more than n pings outstanding? */
- if (tdiff > KS_DHTRT_INACTIVETIME) {
- e->flags = DHTPEER_SUSPECT;
- ks_dhtrt_ping(e);
- }
- }
- } /* end for each bucket_entry */
+ if (e->outstanding_pings >= KS_DHTRT_MAXPING) {
+ e->flags = DHTPEER_EXPIRED;
+ ++b->expired_count;
+ continue;
+ }
+
+ if (e->flags == DHTPEER_SUSPECT) {
+ ks_dhtrt_ping(e);
+ continue;
+ }
+
+ ks_time_t tdiff = t0 - e->tyme;
+
+ if (tdiff > KS_DHTRT_INACTIVETIME) {
+ e->flags = DHTPEER_SUSPECT;
+ ks_dhtrt_ping(e);
+ }
+
+ } /* end if e->inuse */
+
+ } /* end for each bucket_entry */
+
+#ifdef KS_DHT_DEBUGLOCKPRINTF_
+ char buf1[100];
+ printf("process_table: UNLOCKING bucket %s\n",
+ ks_dhtrt_printableid(header->mask, buf1));
+ fflush(stdout);
+#endif
+
+ ks_rwl_write_unlock(b->lock);
+
+ } /* end of if trywrite_lock successful */
+ else {
+#ifdef KS_DHT_DEBUGPRINTF_
+ char buf2[100];
+ printf("process_table: unble to LOCK bucket %s\n",
+ ks_dhtrt_printableid(header->mask, buf2));
+ fflush(stdout);
+#endif
+
+ }
}
header = header->left;
}
}
ks_rwl_read_unlock(internal->lock); /* release read lock */
+
+ ks_dhtrt_process_deleted(table);
+
return;
}
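+/* Sweep the deferred-delete list, freeing any node whose reflock can now
+   be taken exclusively (no reader still holds it); the rest remain
+   queued for the next pass. */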
+static
+void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
+{
+ ks_dhtrt_internal_t* internal = table->internal;
+ ks_mutex_lock(internal->deleted_node_lock);
+
+ ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
+ ks_dhtrt_deletednode_t *prev = NULL, *temp = NULL;
+
+ while (deleted) {
+ ks_dht_node_t* node = deleted->node;
+
+ if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) {
+ ks_rwl_destroy(&(node->reflock));
+ ks_pool_free(table->pool, node);
+ temp = deleted;
+ deleted = deleted->next;
+ ks_pool_free(table->pool, temp);
+ if (prev != NULL) {
+ prev->next = deleted;
+ } else {
+ internal->deleted_node = deleted;
+ }
+ }
+ else {
+ prev = deleted;
+ deleted = prev->next;
+ }
+
+ }
+
+ ks_mutex_unlock(internal->deleted_node_lock);
+}
+
KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
/* dump buffer headers */
{
ks_dhtrt_bucket_header_t *header = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_header_t));
- memset(header, 0, sizeof(ks_dhtrt_bucket_header_t));
memcpy(header->mask, mask, sizeof(header->mask));
header->parent = parent;
ks_dhtrt_bucket_t *ks_dhtrt_create_bucket(ks_pool_t *pool)
{
ks_dhtrt_bucket_t *bucket = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_t));
-
- memset(bucket, 0, sizeof(ks_dhtrt_bucket_t));
- /*ks_rwl_create(&bucket->lock, pool);*/
+ ks_rwl_create(&bucket->lock, pool);
return bucket;
}
}
+ ks_rwl_read_lock(bucket->lock); /* get a read lock : released in load_query when the results are copied */
+#ifdef KS_DHT_DEBUGLOCKPRINTF_
+ char buf[100];
+ printf("closestbucketnodes: LOCKING bucket %s\n",
+ ks_dhtrt_printableid(header->mask, buf));
+ fflush(stdout);
+#endif
+
for (uint8_t ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
if ( bucket->entries[ix].inuse == 1 &&
(family == ifboth || bucket->entries[ix].family == family) &&
query->nodes[ix] = current->bheader->bucket->entries[z].gptr;
xorix = current->xort[xorix].nextix;
++loaded;
- }
+ }
+#ifdef KS_DHT_DEBUGLOCKPRINTF_
+ char buf1[100];
+ printf("load_query: UNLOCKING bucket %s\n",
+ ks_dhtrt_printableid(current->bheader->mask, buf1));
+ fflush(stdout);
+#endif
+ ks_rwl_read_unlock(current->bheader->bucket->lock); /* release the read lock from findclosest_bucketnodes */
+
if (loaded >= query->max) break;
current = current->next;
}
query->count = loaded;
+
return loaded;
}
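+/* Queue a node that is still read-locked elsewhere; it will be freed
+   later by ks_dhtrt_process_deleted() once all readers are done. */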
+static
+void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
+{
+ ks_dhtrt_internal_t* internal = table->internal;
+ ks_dhtrt_deletednode_t* deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t));
+ deleted->node = node;
+ ks_mutex_lock(internal->deleted_node_lock);
+ deleted->next = internal->deleted_node;
+ internal->deleted_node = deleted;
+ ks_mutex_unlock(internal->deleted_node_lock);
+}
+
void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry) {
++entry->outstanding_pings;
/* @todo */