/* internal structures */
typedef struct ks_dhtrt_bucket_entry_s {
ks_time_t tyme;
+ ks_time_t ping_tyme;
uint8_t id[KS_DHT_NODEID_SIZE];
ks_dht_node_t *gptr; /* ptr to peer */
uint8_t inuse;
unsigned int max);
static
-void ks_dhtrt_ping(ks_dhtrt_internal_t *table, ks_dhtrt_bucket_entry_t *entry);
+void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry);
static
-void ks_dhtrt_find(ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *nodeid);
+void ks_dhtrt_find(ks_dhtrt_routetable_t *table, ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *nodeid);
/* debugging */
return KS_STATUS_FAIL;
}
- /* shift right x bits : todo 1 bit for the moment */
+ /* shift right 1 bit */
ks_dhtrt_shiftright(newmask);
/* create the new bucket structures */
ks_dhtrt_bucket_header_t *newleft = ks_dhtrt_create_bucketheader(table->pool, header, newmask);
- header->right1bit = newleft;
- newleft->left1bit = header;
-
newleft->bucket = ks_dhtrt_create_bucket(table->pool);
newleft->flags = BHF_LEFT; /* flag as left hand side - therefore splitable */
ks_dhtrt_bucket_header_t *newright = ks_dhtrt_create_bucketheader(table->pool, header, header->mask);
+ newright->right1bit = newleft;
+ newleft->left1bit = newright;
+
ks_dhtrt_split_bucket(header, newleft, newright);
/* ok now we need to try again to see if the bucket has capacity */
ks_time_t t0 = ks_time_now_sec();
if (t0 - internal->last_process_table < internal->next_process_table_delta) {
+ /*printf("process table: next scan not scheduled\n");*/
return;
}
if (b->count == 0) {
- if (t0 - b->findtyme >= KS_DHTRT_EXPIREDTIME) { /* bucket has been empty for a while */
+ if (t0 - b->findtyme >= (ks_time_t)KS_DHTRT_EXPIREDTIME) { /* bucket has been empty for a while */
+
ks_dht_nodeid_t targetid;
- if (header->left1bit) {
- ks_dhtrt_midmask(header->left1bit->mask, header->mask, targetid.id);
- }
- else if (header->right1bit) {
+
+ if (header->right1bit) {
ks_dhtrt_midmask(header->mask, header->right1bit->mask, targetid.id);
}
else {
- ks_dhtrt_shiftright(targetid.id);
+ ks_dhtrt_nodeid_t rightid;
+ memcpy(rightid, header->mask, KS_DHT_NODEID_SIZE);
+ ks_dhtrt_shiftright(rightid);
+ ks_dhtrt_midmask(header->mask, rightid, targetid.id);
}
- ks_dhtrt_find(internal, &targetid);
- continue;
+
+ ks_dhtrt_find(table, internal, &targetid);
+ b->findtyme = t0;
}
}
+ else {
+ for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
+ ks_dhtrt_bucket_entry_t *e = &b->entries[ix];
- for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
- ks_dhtrt_bucket_entry_t *e = &b->entries[ix];
-
- if (e->inuse == 1) {
-
- if (e->gptr->type != KS_DHT_LOCAL) { /* 'local' nodes do not get expired */
-
- /* more than n pings outstanding? */
+ if (e->inuse == 1) {
- if (e->flags == DHTPEER_DUBIOUS) {
- continue;
- }
+ ks_time_t tdiff = t0 - e->tyme;
- if ( e->flags != DHTPEER_EXPIRED &&
- e->outstanding_pings >= KS_DHTRT_MAXPING ) {
- ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n",
- ks_dhtrt_printableid(e->id, buf));
- e->flags = DHTPEER_EXPIRED;
- ++b->expired_count;
- e->outstanding_pings = 0; /* extinguish all hope: do not retry again */
- continue;
- }
+ if (e->gptr->type != KS_DHT_LOCAL) { /* 'local' nodes do not get expired */
- /* if not on shortest interval and there are any outstanding pings - send another */
- if ( internal->next_process_table_delta == KS_DHTRT_PROCESSTABLE_SHORTINTERVAL120 &&
- e->outstanding_pings > 0) {
- ks_dhtrt_ping(internal, e);
+ /* more than n pings outstanding? */
- if (e->outstanding_pings == 2) {
- ++ping2_count; /* return in 60 seconds for final check */
- }
- else {
- ++ping_count;
+ if (e->flags == DHTPEER_DUBIOUS) {
+ continue; /* nothin' to see here */
}
- continue;
- }
-
- /* if on shortest interval and there are two outstanding pings - send another and final */
- if ( internal->next_process_table_delta == KS_DHTRT_PROCESSTABLE_SHORTINTERVAL60 &&
- e->outstanding_pings >= 2) {
- ks_dhtrt_ping(internal, e);
- ++ping_count;
- continue;
- }
-
- ks_time_t tdiff = t0 - e->tyme;
+ /* refresh empty buckets */
+ if ( e->flags != DHTPEER_EXPIRED &&
+ tdiff >= KS_DHTRT_EXPIREDTIME && /* beyond expired time */
+ e->outstanding_pings >= KS_DHTRT_MAXPING ) { /* has been retried */
+ ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n",
+ ks_dhtrt_printableid(e->id, buf));
+ e->flags = DHTPEER_EXPIRED;
+ ++b->expired_count;
+ e->outstanding_pings = 0; /* extinguish all hope: do not retry again */
+ continue;
+ }
- if (tdiff > KS_DHTRT_EXPIREDTIME) {
- e->flags = DHTPEER_DUBIOUS; /* mark as dubious */
- ks_dhtrt_ping(internal, e); /* final effort to activate */
- continue;
- }
+ /* re ping in-doubt nodes */
+ if ( e->outstanding_pings > 0) {
+ ks_time_t tping = t0 - e->ping_tyme; /* time since we last pinged */
+
+ if (e->outstanding_pings == KS_DHTRT_MAXPING - 1) { /* final ping */
+ ks_dhtrt_ping(internal, e);
+ e->ping_tyme = t0;
+ ++ping2_count;
+ }
+ else if (tping >= KS_DHTRT_PROCESSTABLE_SHORTINTERVAL120) {
+ ks_dhtrt_ping(internal, e);
+ e->ping_tyme = t0;
+ ++ping_count;
+ }
+ continue;
+ }
+
+ /* look for newly expired nodes */
+ if (tdiff > KS_DHTRT_EXPIREDTIME) {
+ e->flags = DHTPEER_DUBIOUS; /* mark as dubious */
+ ks_dhtrt_ping(internal, e); /* final effort to activate */
+ e->ping_tyme = t0;
+ continue;
+ }
- if (tdiff > KS_DHTRT_INACTIVETIME) { /* inactive for suspicious length */
- ks_dhtrt_ping(internal, e); /* kick */
- ++ping_count;
- continue;
- }
+ if (tdiff > KS_DHTRT_INACTIVETIME) { /* inactive for suspicious length */
+ ks_dhtrt_ping(internal, e); /* kick */
+ e->ping_tyme = t0;
+ ++ping_count;
+ continue;
+ }
- } /* end if not local */
+ } /* end if not local */
- } /* end if e->inuse */
+ } /* end if e->inuse */
- } /* end for each bucket_entry */
+ } /* end for each bucket_entry */
+ } /* if bucket->count == 0 .... else */
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100];
char buf[100];
ks_log(KS_LOG_DEBUG, "Ping queued for nodeid %s count %d\n",
ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings);
+ /*printf("ping: %s\n", buf); fflush(stdout);*/
#endif
ks_dht_node_t* node = entry->gptr;
ks_log(KS_LOG_DEBUG, "Node addr %s %d\n", node->addr.host, node->addr.port);
}
static
-void ks_dhtrt_find(ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *nodeid) {
+void ks_dhtrt_find(ks_dhtrt_routetable_t *table, ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *target) {
-#ifdef KS_DHT_DEBUGPRINTF_
char buf[100];
- ks_log(KS_LOG_DEBUG, "Find queued for mask %s\n", ks_dhtrt_printableid(nodeid->id, buf));
-#endif
-
+ ks_log(KS_LOG_DEBUG, "Find queued for target %s\n", ks_dhtrt_printableid(target->id, buf));
+ ks_dht_queue_search_findnode(internal->dht, table, target, NULL);
return;
}
-
-
-
/*
strictly for shifting the bucketheader mask
so format must be a right filled mask (hex: ..ffffffff)
static
void ks_dhtrt_midmask(uint8_t *leftid, uint8_t *rightid, uint8_t *midpt) {
- int i = 0;
+ uint8_t i = 0;
+
memset(midpt, 0, sizeof KS_DHT_NODEID_SIZE);
for ( ; i < KS_DHT_NODEID_SIZE; ++i) {
if (leftid[i] == 0 && rightid[i] == 0) {
continue;
}
- break; /* first non zero */
+ else if (leftid[i] == 0 || rightid[i] == 0) {
+ midpt[i] = leftid[i] | rightid[i];
+ continue;
+ }
+ else {
+ if (leftid[i] == rightid[i]) {
+ midpt[i] = leftid[i] >> 1;
+ i++;
+ }
+ else {
+ uint16_t x = leftid[i] + rightid[i];
+ x >>= 1;
+ midpt[i++] = (uint8_t)x;
+ }
+ break;
+ }
}
if (i == KS_DHT_NODEID_SIZE) {
return;
}
- uint16_t x = leftid[i] + rightid[i];
- x >>= 1;
- midpt[i++] = (uint8_t)x;
-
if ( i < KS_DHT_NODEID_SIZE ) {
memcpy(&midpt[i], &rightid[i], KS_DHT_NODEID_SIZE-i);
}
}
+void test08()
+{
+ printf("**** testbuckets - test08 start\n"); fflush(stdout);
+
+ ks_dht_node_t *peer;
+ memset(g_nodeid1.id, 0xef, KS_DHT_NODEID_SIZE);
+ memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE);
+
+ char ipv6[] = "1234:1234:1234:1234";
+ char ipv4[] = "123.123.123.123";
+ unsigned short port = 7000;
+
+ /* build a delete queue */
+
+ int cix=0;
+
+ for(int i0=0, i1=0; i0<150; ++i0, ++i1) {
+ if (i0%20 == 0) {
+ g_nodeid2.id[cix]>>=1;
+ //ks_dhtrt_dump(rt, 7);
+ if ( g_nodeid2.id[cix] == 0) ++cix;
+ g_nodeid2.id[19] = 0;
+ }
+ else {
+ ++g_nodeid2.id[19];
+ }
+ ks_dhtrt_create_node(rt, g_nodeid2, KS_DHT_REMOTE, ipv4, port, KS_DHTRT_CREATE_DEFAULT, &peer);
+ ks_dhtrt_touch_node(rt, g_nodeid2);
+ ks_dhtrt_release_node(peer);
+ }
+
+ cix = 0;
+
+ memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE);
+ for (int i0=0, i1=0; i0<150; ++i0, ++i1) {
+ if (i0%20 == 0) {
+ g_nodeid2.id[cix]>>=1;
+ if ( g_nodeid2.id[cix] == 0) ++cix;
+ g_nodeid2.id[19] = 0;
+ }
+ else {
+ ++g_nodeid2.id[19];
+ }
+ ks_dht_node_t* n = ks_dhtrt_find_node(rt, g_nodeid2);
+ ks_dhtrt_release_node(n);
+ ks_dhtrt_delete_node(rt, n);
+ }
+
+ /* this should drive the search_findnode */
+
+ for(int i=0; i<45; ++i) {
+ printf("firing process table\n");
+ ks_dhtrt_process_table(rt);
+ ks_sleep(1000 * 1000 * 60); /* sleep one minutes */
+ }
+
+ printf("**** testbuckets - test08 ended\n"); fflush(stdout);
+}
+
+
+void test09()
+{
+ printf("**** testbuckets - test09 start\n"); fflush(stdout);
+
+ ks_dht_node_t *peer;
+ memset(g_nodeid1.id, 0xef, KS_DHT_NODEID_SIZE);
+ memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE);
+
+ char ipv6[] = "1234:1234:1234:1234";
+ char ipv4[] = "123.123.123.123";
+ unsigned short port = 7000;
+
+ /* build a delete queue */
+
+ int cix=0;
+
+ for(int i0=0, i1=0; i0<150; ++i0, ++i1) {
+ if (i0%20 == 0) {
+ g_nodeid2.id[cix]>>=1;
+ //ks_dhtrt_dump(rt, 7);
+ if ( g_nodeid2.id[cix] == 0) ++cix;
+ g_nodeid2.id[19] = 0;
+ }
+ else {
+ ++g_nodeid2.id[19];
+ }
+ ks_dhtrt_create_node(rt, g_nodeid2, KS_DHT_REMOTE, ipv4, port, KS_DHTRT_CREATE_DEFAULT, &peer);
+ ks_dhtrt_touch_node(rt, g_nodeid2);
+ ks_dhtrt_release_node(peer);
+ }
+
+ /* this should expire all nodes after 15 minutes and 3 pings */
+
+ printf("\n\n\n\n");
+
+ for(int i=0; i<45; ++i) {
+ printf("firing process table\n");
+ ks_dhtrt_process_table(rt);
+ ks_sleep(1000 * 1000 * 30); /* sleep 30 seconds */
+ }
+
+ printf("**** testbuckets - test09 ended\n"); fflush(stdout);
+}
+
+
+
+
+
+
+
+
static int gindex = 1;
static ks_mutex_t *glock;
static int gstop = 0;
continue;
}
-
if (tests[tix] == 7) {
ks_dhtrt_initroute(&rt, dht, pool);
test07();
continue;
}
+ if (tests[tix] == 8) {
+ ks_dhtrt_initroute(&rt, dht, pool);
+ test08();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
+ }
+
+ if (tests[tix] == 9) {
+ ks_dhtrt_initroute(&rt, dht, pool);
+ test09();
+ ks_dhtrt_deinitroute(&rt);
+ continue;
+ }
+
+
+
if (tests[tix] == 30) {
ks_dhtrt_initroute(&rt, dht, pool);
test30();