struct ks_dhtrt_bucket_header_s * parent;
struct ks_dhtrt_bucket_header_s * left;
struct ks_dhtrt_bucket_header_s * right;
+ struct ks_dhtrt_bucket_header_s * left1bit;
+ struct ks_dhtrt_bucket_header_s * right1bit;
ks_dhtrt_bucket_t * bucket;
ks_time_t tyme; /* last processed time */
unsigned char mask[KS_DHT_NODEID_SIZE]; /* node id mask */
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100];
ks_log(KS_LOG_DEBUG, "Insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
- //fflush(stdout);
#endif
ks_rwl_write_lock(bucket->lock);
/* first - seek a stale entry to eject */
if (bucket->expired_count) {
ks_status_t s = ks_dhtrt_insert_id(bucket, node);
+
if (s == KS_STATUS_SUCCESS) {
#ifdef KS_DHT_DEBUGLOCKPRINTF_
ks_log(KS_LOG_DEBUG, "insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
- //fflush(stdout);
#endif
ks_rwl_write_unlock(bucket->lock);
ks_rwl_write_unlock(internal->lock);
#endif
#ifdef KS_DHT_DEBUGLOCKPRINTF_
ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
- //fflush(stdout);
#endif
ks_rwl_write_unlock(bucket->lock);
ks_rwl_write_unlock(internal->lock);
#endif
#ifdef KS_DHT_DEBUGLOCKPRINTF_
ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
- //fflush(stdout);
#endif
ks_rwl_write_unlock(bucket->lock);
ks_rwl_write_unlock(internal->lock);
#ifdef KS_DHT_DEBUGLOCKPRINTF_
ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->right->mask, buf));
ks_log(KS_LOG_DEBUG, "Insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(newleft->mask, buf));
- //fflush(stdout);
#endif
ks_rwl_write_lock(bucket->lock); /* lock new bucket */
#ifdef KS_DHT_DEBUGLOCKPRINTF_
ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n",
ks_dhtrt_printableid(header->mask, buf));
- //fflush(stdout);
#endif
ks_rwl_write_unlock(bucket->lock);
return s;
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100];
ks_log(KS_LOG_DEBUG, "Find node: read LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
- //fflush(stdout);
#endif
ks_rwl_read_lock(bucket->lock);
}
#ifdef KS_DHT_DEBUGLOCKPRINTF_
ks_log(KS_LOG_DEBUG, "Find node: read UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
- //fflush(stdout);
#endif
ks_rwl_read_unlock(bucket->lock);
}
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100];
ks_log(KS_LOG_DEBUG, "Touch node: write bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
- //fflush(stdout);
#endif
ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
}
#ifdef KS_DHT_DEBUGLOCKPRINTF_
ks_log(KS_LOG_DEBUG, "Touch node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
- //fflush(stdout);
#endif
ks_rwl_write_unlock(header->bucket->lock);
}
static
uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query)
{
- uint8_t max = query->max;
uint8_t total = 0;
uint8_t cnt;
- if (max == 0) return 0; /* sanity checks */
- if (max > KS_DHTRT_MAXQUERYSIZE) { /* enforce the maximum */
- max = KS_DHTRT_MAXQUERYSIZE;
+ if (query->max == 0) return 0; /* sanity checks */
+ if (query->max > KS_DHTRT_MAXQUERYSIZE) { /* enforce the maximum */
query->max = KS_DHTRT_MAXQUERYSIZE;
}
#ifdef KS_DHT_DEBUGPRINTF_
char buffer[100];
- ks_log(KS_LOG_DEBUG, "Finding %d closest nodes for nodeid %s\n", max, ks_dhtrt_printableid(query->nodeid.id, buffer));
+ ks_log(KS_LOG_DEBUG, "Finding %d closest nodes for nodeid %s\n",
+ query->max,
+ ks_dhtrt_printableid(query->nodeid.id, buffer));
ks_log(KS_LOG_DEBUG, " ...starting at mask: %s\n", ks_dhtrt_printableid(header->mask, buffer));
#endif
/* step 1 - look at immediate bucket */
/* --------------------------------- */
+ int max = query->max;
cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family, header, &xort0, initid ,max);
- max -= cnt;
total += cnt;
#ifdef KS_DHT_DEBUGPRINTF_
}
}
+ max = query->count - total;
cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family, header, &xort1, initid ,max);
- max -= cnt;
total += cnt;
#ifdef KS_DHT_DEBUGPRINTF_
memcpy(rightid, xort1.bheader->mask, KS_DHT_NODEID_SIZE);
int insanity = 0;
- ks_dhtrt_bucket_header_t *lheader;
- ks_dhtrt_bucket_header_t *rheader;
+ ks_dhtrt_bucket_header_t *lheader = 0;
+ ks_dhtrt_bucket_header_t *rheader = 0;
+ ks_dhtrt_bucket_header_t *last_rheader = 0;
+ ks_dhtrt_bucket_header_t *last_lheader = 0;
ks_dhtrt_sortedxors_t *prev = &xort1;
ks_dhtrt_sortedxors_t *tofree = 0;
ks_dhtrt_sortedxors_t *xortn;
ks_dhtrt_sortedxors_t *xortn1;
do {
+ last_lheader = lheader;
lheader = 0;
+ last_rheader = rheader;
rheader = 0;
xortn = 0;
xortn1 = 0;
if (leftid[0] != 0xff) {
+
ks_dhtrt_shiftleft(leftid);
- lheader = ks_dhtrt_find_bucketheader(table, leftid);
+
+ if (last_lheader && last_lheader->left1bit) {
+ lheader = last_lheader->left1bit = ks_dhtrt_find_relatedbucketheader(last_lheader->left1bit, leftid);
+ }
+ else {
+ lheader = ks_dhtrt_find_bucketheader(table, leftid);
+ if (last_lheader) {
+ last_lheader->left1bit = lheader; /* remember so we can take a shortcut next query */
+ }
+ }
if (lheader) {
xortn = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t));
- if (tofree == 0) tofree = xortn;
+ if (tofree == 0) {
+ tofree = xortn;
+ }
prev->next = xortn;
prev = xortn;
- cnt += ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family,
+ max = query->max - total;
+ cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family,
lheader, xortn, leftid ,max);
- max -= cnt;
+ total += cnt;
#ifdef KS_DHT_DEBUGPRINTF_
ks_log(KS_LOG_DEBUG," stage3: seaching left bucket header %s yielded %d nodes, total=%d\n",
ks_dhtrt_printableid(lheader->mask, buffer), cnt, total);
#endif
}
+#ifdef KS_DHT_DEBUGPRINTF_
+ else {
+ ks_log(KS_LOG_DEBUG," stage3: failed to find left header %s\n",
+ ks_dhtrt_printableid(leftid, buffer));
+ }
+#endif
+
}
- if (max > 0 && rightid[KS_DHT_NODEID_SIZE-1] != 0x00) {
+ if (rightid[KS_DHT_NODEID_SIZE-1] != 0x00) {
+
ks_dhtrt_shiftright(rightid);
- rheader = ks_dhtrt_find_bucketheader(table, rightid);
+
+ if (last_rheader && last_rheader->right1bit) {
+ rheader = last_rheader->right1bit = ks_dhtrt_find_relatedbucketheader(last_rheader->right1bit, rightid);
+ }
+ else {
+ rheader = ks_dhtrt_find_bucketheader(table, rightid);
+ if (rheader == last_rheader) { /* did we get the same bucket header returned */
+ rheader = 0; /* yes: we are done on the left hand branch */
+ }
+ else {
+ if (last_rheader) {
+ last_rheader->left1bit = rheader; /* remember so we can take a shortcut next query */
+ }
+ }
+ }
if (rheader) {
xortn1 = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t));
+
+ if (tofree == 0) {
+ tofree = xortn1;
+ }
+
prev->next = xortn1;
prev = xortn1;
+ max = query->max - total;
cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family,
rheader, xortn1, rightid , max);
- max -= cnt;
+ total += cnt;
#ifdef KS_DHT_DEBUGPRINTF_
ks_log(KS_LOG_DEBUG," stage3: seaching right bucket header %s yielded %d nodes, total=%d\n",
ks_dhtrt_printableid(rheader->mask, buffer), cnt, total);
#endif
}
+#ifdef KS_DHT_DEBUGPRINTF_
+ else {
+ ks_log(KS_LOG_DEBUG," stage3: failed to find right header %s\n",
+ ks_dhtrt_printableid(rightid, buffer));
+ }
+#endif
+
}
if (!lheader && !rheader) {
if (insanity > 159) {
assert(insanity <= 159);
}
-
- } while (max < query->max);
+ } while (total < query->max);
ks_dhtrt_load_query(query, &xort0);
#endif
e->flags = DHTPEER_EXPIRED;
++b->expired_count;
+ e->outstanding_pings = 0; /* extinguish all hope: do not retry again */
continue;
}
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
memset(buffer, 0, 100);
- if (b->entries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer);
- else strcpy(buffer, "<free>");
- ks_log(KS_LOG_DEBUG, " slot %d: %d %d %s\n", ix,
- b->entries[ix].flags,
+ if (b->entries[ix].inuse == 1) {
+ ks_dhtrt_printableid(b->entries[ix].id, buffer);
+ ks_dht_node_t *n = b->entries[ix].gptr;
+ ks_log(KS_LOG_DEBUG, " slot %d: flags:%d %d type:%d family:%d %s\n", ix,
+ b->entries[ix].flags,
b->entries[ix].outstanding_pings,
+ n->type,
+ n->family,
buffer);
+ }
+ else {
+ ks_log(KS_LOG_DEBUG, " slot %d: <free>\n", ix);
+ }
}
ks_log(KS_LOG_DEBUG, " --------------------------\n\n");
char buf[100];
ks_log(KS_LOG_DEBUG, "closestbucketnodes: LOCKING bucket %s\n",
ks_dhtrt_printableid(header->mask, buf));
- //fflush(stdout);
#endif
enum ks_afflags_t both = ifboth;
ks_status_t status;
+ int ipv4_remote = 0;
+ int ipv4_local = 0;
for (int i=0; i<200; ++i) {
if (i%10 == 0) {
++nodeid.id[0];
+ nodeid.id[1] = 0;
}
else {
++nodeid.id[1];
}
- ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
- ks_dhtrt_touch_node(rt, nodeid);
+ ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
+ if (s0 == KS_STATUS_SUCCESS) {
+ ks_dhtrt_touch_node(rt, nodeid);
+ ++ipv4_remote;
+ }
}
+
for (int i=0; i<2; ++i) {
if (i%10 == 0) {
++nodeid.id[0];
+ nodeid.id[1] = 0;
}
else {
++nodeid.id[1];
}
- ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
- ks_dhtrt_touch_node(rt, nodeid);
+ ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
+ if (s0 == KS_STATUS_SUCCESS) {
+ ks_dhtrt_touch_node(rt, nodeid);
+ ++ipv4_local;
+ }
}
for (int i=0; i<201; ++i) {
if (i%10 == 0) {
++nodeid.id[0];
+ nodeid.id[1] = 0;
}
else {
++nodeid.id[1];
}
+ ks_dhtrt_dump(rt, 7);
+
+
int qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, both);
- printf("\n** local query count expected 2, actual %d\n", qcount); fflush(stdout);
+ printf("\n** local query count expected 2, actual %d, max %d\n", qcount, ipv4_local); fflush(stdout);
+
qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, both);
printf("\n*** remote query count expected 20, actual %d\n", qcount); fflush(stdout);
qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, both);
printf("\n*** AF_INET6 count expected 20, actual %d\n", qcount); fflush(stdout);
qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, ifv4);
- printf("\n*** remote AF_INET query count expected 20, actual %d\n", qcount); fflush(stdout);
+ printf("\n*** remote AF_INET query count expected 20, actual %d max %d\n", qcount, ipv4_remote); fflush(stdout);
qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, ifv6);
printf("\n*** remote AF_INET6 query count expected 20, actual %d\n", qcount); fflush(stdout);
}
+void test30()
+{
+ printf("*** testbuckets - test03 start\n"); fflush(stdout);
+
+ ks_dht_node_t* peer;
+ ks_dht_nodeid_t nodeid;
+ memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE);
+
+ char ipv6[] = "1234:1234:1234:1234";
+ char ipv4[] = "123.123.123.123";
+ unsigned short port = 7000;
+ enum ks_afflags_t both = ifboth;
+
+ ks_status_t status;
+ int ipv4_remote = 0;
+ int ipv4_local = 0;
+
+ for (int i=0; i<200; ++i) {
+ if (i%10 == 0) {
+ ++nodeid.id[0];
+ nodeid.id[1] = 0;
+ }
+ else {
+ ++nodeid.id[1];
+ }
+ ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
+ if (s0 == KS_STATUS_SUCCESS) {
+ ks_dhtrt_touch_node(rt, nodeid);
+ ++ipv4_remote;
+ }
+ }
+
+ for (int i=0; i<2; ++i) {
+ if (i%10 == 0) {
+ ++nodeid.id[0];
+ nodeid.id[1] = 0;
+ }
+ else {
+ ++nodeid.id[1];
+ }
+
+ ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
+ if (s0 == KS_STATUS_SUCCESS) {
+ ks_dhtrt_touch_node(rt, nodeid);
+ ++ipv4_local;
+ }
+ }
+
+ for (int i=0; i<201; ++i) {
+ if (i%10 == 0) {
+ ++nodeid.id[0];
+ nodeid.id[1] = 0;
+ }
+ else {
+ ++nodeid.id[1];
+ }
+ ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer);
+ ks_dhtrt_touch_node(rt, nodeid);
+ }
+
+
+ ks_dhtrt_dump(rt, 7);
+
+
+ int qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, both);
+ printf("\n** local query count expected 2, actual %d, max %d\n", qcount, ipv4_local); fflush(stdout);
+
+ qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, both);
+ printf("\n** local query count expected 2, actual %d, max %d\n", qcount, ipv4_local); fflush(stdout);
+
+ qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, both);
+ printf("\n** local query count expected 20, actual %d, max %d\n", qcount, ipv4_local); fflush(stdout);
+
+ return;
+}
+
+
+
+
+
+
+
/* test resue of node memory */
void test50()
{
ks_dht_create(&dht, NULL, NULL);
- ks_thread_pool_create(&tpool, 0, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE);
+ // ks_thread_pool_create(&tpool, 0, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE);
+
+ tpool = 0;
+
ks_status_t status;
char *str = NULL;
int bytes = 1024;
continue;
}
+
+ if (tests[tix] == 30) {
+ ks_dhtrt_initroute(&rt, dht, pool, tpool);
+ test30();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
+ }
+
+
if (tests[tix] == 50) {
ks_dhtrt_initroute(&rt, dht, pool, tpool);
test50();