ks_dhtrt_bucket_entry_t entries[KS_DHT_BUCKETSIZE];
uint8_t count;
uint8_t expired_count;
- ks_rwl_t * lock;
- uint8_t locked;
} ks_dhtrt_bucket_t;
uint8_t localid[KS_DHT_NODEID_SIZE];
ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */
ks_rwl_t *lock; /* lock for safe traversal of the tree */
- uint8_t locked;
} ks_dhtrt_internal_t;
typedef struct ks_dhtrt_xort_s {
static
ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node);
static
-void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id);
+ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id);
static
char *ks_dhtrt_printableid(uint8_t *id, char *buffer);
static
unsigned char ks_dhtrt_isactive(ks_dhtrt_bucket_entry_t *entry);
+
+static
+uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query);
static
uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t *xort);
static
ks_dhtrt_internal_t *internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t));
- /*ks_rwl_create(&internal->lock, pool);*/
+ ks_rwl_create(&internal->lock, pool);
table->internal = internal;
/* initialize root bucket */
tnode->type = type;
if (( ks_addr_set(&tnode->addr, ip, port, tnode->family) != KS_STATUS_SUCCESS) ||
- ( ks_dhtrt_insert_node(table, tnode) != KS_STATUS_SUCCESS) ||
- ( ks_rwl_create(&tnode->reflock, table->pool) != KS_STATUS_SUCCESS)) {
+ ( ks_rwl_create(&tnode->reflock, table->pool) != KS_STATUS_SUCCESS)) {
ks_pool_free(table->pool, tnode);
return KS_STATUS_FAIL;
}
+
+ ks_dhtrt_internal_t* internal = table->internal;
+ ks_rwl_write_lock(internal->lock); /* grab write lock and insert */
+ ks_status_t s = ks_dhtrt_insert_node(table, tnode);
+ ks_rwl_write_unlock(internal->lock); /* release write lock */
(*node) = tnode;
- return KS_STATUS_SUCCESS;
+ return s;
}
KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
{
+ ks_status_t s = KS_STATUS_FAIL;
+ ks_dhtrt_internal_t* internal = table->internal;
+ ks_rwl_write_lock(internal->lock); /* grab write lock and delete */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
if (header != 0) {
ks_dhtrt_bucket_t *bucket = header->bucket;
- if (bucket != 0) { /* we were not able to find a bucket*/
- ks_dhtrt_delete_id(bucket, node->nodeid.id);
+ if (bucket != 0) { /* we found a bucket*/
+ s = ks_dhtrt_delete_id(bucket, node->nodeid.id);
}
+
}
- ks_rwl_destroy(&node->reflock);
+ ks_rwl_write_unlock(internal->lock); /* release write lock */
+
+ ks_rwl_destroy(&(node->reflock));
ks_pool_free(table->pool, node);
- return KS_STATUS_SUCCESS;
+ return s;
}
static
KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
{
+ ks_status_t s = KS_STATUS_FAIL;
+ ks_dhtrt_internal_t* internal = table->internal;
+ ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
- if (header == 0) return KS_STATUS_FAIL;
- if (header->bucket == 0) return KS_STATUS_FAIL;
+ if (header != 0 && header->bucket != 0) {
+ ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
- ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
+ if (e != 0) {
+ e->tyme = ks_time_now();
+ e->outstanding_pings = 0;
- if (e != 0) {
- e->tyme = ks_time_now();
- e->outstanding_pings = 0;
- if (e->flags == DHTPEER_EXPIRED) --header->bucket->expired_count;
- e->flags = DHTPEER_ACTIVE;
- return KS_STATUS_SUCCESS;
- }
+ if (e->flags == DHTPEER_EXPIRED) {
+ --header->bucket->expired_count;
+ }
- return KS_STATUS_FAIL;
+ e->flags = DHTPEER_ACTIVE;
+ s = KS_STATUS_SUCCESS;
+ }
+
+ }
+ ks_rwl_read_lock(internal->lock); /* release read lock */
+ return s;
}
KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
{
+ ks_status_t s = KS_STATUS_FAIL;
+ ks_dhtrt_internal_t *internal = table->internal;
+ ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
- if (header == 0) return KS_STATUS_FAIL;
+ if (header != 0) {
- ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
+ ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
+
+ if (e != 0) {
+ e->flags = DHTPEER_EXPIRED;
+ s = KS_STATUS_SUCCESS;
+ }
- if (e != 0) {
- e->flags = DHTPEER_EXPIRED;
- return KS_STATUS_SUCCESS;
}
- return KS_STATUS_FAIL;
+ ks_rwl_read_unlock(internal->lock); /* release read lock */
+ return s;
}
-KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query)
+KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query)
+{
+ uint8_t count = 0;
+ ks_dhtrt_internal_t *internal = table->internal;
+ ks_rwl_read_lock(internal->lock); /* grab read lock */
+ count = ks_dhtrt_findclosest_locked_nodes(table, query);
+ ks_rwl_read_unlock(internal->lock); /* release read lock */
+ return count;
+}
+
+static
+uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query)
{
uint8_t max = query->max;
uint8_t total = 0;
printf(" starting at mask: %s\n", ks_dhtrt_printableid(header->mask, buffer));
#endif
-
ks_dhtrt_sortedxors_t xort0;
-
memset(&xort0, 0 , sizeof(xort0));
ks_dhtrt_nodeid_t initid;
/* */
ks_dhtrt_internal_t *internal = table->internal;
+
+ ks_rwl_read_lock(internal->lock); /* grab read lock */
+
ks_dhtrt_bucket_header_t *header = internal->buckets;
ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
int stackix=0;
header = header->right;
}
}
+ ks_rwl_read_unlock(internal->lock); /* release read lock */
return;
}
ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
int stackix = 0;
+ ks_rwl_read_lock(internal->lock); /* grab read lock */
while (header) {
stack[stackix++] = header;
/* walk and report left handsize */
ks_dhtrt_bucket_t *b = header->bucket;
printf(" bucket holds %d entries\n", b->count);
- if (level == 7) {
+ if (b->count > 0 && level == 7) {
printf(" --------------------------\n");
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
header = header->right;
}
}
-
+ ks_rwl_read_unlock(internal->lock); /* release read lock */
return;
}
int lix = 0;
int rix = 0;
- /* ****************** */
- /* bucket write lock */
- /* ****************** */
- /*ks_rwl_write_lock(source->lock);*/
- source->locked=1;
-
for ( ; rix<KS_DHT_BUCKETSIZE; ++rix) {
if (ks_dhtrt_ismasked(source->entries[rix].id, left->mask)) {
/* move it to the left */
--source->count;
}
}
- /* *********************** */
- /* end bucket write lock */
- /* *********************** */
- source->locked=0;
- /*ks_rwl_write_unlock(source->lock);*/
/* give original bucket to the new left hand side header */
right->bucket = source;
ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
{
/* sanity checks */
- if (!bucket || bucket->count >= KS_DHT_BUCKETSIZE) {
+ if (!bucket || bucket->count > KS_DHT_BUCKETSIZE) {
assert(0);
}
else if (!memcmp(bucket->entries[ix].id, node->nodeid.id, KS_DHT_NODEID_SIZE)) {
#ifdef KS_DHT_DEBUGPRINTF_
char buffer[100];
- printf("duplicate peer %s found at %d ", ks_dhtrt_printableid(node->nodeid.id, buffer), ix);
+ printf("duplicate peer %s found at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), ix);
#endif
bucket->entries[ix].tyme = ks_time_now();
bucket->entries[ix].flags &= DHTPEER_ACTIVE;
}
}
- /* ****************** */
- /* bucket write lock */
- /* ****************** */
- /*ks_rwl_write_lock(bucket->lock);*/
- bucket->locked = 1;
-
if (free == KS_DHT_BUCKETSIZE && expiredix<KS_DHT_BUCKETSIZE ) {
/* bump this one - but only if we have no other option */
free = expiredix;
bucket->entries[free].tyme = ks_time_now();
bucket->entries[free].flags &= DHTPEER_ACTIVE;
- ++bucket->count;
+ if (free != expiredix) { /* are we are taking a free slot rather than replacing an expired node? */
+ ++bucket->count; /* yes: increment total count */
+ }
+
memcpy(bucket->entries[free].id, node->nodeid.id, KS_DHT_NODEID_SIZE);
- bucket->locked = 0;
- /*ks_rwl_write_unlock(bucket->lock);*/
#ifdef KS_DHT_DEBUGPRINTF_
char buffer[100];
- printf("Inserting node %s\n", ks_dhtrt_printableid(node->nodeid.id, buffer));
+ printf("Inserting node %s at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), free);
#endif
return KS_STATUS_SUCCESS;
}
- bucket->locked = 0;
- /*ks_rwl_write_unlock(bucket->lock);*/
- /* ********************** */
- /* end bucket write lock */
- /* ********************** */
-
return KS_STATUS_FAIL;
}
}
static
-void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
+ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
{
#ifdef KS_DHT_DEBUGPRINTF_
#endif
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
-#ifdef KS_DHT_DEBUGPRINTF_
+#ifdef KS_DHT_DEBUGPRINTFX_
+ char bufferx[100];_
printf("\nbucket->entries[%d].id = %s inuse=%c\n", ix,
- ks_dhtrt_printableid(bucket->entries[ix].id, buffer),
+ ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
bucket->entries[ix].inuse );
#endif
if ( bucket->entries[ix].inuse == 1 &&
(!memcmp(id, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) {
bucket->entries[ix].inuse = 0;
- bucket->entries[ix].gptr = 0;
+ bucket->entries[ix].gptr = 0;
bucket->entries[ix].flags = 0;
- return;
+ --bucket->count;
+ return KS_STATUS_SUCCESS;
}
}
- return;
+ return KS_STATUS_FAIL;
}