ks_assert(d->registry_error);
// @todo register 301 error for internal get/put CAS hash mismatch retry handler
- /**
- * Default these to FALSE, binding will set them TRUE when a respective address is bound.
- * @todo these may not be useful anymore they are from legacy code
- */
- d->bind_ipv4 = KS_FALSE;
- d->bind_ipv6 = KS_FALSE;
-
/**
* Initialize the data used to track endpoints to NULL, binding will handle latent allocations.
* The endpoints and endpoints_poll arrays are maintained in parallel to optimize polling.
/**
* Create the hash to store searches.
*/
- ks_hash_create(&d->search_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
- ks_assert(d->search_hash);
+ ks_hash_create(&d->searches4_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+ ks_assert(d->searches4_hash);
+ ks_hash_create(&d->searches6_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
+ ks_assert(d->searches6_hash);
/**
- * The search hash uses arbitrary key size, which requires the key size be provided.
+ * The searches hash uses arbitrary key size, which requires the key size be provided.
*/
- ks_hash_set_keysize(d->search_hash, KS_DHT_NODEID_SIZE);
+ ks_hash_set_keysize(d->searches4_hash, KS_DHT_NODEID_SIZE);
+ ks_hash_set_keysize(d->searches6_hash, KS_DHT_NODEID_SIZE);
/**
* The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
/**
* Cleanup the search hash and it's contents if it is allocated.
*/
- if (d->search_hash) {
- for (it = ks_hash_first(d->search_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ if (d->searches6_hash) {
+ for (it = ks_hash_first(d->searches6_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
ks_dht_search_t *val;
ks_hash_this_val(it, (void **)&val);
ks_dht_search_destroy(&val);
}
- ks_hash_destroy(&d->search_hash);
+ ks_hash_destroy(&d->searches6_hash);
+ }
+ if (d->searches4_hash) {
+ for (it = ks_hash_first(d->searches4_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ ks_dht_search_t *val;
+
+ ks_hash_this_val(it, (void **)&val);
+ ks_dht_search_destroy(&val);
+ }
+ ks_hash_destroy(&d->searches4_hash);
}
/**
/**
* Cleanup the array of endpoint pointers if it is allocated.
*/
- if (d->endpoints) {
- ks_pool_free(d->pool, &d->endpoints);
- d->endpoints = NULL;
- }
+ if (d->endpoints) ks_pool_free(d->pool, &d->endpoints);
/**
* Cleanup the array of endpoint polling data if it is allocated.
*/
- if (d->endpoints_poll) {
- ks_pool_free(d->pool, &d->endpoints_poll);
- d->endpoints_poll = NULL;
- }
+ if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll);
/**
* Cleanup the endpoints hash if it is allocated.
*/
if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash);
- /**
- * Probably don't need this
- */
- d->bind_ipv4 = KS_FALSE;
- d->bind_ipv6 = KS_FALSE;
-
/**
* Cleanup the type, query, and error registries if they have been allocated.
*/
return KS_STATUS_FAIL;
}
- /**
- * Legacy code, this can probably go away
- */
- dht->bind_ipv4 |= addr->family == AF_INET;
- dht->bind_ipv6 |= addr->family == AF_INET6;
-
/**
* Attempt to open a UDP datagram socket for the given address family.
*/
/**
* Attempt to bind the socket to the desired local address.
*/
- // @todo shouldn't ks_addr_bind take a const addr *?
- if ((ret = ks_addr_bind(sock, (ks_sockaddr_t *)addr)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_addr_bind(sock, addr)) != KS_STATUS_SUCCESS) goto done;
/**
* Allocate the endpoint to track the local socket.
if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
}
+KS_DECLARE(void) ks_dht_pulse_expirations_searches(ks_dht_t *dht, ks_hash_t *searches)
+{
+ ks_hash_iterator_t *it = NULL;
+ ks_time_t now = ks_time_now();
+
+ ks_assert(dht);
+ ks_assert(searches);
+
+ ks_hash_write_lock(searches);
+ for (it = ks_hash_first(searches, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ const void *key = NULL;
+ ks_dht_search_t *value = NULL;
+ int32_t active = 0;
+
+ ks_hash_this(it, &key, NULL, (void **)&value);
+
+ ks_mutex_lock(value->mutex);
+ for (ks_hash_iterator_t *i = ks_hash_first(value->pending, KS_UNLOCKED); i; i = ks_hash_next(&i)) {
+ const void *k = NULL;
+ ks_dht_search_pending_t *v = NULL;
+
+ ks_hash_this(i, &k, NULL, (void **)&v);
+
+ if (v->finished) continue;
+
+ if (v->expiration <= now) {
+ char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_log(KS_LOG_DEBUG,
+ "Search for %s pending find_node to %s has expired without response\n",
+ ks_dht_hexid(&value->target, id_buf),
+ ks_dht_hexid(&v->nodeid, id2_buf));
+ v->finished = KS_TRUE;
+ continue;
+ }
+ active++;
+ }
+ ks_mutex_unlock(value->mutex);
+
+ if (active == 0) {
+ for (int32_t index = 0; index < value->callbacks_size; ++index) value->callbacks[index](dht, value);
+ ks_hash_remove(searches, (void *)key);
+ ks_dht_search_destroy(&value);
+ }
+ }
+ ks_hash_write_unlock(searches);
+}
+
KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
{
ks_hash_iterator_t *it = NULL;
}
ks_hash_write_unlock(dht->transactions_hash);
- ks_hash_write_lock(dht->search_hash);
- for (it = ks_hash_first(dht->search_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- const void *search_key = NULL;
- ks_dht_search_t *search_value = NULL;
-
- ks_hash_this(it, &search_key, NULL, (void **)&search_value);
-
- ks_hash_write_lock(search_value->pending);
- for (ks_hash_iterator_t *i = ks_hash_first(search_value->pending, KS_UNLOCKED); i; i = ks_hash_next(&i)) {
- const void *pending_key = NULL;
- ks_dht_search_pending_t *pending_value = NULL;
- ks_bool_t pending_remove = KS_FALSE;
-
- ks_hash_this(i, &pending_key, NULL, (void **)&pending_value);
-
- if (pending_value->finished) pending_remove = KS_TRUE;
- else if (pending_value->expiration <= now) {
- char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
- char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
- ks_log(KS_LOG_DEBUG,
- "Search for %s pending find_node to %s has expired without response\n",
- ks_dht_hexid(&search_value->target, id_buf),
- ks_dht_hexid(&pending_value->nodeid, id2_buf));
- pending_remove = KS_TRUE;
- }
- if (pending_remove) {
- ks_hash_remove(search_value->pending, (void *)pending_key);
- ks_dht_search_pending_destroy(&pending_value);
- }
- }
- ks_hash_write_unlock(search_value->pending);
- if (ks_hash_count(search_value->pending) == 0) {
- for (int32_t index = 0; index < search_value->callbacks_size; ++index) search_value->callbacks[index](dht, search_value);
- ks_hash_remove(dht->search_hash, (void *)search_key);
- ks_dht_search_destroy(&search_value);
- }
- }
- ks_hash_write_unlock(dht->search_hash);
+ if (dht->rt_ipv4) ks_dht_pulse_expirations_searches(dht, dht->searches4_hash);
+ if (dht->rt_ipv6) ks_dht_pulse_expirations_searches(dht, dht->searches6_hash);
if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
dht->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
memcpy(buffer + (*buffer_length), paddr, sizeof(uint32_t));
*buffer_length += addr_len;
- memcpy(buffer + (*buffer_length), (const void *)&port, sizeof(uint16_t));
+ memcpy(buffer + (*buffer_length), &port, sizeof(uint16_t));
*buffer_length += sizeof(uint16_t);
return KS_STATUS_SUCCESS;
port = *((uint16_t *)(buffer + *buffer_length));
*buffer_length += sizeof(uint16_t);
- // @todo ks_addr_set_raw second parameter should be const?
- return ks_addr_set_raw(address, (void *)paddr, port, address->family);
+ return ks_addr_set_raw(address, paddr, port, address->family);
}
KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid,
return KS_STATUS_NO_MEM;
}
- memcpy(buffer + (*buffer_length), (void *)nodeid, KS_DHT_NODEID_SIZE);
+ memcpy(buffer + (*buffer_length), nodeid->id, KS_DHT_NODEID_SIZE);
*buffer_length += KS_DHT_NODEID_SIZE;
return ks_dht_utility_compact_addressinfo(address, buffer, buffer_length, buffer_size);
KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
{
// @todo calculate max IPV6 payload size?
- char buf[1000];
+ char buf[1001];
ks_size_t buf_len;
ks_assert(dht);
// @todo blacklist check
- // @todo use different encode function to check if all data was encoded, do not send large incomplete messages
buf_len = ben_encode2(buf, sizeof(buf), message->data);
+ if (buf_len >= sizeof(buf)) {
+ ks_log(KS_LOG_DEBUG, "Dropping message that is too large\n");
+ return KS_STATUS_FAIL;
+ }
ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
- int family,
+ int32_t family,
ks_dht_nodeid_t *target,
ks_dht_search_callback_t callback,
ks_dht_search_t **search)
{
+ ks_bool_t locked_searches = KS_FALSE;
ks_bool_t locked_search = KS_FALSE;
- ks_bool_t locked_pending = KS_FALSE;
+ ks_hash_t *searches = NULL;
+ ks_dhtrt_routetable_t *rt = NULL;
ks_dht_search_t *s = NULL;
ks_bool_t inserted = KS_FALSE;
ks_bool_t allocated = KS_FALSE;
if (search) *search = NULL;
- // @todo start write lock on search_hash and hold until after inserting
+ if (family == AF_INET) {
+ if (!dht->rt_ipv4) {
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
+ searches = dht->searches4_hash;
+ rt = dht->rt_ipv4;
+ } else {
+ if (!dht->rt_ipv6) {
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
+ searches = dht->searches6_hash;
+ rt = dht->rt_ipv6;
+ }
+
// check hash for target to see if search already exists
- ks_hash_write_lock(dht->search_hash);
- locked_search = KS_TRUE;
+ ks_hash_write_lock(searches);
+ locked_searches = KS_TRUE;
- s = ks_hash_search(dht->search_hash, target->id, KS_UNLOCKED);
+ s = ks_hash_search(searches, target->id, KS_UNLOCKED);
// if search does not exist, create new search and store in hash by target
if (!s) {
// if the search is old then bail out and return successfully
if (!allocated) goto done;
- if ((ret = ks_hash_insert(dht->search_hash, s->target.id, s)) == KS_STATUS_SUCCESS) goto done;
+ // everything past this point until final cleanup is only for when a search of the target does not already exist
+
+ if ((ret = ks_hash_insert(searches, s->target.id, s)) == KS_STATUS_SUCCESS) goto done;
inserted = KS_TRUE;
- // lock pending before unlocking the search hash to prevent this search from being used before we finish setting it up
- ks_hash_write_lock(s->pending);
- locked_pending = KS_TRUE;
+ // lock search before unlocking the searches_hash to prevent this search from being used before we finish setting it up
+ ks_mutex_lock(s->mutex);
+ locked_search = KS_TRUE;
- // release search hash lock now, but pending is still locked
- ks_hash_write_unlock(dht->search_hash);
- locked_search = KS_FALSE;
+ // release searches_hash lock now, but search is still locked
+ ks_hash_write_unlock(searches);
+ locked_searches = KS_FALSE;
// find closest good nodes to target locally and store as the closest results
- query.nodeid = *target;
+ query.nodeid = *target;
query.type = KS_DHT_REMOTE;
query.max = KS_DHT_SEARCH_RESULTS_MAX_SIZE;
query.family = family;
query.count = 0;
- ks_dhtrt_findclosest_nodes(family == AF_INET ? dht->rt_ipv4 : dht->rt_ipv6, &query);
+ ks_dhtrt_findclosest_nodes(rt, &query);
for (int32_t i = 0; i < query.count; ++i) {
ks_dht_node_t *n = query.nodes[i];
ks_dht_search_pending_t *pending = NULL;
-
- s->results[i] = n->nodeid;
- ks_dht_utility_nodeid_xor(&s->distances[i], &n->nodeid, &s->target);
- // add to pending with expiration
+
+ // always take the initial local closest good nodes as results, they are already good nodes that are closest with no results yet
+ s->results[s->results_length] = n->nodeid;
+ ks_dht_utility_nodeid_xor(&s->distances[s->results_length], &n->nodeid, &s->target);
+ s->results_length++;
+
+ pending = ks_hash_search(s->pending, n->nodeid.id, KS_UNLOCKED);
+ if (pending) continue; // skip duplicates, this really shouldn't happen on a new search but we sanity check
+
+ // add to pending with expiration, if any of this fails it's almost catastrophic so just bail out and fail the entire search attempt
+ // there are no probable causes for a failure but check them anyway
if ((ret = ks_dht_search_pending_create(&pending, s->pool, &n->nodeid)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_hash_insert(s->pending, n->nodeid.id, pending)) != KS_STATUS_SUCCESS) {
ks_dht_search_pending_destroy(&pending);
goto done;
}
if ((ret = ks_dht_send_findnode(dht, NULL, &n->addr, target)) != KS_STATUS_SUCCESS) goto done;
- // increment here in case we end up bailing out; execute with what it has or destroy the search?
- s->results_length++;
}
- // @todo release query nodes
- ks_hash_write_unlock(s->pending);
- locked_pending = KS_FALSE;
+ // @todo release closest local query node locks
+ ks_mutex_unlock(s->mutex);
+ locked_search = KS_FALSE;
if (search) *search = s;
done:
- if (locked_search) ks_hash_write_unlock(dht->search_hash);
- if (locked_pending) ks_hash_write_unlock(s->pending);
+ if (locked_searches) ks_hash_write_unlock(searches);
+ if (locked_search) ks_mutex_unlock(s->mutex);
if (ret != KS_STATUS_SUCCESS) {
if (!inserted && s) ks_dht_search_destroy(&s);
*search = NULL;
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
- // @todo produce "want" value if both families are bound
+ // Only request both v4 and v6 if we have both interfaces bound and are looking for our own node id, aka bootstrapping
+ if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, targetid->id, KS_DHT_NODEID_SIZE)) {
+ struct bencode *want = ben_list();
+ ben_list_append_str(want, "n4");
+ ben_list_append_str(want, "n6");
+ ben_dict_set(a, ben_blob("want", 4), want);
+ }
ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
ks_q_push(dht->send_q, (void *)message);
query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE
if (want4) {
query.family = AF_INET;
- ks_dhtrt_findclosest_nodes(routetable, &query);
+ ks_dhtrt_findclosest_nodes(dht->rt_ipv4, &query);
for (int32_t i = 0; i < query.count; ++i) {
ks_dht_node_t *qn = query.nodes[i];
ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
}
+ // @todo release query nodes
}
if (want6) {
query.family = AF_INET6;
- ks_dhtrt_findclosest_nodes(routetable, &query);
+ ks_dhtrt_findclosest_nodes(dht->rt_ipv6, &query);
for (int32_t i = 0; i < query.count; ++i) {
ks_dht_node_t *qn = query.nodes[i];
{
ks_dht_nodeid_t *id;
struct bencode *n;
+ //ks_bool_t n4 = KS_FALSE;
+ //ks_bool_t n6 = KS_FALSE;
const uint8_t *nodes = NULL;
const uint8_t *nodes6 = NULL;
size_t nodes_size = 0;
ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_hash_t *searches = NULL;
ks_dht_search_t *search = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
n = ben_dict_get_by_str(message->args, "nodes");
if (n) {
+ //n4 = KS_TRUE;
nodes = (const uint8_t *)ben_str_val(n);
nodes_size = ben_str_len(n);
}
n = ben_dict_get_by_str(message->args, "nodes6");
if (n) {
+ //n6 = KS_TRUE;
nodes6 = (const uint8_t *)ben_str_val(n);
nodes6_size = ben_str_len(n);
}
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
- ks_hash_read_lock(dht->search_hash);
- search = ks_hash_search(dht->search_hash, message->transaction->target.id, KS_UNLOCKED);
- ks_hash_read_unlock(dht->search_hash);
+ searches = message->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash;
+
+ ks_hash_read_lock(searches);
+ search = ks_hash_search(searches, message->transaction->target.id, KS_UNLOCKED);
if (search) {
- ks_dht_search_pending_t *pending = ks_hash_search(search->pending, id->id, KS_READLOCKED);
- ks_hash_read_unlock(search->pending);
+ ks_dht_search_pending_t *pending = NULL;
+
+ ks_mutex_lock(search->mutex);
+ pending = ks_hash_search(search->pending, id->id, KS_UNLOCKED);
if (pending) pending->finished = KS_TRUE;
}
+ ks_hash_read_unlock(searches);
while (nodes_len < nodes_size) {
ks_dht_nodeid_t nid;
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
ks_dhtrt_release_node(node);
- if (search) {
+ if (search && message->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
ks_dht_nodeid_t distance;
int32_t results_index = -1;
search->results_length++;
} else {
for (int32_t index = 0; index < search->results_length; ++index) {
- // Check if new node is closer than this existing result
+ // Check if new node is closer than this previous result
if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
// If this is the first node that is further then keep it
- // Else if two or more nodes are further, and this existing result is further than the previous one then keep it
+ // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result
if (results_index < 0) results_index = index;
else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
}
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
ks_dhtrt_release_node(node);
+
+ if (search && message->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
+ ks_dht_nodeid_t distance;
+ int32_t results_index = -1;
+
+ ks_dht_utility_nodeid_xor(&distance, &nid, &search->target);
+ if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
+ results_index = search->results_length;
+ search->results_length++;
+ } else {
+ for (int32_t index = 0; index < search->results_length; ++index) {
+ // Check if new node is closer than this previous result
+ if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) {
+ // If this is the first node that is further then keep it
+ // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result
+ if (results_index < 0) results_index = index;
+ else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index;
+ }
+ }
+ }
+
+ if (results_index >= 0) {
+ char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+ ks_dht_search_pending_t *pending = NULL;
+
+ ks_log(KS_LOG_DEBUG,
+ "Set closer node id %s (%s) in search of target id %s at results index %d\n",
+ ks_dht_hexid(&nid, id_buf),
+ ks_dht_hexid(&distance, id2_buf),
+ ks_dht_hexid(&search->target, id3_buf),
+ results_index);
+ search->results[results_index] = nid;
+ search->distances[results_index] = distance;
+
+ if ((ret = ks_dht_search_pending_create(&pending, search->pool, &nid)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_hash_insert(search->pending, nid.id, pending)) != KS_STATUS_SUCCESS) {
+ ks_dht_search_pending_destroy(&pending);
+ goto done;
+ }
+ if ((ret = ks_dht_send_findnode(dht, NULL, &addr, &search->target)) != KS_STATUS_SUCCESS) goto done;
+ }
+ }
}
- // @todo repeat above for ipv6 table
ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
done:
+ if(search) ks_mutex_unlock(search->mutex);
return ret;
}