for (int i = 0; i < 5; ++i) {
if (ip[i] == ':') {
- tnode->family = AF_INET6; break;
+ tnode->family = AF_INET6; break;
} else if (ip[i] == '.') {
- tnode->family = AF_INET; break;
+ tnode->family = AF_INET; break;
}
- }
+ }
- memcpy(tnode->nodeid.id, nodeid.id, KS_DHT_NODEID_SIZE);
+ memcpy(tnode->nodeid.id, nodeid.id, KS_DHT_NODEID_SIZE);
- if ((ks_addr_set(&tnode->addr, ip, port, tnode->family) != KS_STATUS_SUCCESS) ||
- (ks_dhtrt_insert_node(table, tnode) != KS_STATUS_SUCCESS)) {
- ks_pool_free(table->pool, tnode);
- return KS_STATUS_FAIL;
- }
+ if ((ks_addr_set(&tnode->addr, ip, port, tnode->family) != KS_STATUS_SUCCESS) ||
+ (ks_dhtrt_insert_node(table, tnode) != KS_STATUS_SUCCESS)) {
+ ks_pool_free(table->pool, tnode);
+ return KS_STATUS_FAIL;
+ }
- (*node) = tnode;
+ (*node) = tnode;
- return KS_STATUS_SUCCESS;
+ return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
if (header != 0) {
- ks_dhtrt_bucket_t *bucket = header->bucket;
+ ks_dhtrt_bucket_t *bucket = header->bucket;
- if (bucket != 0) { /* we were not able to find a bucket*/
- ks_dhtrt_delete_id(bucket, node->nodeid.id);
- }
+ if (bucket != 0) { /* we were not able to find a bucket*/
+ ks_dhtrt_delete_id(bucket, node->nodeid.id);
+ }
}
ks_pool_free(table->pool, node);
static
ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
{
- ks_dhtrt_bucket_t *bucket = 0;
- int insanity = 0;
+ ks_dhtrt_bucket_t *bucket = 0;
+ int insanity = 0;
/* first see if it exists */
ks_dht_node_t *peer = ks_dhtrt_find_node(table, node->nodeid);
- if (peer != 0) {
+ if (peer != 0) {
return KS_STATUS_FAIL;
}
- ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
+ ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
bucket = header->bucket;
- assert(bucket != 0); /* we were not able to find a bucket*/
+ assert(bucket != 0); /* we were not able to find a bucket*/
while (bucket->count == KS_DHT_BUCKETSIZE) {
- if (insanity > 3200) assert(insanity < 3200);
+ if (insanity > 3200) assert(insanity < 3200);
- /* first - seek a stale entry to eject */
- if (bucket->expired_count) {
+ /* first - seek a stale entry to eject */
+ if (bucket->expired_count) {
ks_status_t s = ks_dhtrt_insert_id(bucket, node);
if (s == KS_STATUS_SUCCESS) return KS_STATUS_SUCCESS;
- }
+ }
- /*
+ /*
todo: attempting a ping at at this point would require us
to suspend this process ... tricky...assume right now we will go ahead and
eject. Possibly add to a list to recheck
*/
- if ( !(header->flags & BHF_LEFT) ) { /* only the left handside node can be split */
-#ifdef KS_DHT_DEBUGPRINTF_
+ if ( !(header->flags & BHF_LEFT) ) { /* only the left handside node can be split */
+#ifdef KS_DHT_DEBUGPRINTF_
char buffer[100];
- printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, buffer));
+ printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, buffer));
#endif
return KS_STATUS_FAIL;
- }
-
- /* bucket must be split */
+ }
+
+ /* bucket must be split */
/* work out new mask */
unsigned char newmask[KS_DHT_NODEID_SIZE];
memcpy(newmask, header->mask, KS_DHT_NODEID_SIZE);
- if (newmask[KS_DHT_NODEID_SIZE-1] == 0) { /* no more bits to shift - is this possible */
-#ifdef KS_DHT_DEBUGPRINTF_
+ if (newmask[KS_DHT_NODEID_SIZE-1] == 0) { /* no more bits to shift - is this possible */
+#ifdef KS_DHT_DEBUGPRINTF_
char buffer[100];
- printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(peer->nodeid.id, buffer));
+ printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(peer->nodeid.id, buffer));
#endif
return KS_STATUS_FAIL;
- }
+ }
/* shift right x bits : todo 1 bit for the moment */
- ks_dhtrt_shiftright(newmask);
+ ks_dhtrt_shiftright(newmask);
/* create the new bucket structures */
- ks_dhtrt_bucket_header_t *newleft = ks_dhtrt_create_bucketheader(table->pool, header, newmask);
+ ks_dhtrt_bucket_header_t *newleft = ks_dhtrt_create_bucketheader(table->pool, header, newmask);
- newleft->bucket = ks_dhtrt_create_bucket(table->pool);
- newleft->flags = BHF_LEFT; /* flag as left hand side - therefore splitable */
+ newleft->bucket = ks_dhtrt_create_bucket(table->pool);
+ newleft->flags = BHF_LEFT; /* flag as left hand side - therefore splitable */
- ks_dhtrt_bucket_header_t *newright = ks_dhtrt_create_bucketheader(table->pool, header, header->mask);
+ ks_dhtrt_bucket_header_t *newright = ks_dhtrt_create_bucketheader(table->pool, header, header->mask);
ks_dhtrt_split_bucket(header, newleft, newright);
/* ok now we need to try again to see if the bucket has capacity */
/* which bucket do care about */
if (ks_dhtrt_ismasked(node->nodeid.id, newleft->mask)) {
- bucket = newleft->bucket;
- header = newleft;
- } else {
- bucket = newright->bucket;
- header = newright;
- }
- ++insanity;
+ bucket = newleft->bucket;
+ header = newleft;
+ } else {
+ bucket = newright->bucket;
+ header = newright;
+ }
+ ++insanity;
}
-#ifdef KS_DHT_DEBUGPRINTF_
- char buffer[100];
- printf("inserting nodeid %s ", ks_dhtrt_printableid(node->nodeid.id, buffer));
- printf("into bucket %s\n", ks_dhtrt_printableid(header->mask, buffer));
+#ifdef KS_DHT_DEBUGPRINTF_
+ char buffer[100];
+ printf("inserting nodeid %s ", ks_dhtrt_printableid(node->nodeid.id, buffer));
+ printf("into bucket %s\n", ks_dhtrt_printableid(header->mask, buffer));
#endif
- /* by this point we have a viable bucket */
- return ks_dhtrt_insert_id(bucket, node);
+ /* by this point we have a viable bucket */
+ return ks_dhtrt_insert_id(bucket, node);
}
KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid) {
ks_dhtrt_bucket_t *bucket = header->bucket;
- if (bucket == 0) return NULL; /* probably a logic error ?*/
+ if (bucket == 0) return NULL; /* probably a logic error ?*/
return ks_dhtrt_find_nodeid(bucket, nodeid.id);
}
if (e != 0) {
e->tyme = ks_time_now();
e->outstanding_pings = 0;
- if (e->flags == DHTPEER_EXPIRED) --header->bucket->expired_count;
+ if (e->flags == DHTPEER_EXPIRED) --header->bucket->expired_count;
e->flags = DHTPEER_ACTIVE;
return KS_STATUS_SUCCESS;
}
return KS_STATUS_FAIL;
}
-KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
+KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
{
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
uint8_t total = 0;
uint8_t cnt;
- if (max == 0) return 0; /* sanity check */
+ if (max == 0) return 0; /* sanity check */
query->count = 0;
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, query->nodeid.id);
-#ifdef KS_DHT_DEBUGPRINTF_
- char buffer[100];
- printf("finding %d closest nodes for nodeid %s\n", max, ks_dhtrt_printableid(query->nodeid.id, buffer));
- printf(" starting at mask: %s\n", ks_dhtrt_printableid(header->mask, buffer));
+#ifdef KS_DHT_DEBUGPRINTF_
+ char buffer[100];
+ printf("finding %d closest nodes for nodeid %s\n", max, ks_dhtrt_printableid(query->nodeid.id, buffer));
+ printf(" starting at mask: %s\n", ks_dhtrt_printableid(header->mask, buffer));
#endif
max -= cnt;
total += cnt;
-#ifdef KS_DHT_DEBUGPRINTF_
- printf(" bucket header %s yielded %d nodes; total=%d\n", buffer, cnt, total);
+#ifdef KS_DHT_DEBUGPRINTF_
+ printf(" bucket header %s yielded %d nodes; total=%d\n", buffer, cnt, total);
#endif
- if (total >= query->max) { /* is query answered ? */
+ if (total >= query->max) { /* is query answered ? */
return ks_dhtrt_load_query(query, &xort0);
}
ks_dhtrt_bucket_header_t *parent = header->parent;
if (header == parent->left) {
- xort1.bheader = header = parent->right;
+ xort1.bheader = header = parent->right;
} else {
- if (!parent->left->bucket) { /* left hand might no have a bucket - if so choose left->right */
- xort1.bheader = header = parent->left->right;
+ if (!parent->left->bucket) { /* left hand might no have a bucket - if so choose left->right */
+ xort1.bheader = header = parent->left->right;
} else {
- xort1.bheader = header = parent->left;
- }
+ xort1.bheader = header = parent->left;
+ }
}
cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, header, &xort1, initid ,max);
max -= cnt;
total += cnt;
-#ifdef KS_DHT_DEBUGPRINTF_
+#ifdef KS_DHT_DEBUGPRINTF_
printf(" stage2: sibling bucket header %s yielded %d nodes, total=%d\n",
ks_dhtrt_printableid(header->mask, buffer), cnt, total);
#endif
- if (total >= query->max) { /* is query answered ? */
+ if (total >= query->max) { /* is query answered ? */
return ks_dhtrt_load_query(query, &xort0);
}
xortn = 0;
xortn1 = 0;
- if (leftid[0] != 0xff) {
+ if (leftid[0] != 0xff) {
ks_dhtrt_shiftleft(leftid);
lheader = ks_dhtrt_find_bucketheader(table, leftid);
- if (lheader) {
+ if (lheader) {
xortn = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t));
memset(xortn, 0, sizeof(ks_dhtrt_sortedxors_t));
prev = xortn;
cnt += ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, lheader, xortn, leftid ,max);
max -= cnt;
-#ifdef KS_DHT_DEBUGPRINTF_
+#ifdef KS_DHT_DEBUGPRINTF_
printf(" stage3: seaching left bucket header %s yielded %d nodes, total=%d\n",
ks_dhtrt_printableid(lheader->mask, buffer), cnt, total);
#endif
}
- }
+ }
- if (max > 0 && rightid[KS_DHT_NODEID_SIZE-1] != 0x00) {
+ if (max > 0 && rightid[KS_DHT_NODEID_SIZE-1] != 0x00) {
ks_dhtrt_shiftright(rightid);
rheader = ks_dhtrt_find_bucketheader(table, rightid);
prev = xortn1;
cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, rheader, xortn1, rightid , max);
max -= cnt;
-#ifdef KS_DHT_DEBUGPRINTF_
+#ifdef KS_DHT_DEBUGPRINTF_
printf(" stage3: seaching right bucket header %s yielded %d nodes, total=%d\n",
- ks_dhtrt_printableid(rheader->mask, buffer), cnt, total);
+ ks_dhtrt_printableid(rheader->mask, buffer), cnt, total);
#endif
}
- }
-
- if (!lheader && !rheader) break;
+ }
+
+ if (!lheader && !rheader) break;
- ++insanity;
+ ++insanity;
- if (insanity > 159) {
- assert(insanity <= 159);
- }
+ if (insanity > 159) {
+ assert(insanity <= 159);
+ }
} while (max < query->count);
KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
{
- /* walk the table and update the status of all known knodes */
- /* anything that is suspect automatically becomes expired */
+ /* walk the table and update the status of all known knodes */
+ /* anything that is suspect automatically becomes expired */
- /* inactive for 15 minutes, a node becomes quesionable */
- /* it should be pinged */
+ /* inactive for 15 minutes, a node becomes quesionable */
+ /* it should be pinged */
- /* if it has not been 'touched' since the last time */
- /* give it one more try */
+ /* if it has not been 'touched' since the last time */
+ /* give it one more try */
- /* inactive again it is considered inactive */
- /* */
+ /* inactive again it is considered inactive */
+ /* */
- ks_dhtrt_internal_t *internal = table->internal;
- ks_dhtrt_bucket_header_t *header = internal->buckets;
- ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
- int stackix=0;
- ks_time_t t0 = ks_time_now();
+ ks_dhtrt_internal_t *internal = table->internal;
+ ks_dhtrt_bucket_header_t *header = internal->buckets;
+ ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
+ int stackix=0;
+ ks_time_t t0 = ks_time_now();
- while (header) {
+ while (header) {
stack[stackix++] = header;
if (header->bucket) {
ks_dhtrt_bucket_t *b = header->bucket;
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
- ks_dhtrt_bucket_entry_t *e = &b->entries[ix];
+ ks_dhtrt_bucket_entry_t *e = &b->entries[ix];
if (e->inuse == 1) {
/* more than n pings outstanding? */
if (e->outstanding_pings >= KS_DHTRT_MAXPING) {
- e->flags = DHTPEER_EXPIRED;
+ e->flags = DHTPEER_EXPIRED;
++b->expired_count;
continue;
}
e->flags = DHTPEER_SUSPECT;
ks_dhtrt_ping(e);
}
- }
- } /* end for each bucket_entry */
+ }
+ } /* end for each bucket_entry */
}
header = header->left;
header = stack[stackix];
header = header->right;
}
- }
- return;
+ }
+ return;
}
if (header->bucket) {
ks_dhtrt_bucket_t *b = header->bucket;
printf(" bucket holds %d entries\n", b->count);
-
+
if (level == 7) {
- printf(" --------------------------\n");
+ printf(" --------------------------\n");
- for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
+ for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
memset(buffer, 0, 100);
if (b->entries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer);
else strcpy(buffer, "<free>");
- printf(" slot %d: %s\n", ix, buffer);
- }
+ printf(" slot %d: %s\n", ix, buffer);
+ }
- printf(" --------------------------\n\n");
+ printf(" --------------------------\n\n");
}
-
- }
+
+ }
header = header->left;
header = stack[stackix];
header = header->right;
}
- }
+ }
return;
}
static
ks_dhtrt_bucket_header_t *ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt_bucket_header_t *parent, uint8_t *mask)
{
- ks_dhtrt_bucket_header_t *header = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_header_t));
+ ks_dhtrt_bucket_header_t *header = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_header_t));
- memset(header, 0, sizeof(ks_dhtrt_bucket_header_t));
- memcpy(header->mask, mask, sizeof(header->mask));
- header->parent = parent;
+ memset(header, 0, sizeof(ks_dhtrt_bucket_header_t));
+ memcpy(header->mask, mask, sizeof(header->mask));
+ header->parent = parent;
#ifdef KS_DHT_DEBUGPRINTF_
- char buffer[100];
- printf("creating bucket header for mask: %s ", ks_dhtrt_printableid(mask, buffer));
- if (parent) printf("from parent mask: %s ", ks_dhtrt_printableid(parent->mask, buffer));
- printf("\n");
+ char buffer[100];
+ printf("creating bucket header for mask: %s ", ks_dhtrt_printableid(mask, buffer));
+ if (parent) printf("from parent mask: %s ", ks_dhtrt_printableid(parent->mask, buffer));
+ printf("\n");
#endif
- return header;
+ return header;
}
static
ks_dhtrt_bucket_t *ks_dhtrt_create_bucket(ks_pool_t *pool)
{
- ks_dhtrt_bucket_t *bucket = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_t));
+ ks_dhtrt_bucket_t *bucket = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_t));
- memset(bucket, 0, sizeof(ks_dhtrt_bucket_t));
- /*ks_rwl_create(&bucket->lock, pool);*/
- return bucket;
+ memset(bucket, 0, sizeof(ks_dhtrt_bucket_t));
+ /*ks_rwl_create(&bucket->lock, pool);*/
+ return bucket;
}
static
ks_dhtrt_bucket_header_t *ks_dhtrt_find_bucketheader(ks_dhtrt_routetable_t *table, ks_dhtrt_nodeid_t id)
{
- /* find the right bucket.
- if a bucket header has a bucket, it does not children
+ /* find the right bucket.
+ if a bucket header has a bucket, it does not children
so it must be the bucket to use
*/
- ks_dhtrt_internal_t *internal = table->internal;
- ks_dhtrt_bucket_header_t *header = internal->buckets;
+ ks_dhtrt_internal_t *internal = table->internal;
+ ks_dhtrt_bucket_header_t *header = internal->buckets;
- while (header) {
+ while (header) {
if ( header->bucket ) {
return header;
}
/* left hand side is more restrictive (closer) so should be tried first */
if (header->left != 0 && (ks_dhtrt_ismasked(id, header->left->mask))) {
- header = header->left;
+ header = header->left;
} else {
- header = header->right;
+ header = header->right;
}
- }
+ }
return NULL;
}
if (bucket == 0) return NULL;
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
-#ifdef KS_DHT_DEBUGPRINTF_
+#ifdef KS_DHT_DEBUGPRINTF_
#endif
- if ( bucket->entries[ix].inuse == 1 &&
- (!memcmp(nodeid, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) {
- return &(bucket->entries[ix]);
- }
+ if ( bucket->entries[ix].inuse == 1 &&
+ (!memcmp(nodeid, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) {
+ return &(bucket->entries[ix]);
+ }
}
return NULL;
static
void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original,
- ks_dhtrt_bucket_header_t *left,
- ks_dhtrt_bucket_header_t *right)
+ ks_dhtrt_bucket_header_t *left,
+ ks_dhtrt_bucket_header_t *right)
{
- /* so split the bucket in two based on the masks in the new header */
- /* the existing bucket - with the remaining ids will be taken by the right hand side */
+ /* so split the bucket in two based on the masks in the new header */
+ /* the existing bucket - with the remaining ids will be taken by the right hand side */
ks_dhtrt_bucket_t *source = original->bucket;
- ks_dhtrt_bucket_t *dest = left->bucket;
+ ks_dhtrt_bucket_t *dest = left->bucket;
int lix = 0;
int rix = 0;
- /* ****************** */
- /* bucket write lock */
- /* ****************** */
- /*ks_rwl_write_lock(source->lock);*/
- source->locked=1;
+ /* ****************** */
+ /* bucket write lock */
+ /* ****************** */
+ /*ks_rwl_write_lock(source->lock);*/
+ source->locked=1;
for ( ; rix<KS_DHT_BUCKETSIZE; ++rix) {
- if (ks_dhtrt_ismasked(source->entries[rix].id, left->mask)) {
- /* move it to the left */
+ if (ks_dhtrt_ismasked(source->entries[rix].id, left->mask)) {
+ /* move it to the left */
memcpy(dest->entries[lix].id, source->entries[rix].id, KS_DHT_NODEID_SIZE);
dest->entries[lix].gptr = source->entries[rix].gptr;
dest->entries[lix].inuse = 1;
- ++lix;
+ ++lix;
++dest->count;
/* now remove it from the original bucket */
--source->count;
}
}
- /* *********************** */
- /* end bucket write lock */
- /* *********************** */
- source->locked=0;
- /*ks_rwl_write_unlock(source->lock);*/
+ /* *********************** */
+ /* end bucket write lock */
+ /* *********************** */
+ source->locked=0;
+ /*ks_rwl_write_unlock(source->lock);*/
/* give original bucket to the new left hand side header */
right->bucket = source;
- original->bucket = 0;
- original->left = left;
- original->right = right;
-#ifdef KS_DHT_DEBUGPRINTF_
- char buffer[100];
- printf("\nsplitting bucket orginal: %s\n", ks_dhtrt_printableid(original->mask, buffer));
- printf(" into (left) mask: %s size: %d\n", ks_dhtrt_printableid(left->mask, buffer), left->bucket->count);
- printf(" and (right) mask: %s size: %d\n\n", ks_dhtrt_printableid(right->mask, buffer), right->bucket->count);
+ original->bucket = 0;
+ original->left = left;
+ original->right = right;
+#ifdef KS_DHT_DEBUGPRINTF_
+ char buffer[100];
+ printf("\nsplitting bucket orginal: %s\n", ks_dhtrt_printableid(original->mask, buffer));
+ printf(" into (left) mask: %s size: %d\n", ks_dhtrt_printableid(left->mask, buffer), left->bucket->count);
+ printf(" and (right) mask: %s size: %d\n\n", ks_dhtrt_printableid(right->mask, buffer), right->bucket->count);
#endif
return;
}
/*
- * buckets are implemented as static array
- * There does not seem to be any advantage in sorting/tree structures in terms of xor math
- * so at least the static array does away with the need for locking.
+ * buckets are implemented as static array
+ * There does not seem to be any advantage in sorting/tree structures in terms of xor math
+ * so at least the static array does away with the need for locking.
*/
static
ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
{
- /* sanity checks */
+ /* sanity checks */
if (!bucket || bucket->count >= KS_DHT_BUCKETSIZE) {
- assert(0);
+ assert(0);
}
- uint8_t free = KS_DHT_BUCKETSIZE;
- uint8_t expiredix = KS_DHT_BUCKETSIZE;
+ uint8_t free = KS_DHT_BUCKETSIZE;
+ uint8_t expiredix = KS_DHT_BUCKETSIZE;
/* find free .. but also check that it is not already here! */
- uint8_t ix = 0;
-
- for (; ix<KS_DHT_BUCKETSIZE; ++ix) {
- if (bucket->entries[ix].inuse == 0) {
- if (free == KS_DHT_BUCKETSIZE) {
- free = ix; /* use this one */
- }
- }
- else if (free == KS_DHT_BUCKETSIZE && bucket->entries[ix].flags == DHTPEER_EXPIRED) {
- expiredix = ix;
- }
- else if (!memcmp(bucket->entries[ix].id, node->nodeid.id, KS_DHT_NODEID_SIZE)) {
-#ifdef KS_DHT_DEBUGPRINTF_
- char buffer[100];
- printf("duplicate peer %s found at %d ", ks_dhtrt_printableid(node->nodeid.id, buffer), ix);
+ uint8_t ix = 0;
+
+ for (; ix<KS_DHT_BUCKETSIZE; ++ix) {
+ if (bucket->entries[ix].inuse == 0) {
+ if (free == KS_DHT_BUCKETSIZE) {
+ free = ix; /* use this one */
+ }
+ }
+ else if (free == KS_DHT_BUCKETSIZE && bucket->entries[ix].flags == DHTPEER_EXPIRED) {
+ expiredix = ix;
+ }
+ else if (!memcmp(bucket->entries[ix].id, node->nodeid.id, KS_DHT_NODEID_SIZE)) {
+#ifdef KS_DHT_DEBUGPRINTF_
+ char buffer[100];
+ printf("duplicate peer %s found at %d ", ks_dhtrt_printableid(node->nodeid.id, buffer), ix);
#endif
- bucket->entries[ix].tyme = ks_time_now();
- bucket->entries[ix].flags &= DHTPEER_ACTIVE;
- return KS_STATUS_SUCCESS; /* already exists */
- }
+ bucket->entries[ix].tyme = ks_time_now();
+ bucket->entries[ix].flags &= DHTPEER_ACTIVE;
+ return KS_STATUS_SUCCESS; /* already exists */
+ }
}
- /* ****************** */
- /* bucket write lock */
- /* ****************** */
- /*ks_rwl_write_lock(bucket->lock);*/
- bucket->locked = 1;
-
- if (free == KS_DHT_BUCKETSIZE && expiredix<KS_DHT_BUCKETSIZE ) {
- /* bump this one - but only if we have no other option */
- free = expiredix;
- --bucket->expired_count;
- }
+ /* ****************** */
+ /* bucket write lock */
+ /* ****************** */
+ /*ks_rwl_write_lock(bucket->lock);*/
+ bucket->locked = 1;
+
+ if (free == KS_DHT_BUCKETSIZE && expiredix<KS_DHT_BUCKETSIZE ) {
+ /* bump this one - but only if we have no other option */
+ free = expiredix;
+ --bucket->expired_count;
+ }
if ( free<KS_DHT_BUCKETSIZE ) {
bucket->entries[free].inuse = 1;
bucket->entries[free].gptr = node;
- bucket->entries[free].tyme = ks_time_now();
- bucket->entries[free].flags &= DHTPEER_ACTIVE;
+ bucket->entries[free].tyme = ks_time_now();
+ bucket->entries[free].flags &= DHTPEER_ACTIVE;
- ++bucket->count;
+ ++bucket->count;
memcpy(bucket->entries[free].id, node->nodeid.id, KS_DHT_NODEID_SIZE);
- bucket->locked = 0;
- /*ks_rwl_write_unlock(bucket->lock);*/
-#ifdef KS_DHT_DEBUGPRINTF_
- char buffer[100];
- printf("Inserting node %s\n", ks_dhtrt_printableid(node->nodeid.id, buffer));
+ bucket->locked = 0;
+ /*ks_rwl_write_unlock(bucket->lock);*/
+#ifdef KS_DHT_DEBUGPRINTF_
+ char buffer[100];
+ printf("Inserting node %s\n", ks_dhtrt_printableid(node->nodeid.id, buffer));
#endif
- return KS_STATUS_SUCCESS;
+ return KS_STATUS_SUCCESS;
}
- bucket->locked = 0;
- /*ks_rwl_write_unlock(bucket->lock);*/
- /* ********************** */
- /* end bucket write lock */
- /* ********************** */
+ bucket->locked = 0;
+ /*ks_rwl_write_unlock(bucket->lock);*/
+ /* ********************** */
+ /* end bucket write lock */
+ /* ********************** */
- return KS_STATUS_FAIL;
+ return KS_STATUS_FAIL;
}
static
ks_dht_node_t *ks_dhtrt_find_nodeid(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
{
-#ifdef KS_DHT_DEBUGPRINTF_
- char buffer[100];
- printf("find nodeid for: %s\n", ks_dhtrt_printableid(id, buffer));
+#ifdef KS_DHT_DEBUGPRINTF_
+ char buffer[100];
+ printf("find nodeid for: %s\n", ks_dhtrt_printableid(id, buffer));
#endif
-
+
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
-#ifdef KS_DHT_DEBUGPRINTFX_
- char bufferx[100];
- if ( bucket->entries[ix].inuse == 1) {
- printf("\nbucket->entries[%d].id = %s inuse=%x\n", ix,
- ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
- bucket->entries[ix].inuse );
- }
-#endif
- if ( bucket->entries[ix].inuse == 1 &&
- (!memcmp(id, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) {
+#ifdef KS_DHT_DEBUGPRINTFX_
+ char bufferx[100];
+ if ( bucket->entries[ix].inuse == 1) {
+ printf("\nbucket->entries[%d].id = %s inuse=%x\n", ix,
+ ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
+ bucket->entries[ix].inuse );
+ }
+#endif
+ if ( bucket->entries[ix].inuse == 1 &&
+ (!memcmp(id, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) {
return bucket->entries[ix].gptr;
- }
+ }
}
return NULL;
}
static
void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
{
-#ifdef KS_DHT_DEBUGPRINTF_
+#ifdef KS_DHT_DEBUGPRINTF_
- char buffer[100];
- printf("\ndeleting node for: %s\n", ks_dhtrt_printableid(id, buffer));
+ char buffer[100];
+ printf("\ndeleting node for: %s\n", ks_dhtrt_printableid(id, buffer));
#endif
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
-#ifdef KS_DHT_DEBUGPRINTF_
- printf("\nbucket->entries[%d].id = %s inuse=%c\n", ix,
+#ifdef KS_DHT_DEBUGPRINTF_
+ printf("\nbucket->entries[%d].id = %s inuse=%c\n", ix,
ks_dhtrt_printableid(bucket->entries[ix].id, buffer),
bucket->entries[ix].inuse );
#endif
- if ( bucket->entries[ix].inuse == 1 &&
- (!memcmp(id, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) {
+ if ( bucket->entries[ix].inuse == 1 &&
+ (!memcmp(id, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) {
bucket->entries[ix].inuse = 0;
bucket->entries[ix].gptr = 0;
bucket->entries[ix].flags = 0;
return;
- }
+ }
}
return;
}
uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id,
ks_dhtrt_bucket_header_t *header,
ks_dhtrt_sortedxors_t *xors,
- unsigned char *hixor, /*todo: remove */
+ unsigned char *hixor, /*todo: remove */
unsigned int max) {
-
- uint8_t count = 0; /* count of nodes added this time */
+
+ uint8_t count = 0; /* count of nodes added this time */
xors->startix = KS_DHT_BUCKETSIZE;
xors->count = 0;
unsigned char xorvalue[KS_DHT_NODEID_SIZE];
/* just ugh! - there must be a better way to do this */
/* walk the entire bucket calculating the xor value on the way */
- /* add valid & relevant entries to the xor values */
+ /* add valid & relevant entries to the xor values */
ks_dhtrt_bucket_t *bucket = header->bucket;
- if (bucket == 0) { /* sanity */
-#ifdef KS_DHT_DEBUGPRINTF_
- char buf[100];
- printf("closestbucketnodes: intermediate tree node found %s\n",
+ if (bucket == 0) { /* sanity */
+#ifdef KS_DHT_DEBUGPRINTF_
+ char buf[100];
+ printf("closestbucketnodes: intermediate tree node found %s\n",
ks_dhtrt_printableid(header->mask, buf));
#endif
-
+
}
for (uint8_t ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
- if ( bucket->entries[ix].inuse == 1 &&
- ks_dhtrt_isactive( &(bucket->entries[ix])) ) {
+ if ( bucket->entries[ix].inuse == 1 &&
+ ks_dhtrt_isactive( &(bucket->entries[ix])) ) {
/* calculate xor value */
ks_dhtrt_xor(bucket->entries[ix].id, id, xorvalue );
/* do we need to hold this one */
- if ( count < max || /* yes: we have not filled the quota yet */
- (memcmp(xorvalue, hixor, KS_DHT_NODEID_SIZE) < 0)) { /* or is closer node than one already selected */
+ if ( count < max || /* yes: we have not filled the quota yet */
+ (memcmp(xorvalue, hixor, KS_DHT_NODEID_SIZE) < 0)) { /* or is closer node than one already selected */
- /* now sort the new xorvalue into the results structure */
+ /* now sort the new xorvalue into the results structure */
/* this now becomes worst case O(n*2) logic - is there a better way */
/* in practice the bucket size is fixed so actual behavior is proably 0(logn) */
- unsigned int xorix = xors->startix; /* start of ordered list */
+ unsigned int xorix = xors->startix; /* start of ordered list */
unsigned int prev_xorix = KS_DHT_BUCKETSIZE;
-
+
for (int ix2=0; ix2<count; ++ix2) {
if (memcmp(xorvalue, xors->xort[xorix].xor, KS_DHT_NODEID_SIZE) > 0) {
-
- break; /* insert before xorix, after prev_xoris */
+
+ break; /* insert before xorix, after prev_xoris */
}
prev_xorix = xorix;
xorix = xors->xort[xorix].nextix;
memcpy(xors->xort[count].xor, xorvalue, KS_DHT_NODEID_SIZE);
xors->xort[count].ix = ix;
- xors->xort[count].nextix = xorix; /* correct forward chain */
- if (prev_xorix < KS_DHT_BUCKETSIZE) { /* correct backward chain */
+ xors->xort[count].nextix = xorix; /* correct forward chain */
+ if (prev_xorix < KS_DHT_BUCKETSIZE) { /* correct backward chain */
xors->xort[prev_xorix].nextix = count;
} else {
xors->startix = count;
}
++count;
- }
+ }
}
}
xors->count = count;
- return count; /* return count of added nodes */
+ return count; /* return count of added nodes */
}
static
ks_dhtrt_sortedxors_t *current = xort;
uint8_t loaded = 0;
while (current) {
-#ifdef KS_DHT_DEBUGPRINTF_
+#ifdef KS_DHT_DEBUGPRINTF_
char buf[100];
- printf(" loadquery from bucket %s count %d\n",
+ printf(" loadquery from bucket %s count %d\n",
ks_dhtrt_printableid(current->bheader->mask,buf), current->count);
#endif
int xorix = current->startix;
unsigned int z = current->xort[xorix].ix;
query->nodes[ix] = current->bheader->bucket->entries[z].gptr;
++loaded;
- }
+ }
if (loaded >= query->max) break;
current = current->next;
}
}
void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry) {
- ++entry->outstanding_pings;
- /* @todo */
- /* set the appropriate command in the node and queue if for processing */
+ ++entry->outstanding_pings;
+ /* @todo */
+ /* set the appropriate command in the node and queue if for processing */
/*ks_dht_node_t *node = entry->gptr; */
-#ifdef KS_DHT_DEBUGPRINTF_
+#ifdef KS_DHT_DEBUGPRINTF_
char buf[100];
printf(" ping queued for nodeid %s count %d\n",
ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings);
#endif
- return;
+ return;
}
unsigned char b1 = 0;
for (int i = KS_DHT_NODEID_SIZE-1; i >= 0; --i) {
- if (id[i] == 0) break; /* beyond mask- we are done */
+ if (id[i] == 0) break; /* beyond mask- we are done */
b1 = id[i] & 0x01;
id[i] >>= 1;
if (i != (KS_DHT_NODEID_SIZE-1)) {
xor1 = id1[i] ^ ref[i];
xor2 = id2[i] ^ ref[i];
if (xor1 < xor2) {
- return -1; / * id1 is closer * /
+ return -1; / * id1 is closer * /
}
- return 1; / * id2 is closer * /
+ return 1; / * id2 is closer * /
}
- return 0; / * id2 and id2 are identical ! * /
+ return 0; / * id2 and id2 are identical ! * /
}
*/
/* create an xor value from two ids */
static void ks_dhtrt_xor(const uint8_t *id1, const uint8_t *id2, uint8_t *xor)
{
- for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i) {
- if (id1[i] == id2[i]) {
- xor[i] = 0;
- }
- xor[i] = id1[i] ^ id2[i];
- }
- return;
+ for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i) {
+ if (id1[i] == id2[i]) {
+ xor[i] = 0;
+ }
+ xor[i] = id1[i] ^ id2[i];
+ }
+ return;
}
/* is id masked by mask 1 => yes, 0=> no */
static int ks_dhtrt_ismasked(const uint8_t *id, const unsigned char *mask)
{
- for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i) {
- if (mask[i] == 0 && id[i] != 0) return 0;
- else if (mask[i] == 0xff) return 1;
- else if (id[i] > mask[i]) return 0;
- }
- return 1;
+ for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i) {
+ if (mask[i] == 0 && id[i] != 0) return 0;
+ else if (mask[i] == 0xff) return 1;
+ else if (id[i] > mask[i]) return 0;
+ }
+ return 1;
}
static char *ks_dhtrt_printableid(uint8_t *id, char *buffer)
}
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
+