/* change for testing */
#define KS_DHT_BUCKETSIZE 20
+#define KS_DHTRT_INACTIVETIME (5*60)
+#define KS_DHTRT_MAXPING 3
/* peer flags */
-#define DHTPEER_ACTIVE 0x01
-#define DHTPEER_SUSPECT 0x02
-#define DHTPEER_EXPIRED 0x04
+#define DHTPEER_ACTIVE 1
+#define DHTPEER_SUSPECT 2
+#define DHTPEER_EXPIRED 3
/* internal structures */
+/* Reader/writer lock built from one mutex and two condition variables.
+ * Readers wait on rcond while a writer is active; writers wait on wcond
+ * while readers are active. Counters are only touched under `mutex`;
+ * NOTE(review): `volatile` is not a thread-synchronization primitive —
+ * the mutex is what makes the counters safe. */
+typedef struct ks_dhtrt_rw_lock_s {
+ ks_pool_t* pool;                /* pool used to allocate mutex/conds */
+ ks_mutex_t* mutex;              /* protects read_count and write_count */
+ ks_cond_t* rcond;               /* readers block here while a writer holds the lock */
+ volatile uint16_t read_count;   /* number of readers currently holding the lock */
+ ks_cond_t* wcond;               /* writers block here while readers hold the lock */
+ volatile uint16_t write_count; /* hopefully never more than 1 ! */
+} ks_dhtrt_rw_lock;
+
/* One peer slot inside a bucket.
 * `tyme` is refreshed (and outstanding_pings reset) when the node is
 * touched; the maintenance sweep marks entries SUSPECT after
 * KS_DHTRT_INACTIVETIME and EXPIRED after KS_DHTRT_MAXPING unanswered
 * pings. */
typedef struct ks_dhtrt_bucket_entry_s {
 ks_time_t tyme;                   /* last time this peer was seen/touched */
 unsigned char id[KS_DHT_IDSIZE];  /* peer node id */
 ks_dhtrt_node* gptr; /* ptr to peer */
 uint8_t inuse;                    /* 1 = slot occupied */
+ uint8_t outstanding_pings;       /* pings sent since last response */
 uint8_t flags; /* active, suspect, expired */
- struct ks_dhtrt_bucket_entry_s* left;
- struct ks_dhtrt_bucket_entry_s* right;
- struct ks_dhtrt_bucket_entry_s* prev;
} ks_dhtrt_bucket_entry;
/* Fixed-size bucket of peer entries (k-bucket, k = KS_DHT_BUCKETSIZE). */
typedef struct ks_dhtrt_bucket_s {
 ks_dhtrt_bucket_entry entries[KS_DHT_BUCKETSIZE];
- ks_dhtrt_bucket_entry* first; /* sorted order - first*/
- ks_dhtrt_bucket_entry* last; /* sorted order - last*/
- ks_dhtrt_bucket_entry* avail; /* available chain */
- unsigned short count;
+ uint8_t count;          /* presumably entries in use — TODO confirm against insert path */
+ uint8_t expired_count;  /* entries currently flagged DHTPEER_EXPIRED (candidates for eviction) */
} ks_dhtrt_bucket;
struct ks_dhtrt_bucket_header* left;
struct ks_dhtrt_bucket_header* right;
ks_dhtrt_bucket* bucket;
- unsigned char mask[KS_DHT_IDSIZE];
- unsigned char flags;
+ ks_time_t tyme; /* last processed time */
+ unsigned char mask[KS_DHT_IDSIZE]; /* node id mask */
+ unsigned char flags;
} ks_dhtrt_bucket_header;
/* Private per-table state: anchor of the bucket-header tree. */
typedef struct ks_dhtrt_internal_s {
- ks_dhtrt_bucket_header* buckets;
+ ks_dhtrt_bucket_header* buckets; /* root bucketheader */
 /* */
} ks_dhtrt_internal;
unsigned char* hixor,
unsigned int max);
+static
+void ks_dhtrt_ping(ks_dhtrt_bucket_entry* entry);
+
+static
+ks_status_t ks_dhtrt_initrwlock( ks_dhtrt_rw_lock* lock);
+static
+void ks_dhtrt_deinitrwlock( ks_dhtrt_rw_lock* lock);
+
+static
+void ks_dhtrt_getreadlock( ks_dhtrt_rw_lock* lock);
+static
+ks_status_t ks_dhtrt_tryreadlock( ks_dhtrt_rw_lock* lock);
+static
+void ks_dhtrt_releasereadlock( ks_dhtrt_rw_lock* lock);
+static
+void ks_dhtrt_getwritelock( ks_dhtrt_rw_lock* lock);
+static
+ks_status_t ks_dhtrt_trywritelock( ks_dhtrt_rw_lock* lock);
+static
+void ks_dhtrt_releasewritelock( ks_dhtrt_rw_lock* lock);
+
+
+
/* debugging */
#define KS_DHT_DEBUGPRINTF_
if (insanity > 3200) assert(insanity < 3200);
/* first - seek a stale entry to eject */
+ if (bucket->expired_count) {
+ ks_status_t s = ks_dhtrt_insert_id(bucket, peer);
+ if (s == KS_STATUS_SUCCESS) return KS_STATUS_SUCCESS;
+ }
+
/*
todo: attempting a ping at at this point would require us
to suspend this process ... tricky...assume right now we will go ahead and
{
ks_dhtrt_bucket_header* header = ks_dhtrt_find_bucketheader(table, nodeid);
if (header == 0) return KS_STATUS_FAIL;
+ if (header->bucket == 0) return KS_STATUS_FAIL;
ks_dhtrt_bucket_entry* e = ks_dhtrt_find_bucketentry(header, nodeid);
- if (e != 0) {
+ if (e != 0) {
e->tyme = ks_time_now();
+ e->outstanding_pings = 0;
+ if (e->flags == DHTPEER_EXPIRED) --header->bucket->expired_count;
e->flags = DHTPEER_ACTIVE;
return KS_STATUS_SUCCESS;
}
ks_dhtrt_bucket_header* header = internal->buckets;
ks_dhtrt_bucket_header* stack[KS_DHT_IDSIZE * 8];
int stackix=0;
+ ks_time_t t0 = ks_time_now();
while(header) {
stack[stackix++] = header;
if (header->bucket) {
- /*ks_dhtrt_bucket* b = header->bucket;*/
+ ks_dhtrt_bucket* b = header->bucket;
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
-
- }
+ ks_dhtrt_bucket_entry* e = &b->entries[ix];
+ if (e->inuse == 1) {
+ /* more than n pings outstanding? */
+ if (e->outstanding_pings >= KS_DHTRT_MAXPING) {
+ e->flags = DHTPEER_EXPIRED;
+ ++b->expired_count;
+ continue;
+ }
+ if (e->flags == DHTPEER_SUSPECT) {
+ ks_dhtrt_ping(e);
+ continue;
+ }
+ ks_time_t tdiff = t0 - e->tyme;
+ if (tdiff > KS_DHTRT_INACTIVETIME) {
+ e->flags = DHTPEER_SUSPECT;
+ ks_dhtrt_ping(e);
+ }
+ }
+ } /* end for each bucket_entry */
}
header = header->left;
if (header == 0 && stackix > 1) {
header = header->right;
}
}
-
-
+ return;
}
ks_dhtrt_xorcmp(0, 0, 0);
ks_dhtrt_split_bucket(0, 0, 0);
ks_dhtrt_shiftleft(0);
+ ks_dhtrt_initrwlock( 0);
+ ks_dhtrt_deinitrwlock( 0);
+
+ ks_dhtrt_getreadlock( 0);
+ ks_dhtrt_getwritelock( 0);
+ ks_dhtrt_tryreadlock( 0);
+ ks_dhtrt_trywritelock( 0);
+ ks_dhtrt_releasereadlock( 0);
+ ks_dhtrt_releasewritelock( 0);
+
}
assert(0);
}
uint8_t free = KS_DHT_BUCKETSIZE;
+ uint8_t expiredix = KS_DHT_BUCKETSIZE;
/* find free .. but also check that it is not already here! */
uint8_t ix = 0;
if (free == KS_DHT_BUCKETSIZE) {
free = ix; /* use this one */
}
+ }
+ else if (free == KS_DHT_BUCKETSIZE && bucket->entries[ix].flags == DHTPEER_EXPIRED) {
+ expiredix = ix;
}
else if (!memcmp(bucket->entries[ix].id, peer->id, KS_DHT_IDSIZE)) {
#ifdef KS_DHT_DEBUGPRINTF_
return KS_STATUS_SUCCESS; /* already exists */
}
}
+
+ if (free == KS_DHT_BUCKETSIZE && expiredix<KS_DHT_BUCKETSIZE ) {
+ /* bump this one - but only if we have no other option */
+ free = expiredix;
+ --bucket->expired_count;
+ }
if ( free<KS_DHT_BUCKETSIZE ) {
bucket->entries[free].inuse = 1;
return KS_STATUS_SUCCESS;
}
- assert(0); /* should not reach this point */
return KS_STATUS_FAIL;
}
return loaded;
}
+void ks_dhtrt_ping(ks_dhtrt_bucket_entry* entry) {
+ ++entry->outstanding_pings;
+ /* @todo */
+ /* set the appropriate command in the node and queue if for processing */
+ /*ks_dht_node_t* node = entry->gptr; */
+#ifdef KS_DHT_DEBUGPRINTF_
+ char buf[100];
+ printf(" ping queued for nodeid %s count %d\n",
+ ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings);
+#endif
+ return;
+}
+
+
/*
strictly for shifting the bucketheader mask
so format must be a right filled mask (hex: ..ffffffff)
return 1;
}
+static
+ks_status_t ks_dhtrt_initrwlock( ks_dhtrt_rw_lock* lock)
+{
+ ks_status_t s = ks_mutex_create(&lock->mutex, 0, lock->pool);
+ if (s != KS_STATUS_SUCCESS) return s;
+ s = ks_cond_create_ex(&lock->rcond, lock->pool, lock->mutex);
+ if (s != KS_STATUS_SUCCESS) return s;
+ s = ks_cond_create_ex(&lock->wcond, lock->pool, lock->mutex);
+ return s;
+}
+
+static
+void ks_dhtrt_deinitrwlock( ks_dhtrt_rw_lock* lock)
+{
+ ks_cond_destroy(&lock->rcond);
+ ks_cond_destroy(&lock->wcond);
+ ks_mutex_destroy(&lock->mutex);
+ memset(lock, 0, sizeof(ks_dhtrt_rw_lock));
+}
+
+static
+void ks_dhtrt_getreadlock( ks_dhtrt_rw_lock* lock)
+{
+ ks_mutex_lock(lock->mutex);
+ while (lock->write_count > 0) {
+ ks_cond_wait(lock->rcond);
+ }
+ ++lock->read_count;
+ ks_mutex_unlock(lock->mutex);
+}
+
+static
+ks_status_t ks_dhtrt_tryreadlock( ks_dhtrt_rw_lock* lock)
+{
+ return KS_STATUS_FAIL;
+}
+
+static
+void ks_dhtrt_releasereadlock( ks_dhtrt_rw_lock* lock)
+{
+ ks_mutex_lock(lock->mutex);
+ --lock->read_count;
+ if (lock->read_count == 0)
+ ks_cond_signal(lock->wcond);
+ ks_mutex_unlock(lock->mutex);
+}
+
+static
+void ks_dhtrt_getwritelock( ks_dhtrt_rw_lock* lock)
+{
+ ks_mutex_lock(lock->mutex);
+ while (lock->read_count > 0) {
+ ks_cond_wait(lock->wcond);
+ }
+ ++lock->write_count;
+ ks_mutex_unlock(lock->mutex);
+}
+
+static
+ks_status_t ks_dhtrt_trywritelock( ks_dhtrt_rw_lock* lock)
+{
+ return KS_STATUS_FAIL;
+}
+
+static
+void ks_dhtrt_releasewritelock( ks_dhtrt_rw_lock* lock)
+{
+ ks_mutex_lock(lock->mutex);
+ --lock->write_count;
+ assert(lock->write_count==0);
+ ks_cond_broadcast(lock->rcond);
+ ks_mutex_unlock(lock->mutex);
+}
+
+
static char* ks_dhtrt_printableid(const unsigned char* id, char* buffer)
{
char* t = buffer;