dht->autoroute = KS_FALSE;
dht->autoroute_port = 0;
-
+
ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
ks_dht_register_type(dht, "q", ks_dht_process_query);
ks_dht_register_type(dht, "r", ks_dht_process_response);
ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
// @todo register 301 error for internal get/put CAS hash mismatch retry handler
-
+
dht->bind_ipv4 = KS_FALSE;
dht->bind_ipv6 = KS_FALSE;
-
+
dht->endpoints = NULL;
dht->endpoints_size = 0;
ks_hash_create(&dht->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, dht->pool);
dht->endpoints_poll = NULL;
+ dht->pulse_expirations = ks_time_now_sec() + KS_DHT_PULSE_EXPIRATIONS;
+
ks_q_create(&dht->send_q, dht->pool, 0);
dht->send_q_unsent = NULL;
dht->recv_buffer_length = 0;
ks_hash_create(&dht->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
ks_hash_set_keysize(dht->storage_hash, KS_DHT_NODEID_SIZE);
-
+
return KS_STATUS_SUCCESS;
}
*/
KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht)
{
+ ks_hash_iterator_t *it;
ks_assert(dht);
- // @todo free storage_hash entries
if (dht->storage_hash) {
+ for (it = ks_hash_first(dht->storage_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ const void *key;
+ ks_dht_storageitem_t *val;
+ ks_hash_this(it, &key, NULL, (void **)&val);
+ ks_dht_storageitem_deinit(val);
+ ks_dht_storageitem_free(&val);
+ }
ks_hash_destroy(&dht->storage_hash);
- dht->storage_hash = NULL;
}
+
dht->token_secret_current = 0;
dht->token_secret_previous = 0;
dht->token_secret_expiration = 0;
- if (dht->rt_ipv4) {
- ks_dhtrt_deinitroute(&dht->rt_ipv4);
- dht->rt_ipv4 = NULL;
- }
- if (dht->rt_ipv6) {
- ks_dhtrt_deinitroute(&dht->rt_ipv6);
- dht->rt_ipv6 = NULL;
- }
+
+ if (dht->rt_ipv4) ks_dhtrt_deinitroute(&dht->rt_ipv4);
+ if (dht->rt_ipv6) ks_dhtrt_deinitroute(&dht->rt_ipv6);
+
dht->transactionid_next = 0;
- if (dht->transactions_hash) {
- ks_hash_destroy(&dht->transactions_hash);
- dht->transactions_hash = NULL;
- }
+ if (dht->transactions_hash) ks_hash_destroy(&dht->transactions_hash);
+
dht->recv_buffer_length = 0;
+
if (dht->send_q) {
ks_dht_message_t *msg;
while (ks_q_pop_timeout(dht->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) {
ks_dht_message_free(&msg);
}
ks_q_destroy(&dht->send_q);
- dht->send_q = NULL;
}
if (dht->send_q_unsent) {
ks_dht_message_deinit(dht->send_q_unsent);
ks_dht_message_free(&dht->send_q_unsent);
}
+
+ dht->pulse_expirations = 0;
+
for (int32_t i = 0; i < dht->endpoints_size; ++i) {
ks_dht_endpoint_t *ep = dht->endpoints[i];
ks_dht_endpoint_deinit(ep);
ks_pool_free(dht->pool, dht->endpoints);
dht->endpoints = NULL;
}
+
if (dht->endpoints_poll) {
ks_pool_free(dht->pool, dht->endpoints_poll);
dht->endpoints_poll = NULL;
}
- if (dht->endpoints_hash) {
- ks_hash_destroy(&dht->endpoints_hash);
- dht->endpoints_hash = NULL;
- }
+ if (dht->endpoints_hash) ks_hash_destroy(&dht->endpoints_hash);
+
dht->bind_ipv4 = KS_FALSE;
dht->bind_ipv6 = KS_FALSE;
- if (dht->registry_type) {
- ks_hash_destroy(&dht->registry_type);
- dht->registry_type = NULL;
- }
- if (dht->registry_query) {
- ks_hash_destroy(&dht->registry_query);
- dht->registry_query = NULL;
- }
- if (dht->registry_error) {
- ks_hash_destroy(&dht->registry_error);
- dht->registry_error = NULL;
- }
+ if (dht->registry_type) ks_hash_destroy(&dht->registry_type);
+ if (dht->registry_query) ks_hash_destroy(&dht->registry_query);
+ if (dht->registry_error) ks_hash_destroy(&dht->registry_error);
dht->autoroute = KS_FALSE;
dht->autoroute_port = 0;
-
+
return KS_STATUS_SUCCESS;
}
{
ks_assert(dht);
- if (!autoroute) {
- port = 0;
- } else if (port <= 0) {
- port = KS_DHT_DEFAULT_PORT;
- }
-
+ if (!autoroute) port = 0;
+ else if (port <= 0) port = KS_DHT_DEFAULT_PORT;
+
dht->autoroute = autoroute;
dht->autoroute_port = port;
-
+
return KS_STATUS_SUCCESS;
}
ks_assert(endpoint);
*endpoint = NULL;
-
+
ks_ip_route(ip, sizeof(ip), raddr->host);
- // @todo readlock hash
- if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) {
+ ep = ks_hash_search(dht->endpoints_hash, ip, KS_READLOCKED);
+ ks_hash_read_unlock(dht->endpoints_hash);
+
+ if (!ep && dht->autoroute) {
ks_sockaddr_t addr;
ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family);
- if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
+ if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
}
if (!ep) {
ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host);
return KS_STATUS_FAIL;
}
-
+
return KS_STATUS_SUCCESS;
}
ks_assert(dht);
ks_assert(value);
ks_assert(callback);
- // @todo writelock registry
+
return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
}
ks_assert(dht);
ks_assert(value);
ks_assert(callback);
- // @todo writelock registry
+
return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
}
ks_assert(dht);
ks_assert(value);
ks_assert(callback);
- // @todo writelock registry
+
return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
}
ks_dht_endpoint_t *ep;
ks_socket_t sock;
int32_t epindex;
-
+
ks_assert(dht);
ks_assert(addr);
ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
ks_assert(addr->port);
- if (endpoint) {
- *endpoint = NULL;
- }
+ if (endpoint) *endpoint = NULL;
dht->bind_ipv4 |= addr->family == AF_INET;
dht->bind_ipv6 |= addr->family == AF_INET6;
- if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) {
- return KS_STATUS_FAIL;
- }
+ if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) return KS_STATUS_FAIL;
// @todo shouldn't ks_addr_bind take a const addr *?
if (ks_addr_bind(sock, (ks_sockaddr_t *)addr) != KS_STATUS_SUCCESS) {
ks_socket_close(&sock);
return KS_STATUS_FAIL;
}
-
+
if (ks_dht_endpoint_alloc(&ep, dht->pool) != KS_STATUS_SUCCESS) {
ks_socket_close(&sock);
return KS_STATUS_FAIL;
}
-
+
if (ks_dht_endpoint_init(ep, nodeid, addr, sock) != KS_STATUS_SUCCESS) {
ks_dht_endpoint_free(&ep);
ks_socket_close(&sock);
ks_socket_option(ep->sock, SO_REUSEADDR, KS_TRUE);
ks_socket_option(ep->sock, KS_SO_NONBLOCK, KS_TRUE);
-
+
epindex = dht->endpoints_size++;
dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
(void *)dht->endpoints,
sizeof(ks_dht_endpoint_t *) * dht->endpoints_size);
dht->endpoints[epindex] = ep;
ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep);
-
+
dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool,
(void *)dht->endpoints_poll,
sizeof(struct pollfd) * dht->endpoints_size);
dht->endpoints_poll[epindex].fd = ep->sock;
dht->endpoints_poll[epindex].events = POLLIN | POLLERR;
- // @todo initialize or add local nodeid to appropriate route table
if (ep->addr.family == AF_INET) {
- if (!dht->rt_ipv4) {
- ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool);
- }
+ if (!dht->rt_ipv4) ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool);
+ ks_dhtrt_create_node(dht->rt_ipv4, ep->nodeid, ks_dht_local_t, ep->addr.host, ep->addr.port, &ep->node);
} else {
- if (!dht->rt_ipv6) {
- ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool);
- }
- }
-
- if (endpoint) {
- *endpoint = ep;
+ if (!dht->rt_ipv6) ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool);
+ ks_dhtrt_create_node(dht->rt_ipv6, ep->nodeid, ks_dht_local_t, ep->addr.host, ep->addr.port, &ep->node);
}
+ if (endpoint) *endpoint = ep;
+
return KS_STATUS_SUCCESS;
}
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
{
int32_t result;
-
+
ks_assert(dht);
ks_assert (timeout >= 0);
if (timeout == 0) {
// @todo deal with default timeout, should return quickly but not hog the CPU polling
}
-
+
result = ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout);
if (result > 0) {
for (int32_t i = 0; i < dht->endpoints_size; ++i) {
if (dht->endpoints_poll[i].revents & POLLIN) {
ks_sockaddr_t raddr = KS_SA_INIT;
dht->recv_buffer_length = KS_DHT_RECV_BUFFER_SIZE;
-
+
raddr.family = dht->endpoints[i]->addr.family;
if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) {
// @todo copy data to a ks_dht_frame then create job to call ks_dht_process from threadpool
}
}
- ks_dht_idle(dht);
+ ks_dht_pulse_expirations(dht);
+
+ ks_dht_pulse_send(dht);
+
+ if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
+ if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
}
/**
*
*/
-KS_DECLARE(ks_status_t) ks_dht_utility_compact_address(ks_sockaddr_t *address,
- uint8_t *buffer,
- ks_size_t *buffer_length,
- ks_size_t buffer_size)
+KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
+{
+ ks_hash_iterator_t *it = NULL;
+ ks_time_t now = ks_time_now_sec();
+
+ ks_assert(dht);
+
+ if (dht->pulse_expirations <= now) {
+ dht->pulse_expirations = now + KS_DHT_PULSE_EXPIRATIONS;
+ }
+
+ ks_hash_write_lock(dht->transactions_hash);
+ for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ const void *key = NULL;
+ ks_dht_transaction_t *value = NULL;
+ ks_bool_t remove = KS_FALSE;
+
+ ks_hash_this(it, &key, NULL, (void **)&value);
+ if (value->finished) remove = KS_TRUE;
+ else if (value->expiration <= now) {
+ ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
+ remove = KS_TRUE;
+ }
+ if (remove) {
+ ks_hash_remove(dht->transactions_hash, (char *)key);
+ ks_pool_free(value->pool, value);
+ }
+ }
+ ks_hash_write_unlock(dht->transactions_hash);
+
+ if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
+ dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION;
+ dht->token_secret_previous = dht->token_secret_current;
+ dht->token_secret_current = rand();
+ }
+}
+
+/**
+ *
+ */
+KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht)
+{
+ ks_dht_message_t *message;
+ ks_bool_t bail = KS_FALSE;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(dht);
+
+ while (!bail) {
+ message = NULL;
+ if (dht->send_q_unsent) {
+ message = dht->send_q_unsent;
+ dht->send_q_unsent = NULL;
+ }
+ if (!message) bail = ks_q_pop_timeout(dht->send_q, (void **)&message, 1) != KS_STATUS_SUCCESS || !message;
+ if (!bail) {
+ bail = (ret = ks_dht_send(dht, message)) != KS_STATUS_SUCCESS;
+ if (ret == KS_STATUS_BREAK) dht->send_q_unsent = message;
+ else if (ret == KS_STATUS_SUCCESS) {
+ ks_dht_message_deinit(message);
+ ks_dht_message_free(&message);
+ }
+ }
+ }
+}
+
+/**
+ *
+ */
+static char *ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer)
+{
+ char *t = buffer;
+
+ ks_assert(id);
+ ks_assert(buffer);
+
+ memset(buffer, 0, KS_DHT_NODEID_SIZE * 2 + 1);
+
+ for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i, t += 2) sprintf(t, "%02X", id->id[i]);
+
+ return buffer;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *address,
+ uint8_t *buffer,
+ ks_size_t *buffer_length,
+ ks_size_t buffer_size)
{
- ks_size_t required = sizeof(uint16_t);
+ ks_size_t addr_len;
+ const void *paddr = NULL;
uint16_t port = 0;
-
+
ks_assert(address);
ks_assert(buffer);
ks_assert(buffer_length);
ks_assert(buffer_size);
ks_assert(address->family == AF_INET || address->family == AF_INET6);
- if (address->family == AF_INET) {
- required += sizeof(uint32_t);
- } else {
- required += 8 * sizeof(uint16_t);
- }
-
- if (*buffer_length + required > buffer_size) {
+ addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
+
+ if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) {
ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n");
return KS_STATUS_FAIL;
}
if (address->family == AF_INET) {
- uint32_t *paddr = (uint32_t *)&address->v.v4.sin_addr;
- uint32_t addr = htonl(*paddr);
- port = htons(address->v.v4.sin_port);
-
- memcpy(buffer + (*buffer_length), (void *)&addr, sizeof(uint32_t));
- *buffer_length += sizeof(uint32_t);
+ paddr = &address->v.v4.sin_addr; // already network byte order
+ port = address->v.v4.sin_port; // already network byte order
} else {
- uint16_t *paddr = (uint16_t *)&address->v.v6.sin6_addr;
- port = htons(address->v.v6.sin6_port);
-
- for (int32_t i = 0; i < 8; ++i) {
- uint16_t addr = htons(paddr[i]);
- memcpy(buffer + (*buffer_length), (void *)&addr, sizeof(uint16_t));
- *buffer_length += sizeof(uint16_t);
- }
+ paddr = &address->v.v6.sin6_addr; // already network byte order
+ port = address->v.v6.sin6_port; // already network byte order
}
+ memcpy(buffer + (*buffer_length), paddr, sizeof(uint32_t));
+ *buffer_length += addr_len;
- memcpy(buffer + (*buffer_length), (void *)&port, sizeof(uint16_t));
+ memcpy(buffer + (*buffer_length), (const void *)&port, sizeof(uint16_t));
*buffer_length += sizeof(uint16_t);
-
+
+ return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer,
+ ks_size_t *buffer_length,
+ ks_size_t buffer_size,
+ ks_sockaddr_t *address)
+{
+ ks_size_t addr_len;
+ const void *paddr = NULL;
+ uint16_t port = 0;
+
+ ks_assert(buffer);
+ ks_assert(buffer_length);
+ ks_assert(address);
+ ks_assert(address->family == AF_INET ||address->family == AF_INET6);
+
+ addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8);
+ if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_FAIL;
+
+ paddr = buffer + *buffer_length;
+ *buffer_length += addr_len;
+ port = *((uint16_t *)(buffer + *buffer_length));
+ *buffer_length += sizeof(uint16_t);
+
+ // @todo ks_addr_set_raw second parameter should be const?
+ ks_addr_set_raw(address, (void *)paddr, port, address->family);
+
return KS_STATUS_SUCCESS;
}
/**
*
*/
-KS_DECLARE(ks_status_t) ks_dht_utility_compact_node(ks_dht_nodeid_t *nodeid,
- ks_sockaddr_t *address,
- uint8_t *buffer,
- ks_size_t *buffer_length,
- ks_size_t buffer_size)
+KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid,
+ const ks_sockaddr_t *address,
+ uint8_t *buffer,
+ ks_size_t *buffer_length,
+ ks_size_t buffer_size)
{
ks_assert(address);
ks_assert(buffer);
memcpy(buffer + (*buffer_length), (void *)nodeid, KS_DHT_NODEID_SIZE);
*buffer_length += KS_DHT_NODEID_SIZE;
- return ks_dht_utility_compact_address(address, buffer, buffer_length, buffer_size);
+ return ks_dht_utility_compact_addressinfo(address, buffer, buffer_length, buffer_size);
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer,
+ ks_size_t *buffer_length,
+ ks_size_t buffer_size,
+ ks_dht_nodeid_t *nodeid,
+ ks_sockaddr_t *address)
+{
+ ks_assert(buffer);
+ ks_assert(buffer_length);
+ ks_assert(nodeid);
+ ks_assert(address);
+ ks_assert(address->family == AF_INET ||address->family == AF_INET6);
+
+ if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_FAIL;
+
+ memcpy(nodeid->id, buffer, KS_DHT_NODEID_SIZE);
+ *buffer_length += KS_DHT_NODEID_SIZE;
+
+ return ks_dht_utility_expand_addressinfo(buffer, buffer_length, buffer_size, address);
}
/**
ks_assert(nodeid);
*nodeid = NULL;
-
+
id = ben_dict_get_by_str(args, key);
if (!id) {
ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
return KS_STATUS_FAIL;
}
-
+
idv = ben_str_val(id);
idv_len = ben_str_len(id);
if (idv_len != KS_DHT_NODEID_SIZE) {
ks_assert(token);
*token = NULL;
-
+
tok = ben_dict_get_by_str(args, key);
if (!tok) {
ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
secret = htonl(secret);
port = htons(raddr->port);
-
+
SHA1_Init(&sha);
SHA1_Update(&sha, &secret, sizeof(uint32_t));
SHA1_Update(&sha, raddr->host, strlen(raddr->host));
ks_dht_token_generate(dht->token_secret_current, raddr, target, &tok);
- if (!memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE)) {
- return KS_TRUE;
- }
+ if (!memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE)) return KS_TRUE;
ks_dht_token_generate(dht->token_secret_previous, raddr, target, &tok);
/**
*
*/
-KS_DECLARE(void) ks_dht_idle(ks_dht_t *dht)
+KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
{
- ks_assert(dht);
-
- ks_dht_idle_expirations(dht);
-
- ks_dht_idle_send(dht);
-}
+ // @todo calculate max IPV6 payload size?
+ char buf[1000];
+ ks_size_t buf_len;
-/**
- *
- */
-KS_DECLARE(void) ks_dht_idle_expirations(ks_dht_t *dht)
-{
- ks_hash_iterator_t *it = NULL;
- ks_time_t now = ks_time_now_sec();
-
ks_assert(dht);
+ ks_assert(message);
+ ks_assert(message->endpoint);
+ ks_assert(message->data);
- // @todo add delay between checking expirations, every 10 seconds?
+ // @todo blacklist check
- ks_hash_write_lock(dht->transactions_hash);
- for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- const void *key = NULL;
- ks_dht_transaction_t *value = NULL;
- ks_bool_t remove = KS_FALSE;
+ buf_len = ben_encode2(buf, sizeof(buf), message->data);
- ks_hash_this(it, &key, NULL, (void **)&value);
- if (value->finished) {
- remove = KS_TRUE;
- } else if (value->expiration <= now) {
- ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
- remove = KS_TRUE;
- }
- if (remove) {
- ks_hash_remove(dht->transactions_hash, (char *)key);
- ks_pool_free(value->pool, value);
- }
- }
- ks_hash_write_unlock(dht->transactions_hash);
+ ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
+ ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
- if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
- dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION;
- dht->token_secret_previous = dht->token_secret_current;
- dht->token_secret_current = rand();
- }
+ return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr);
}
/**
*
*/
-KS_DECLARE(void) ks_dht_idle_send(ks_dht_t *dht)
+KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
+ ks_dht_endpoint_t *ep,
+ ks_sockaddr_t *raddr,
+ const char *query,
+ ks_dht_message_callback_t callback,
+ ks_dht_message_t **message,
+ struct bencode **args)
{
- ks_dht_message_t *message;
- ks_bool_t bail = KS_FALSE;
- ks_status_t ret = KS_STATUS_SUCCESS;
+ uint32_t transactionid;
+ ks_dht_transaction_t *trans = NULL;
+ ks_dht_message_t *msg = NULL;
+ ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
-
- while (!bail) {
- message = NULL;
- if (dht->send_q_unsent) {
- message = dht->send_q_unsent;
- dht->send_q_unsent = NULL;
- }
- if (!message) {
- bail = ks_q_pop_timeout(dht->send_q, (void **)&message, 1) != KS_STATUS_SUCCESS || !message;
- }
- if (!bail) {
- bail = (ret = ks_dht_send(dht, message)) != KS_STATUS_SUCCESS;
- if (ret == KS_STATUS_BREAK) {
- dht->send_q_unsent = message;
- } else if (ret == KS_STATUS_SUCCESS) {
- ks_dht_message_deinit(message);
- ks_dht_message_free(&message);
- }
- }
- }
-}
-
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
-{
- // @todo calculate max IPV6 payload size?
- char buf[1000];
- ks_size_t buf_len;
-
- ks_assert(dht);
- ks_assert(message);
- ks_assert(message->endpoint);
- ks_assert(message->data);
-
- // @todo blacklist check
-
- buf_len = ben_encode2(buf, sizeof(buf), message->data);
-
- ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
- ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
-
- return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr);
-}
-
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
- ks_dht_endpoint_t *ep,
- ks_sockaddr_t *raddr,
- uint8_t *transactionid,
- ks_size_t transactionid_length,
- long long errorcode,
- const char *errorstr)
-{
- ks_dht_message_t *error = NULL;
- struct bencode *e = NULL;
- ks_status_t ret = KS_STATUS_FAIL;
-
- ks_assert(dht);
- ks_assert(raddr);
- ks_assert(transactionid);
- ks_assert(errorstr);
-
- if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
-
- if (ks_dht_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
-
- if (ks_dht_message_init(error, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
- goto done;
- }
-
- if (ks_dht_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) {
- goto done;
- }
-
- ben_list_append(e, ben_int(errorcode));
- ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
-
- ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
- ks_q_push(dht->send_q, (void *)error);
-
- ret = KS_STATUS_SUCCESS;
-
- done:
- if (ret != KS_STATUS_SUCCESS && error) {
- ks_dht_message_deinit(error);
- ks_dht_message_free(&error);
- }
- return ret;
-}
-
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
- ks_dht_endpoint_t *ep,
- ks_sockaddr_t *raddr,
- const char *query,
- ks_dht_message_callback_t callback,
- ks_dht_message_t **message,
- struct bencode **args)
-{
- uint32_t transactionid;
- ks_dht_transaction_t *trans = NULL;
- ks_dht_message_t *msg = NULL;
- ks_status_t ret = KS_STATUS_FAIL;
-
- ks_assert(dht);
- ks_assert(raddr);
- ks_assert(query);
- ks_assert(callback);
- ks_assert(message);
+ ks_assert(raddr);
+ ks_assert(query);
+ ks_assert(callback);
+ ks_assert(message);
*message = NULL;
- if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
+ if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
// @todo atomic increment or mutex
transactionid = dht->transactionid_next++;
- if (ks_dht_transaction_alloc(&trans, dht->pool) != KS_STATUS_SUCCESS) {
- goto done;
- }
+ if (ks_dht_transaction_alloc(&trans, dht->pool) != KS_STATUS_SUCCESS) goto done;
- if (ks_dht_transaction_init(trans, raddr, transactionid, callback) != KS_STATUS_SUCCESS) {
- goto done;
- }
+ if (ks_dht_transaction_init(trans, raddr, transactionid, callback) != KS_STATUS_SUCCESS) goto done;
- if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) {
- goto done;
- }
+ if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) goto done;
- if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
- goto done;
- }
+ if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done;
- if (ks_dht_message_query(msg, transactionid, query, args) != KS_STATUS_SUCCESS) {
- goto done;
- }
+ if (ks_dht_message_query(msg, transactionid, query, args) != KS_STATUS_SUCCESS) goto done;
*message = msg;
ks_assert(raddr);
ks_assert(transactionid);
ks_assert(message);
-
+
*message = NULL;
- if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
+ if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) {
- goto done;
- }
+ if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) goto done;
- if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
- goto done;
- }
+ if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done;
+
+ if (ks_dht_message_response(msg, transactionid, transactionid_length, args) != KS_STATUS_SUCCESS) goto done;
- if (ks_dht_message_response(msg, transactionid, transactionid_length, args) != KS_STATUS_SUCCESS) {
- goto done;
- }
-
*message = msg;
ret = KS_STATUS_SUCCESS;
return ret;
}
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr)
-{
- ks_dht_message_t *message = NULL;
- struct bencode *a = NULL;
-
- ks_assert(dht);
- ks_assert(raddr);
-
- if (ks_dht_setup_query(dht, ep, raddr, "ping", ks_dht_process_response_ping, &message, &a) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
-
- ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
-
- ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
- ks_q_push(dht->send_q, (void *)message);
-
- return KS_STATUS_SUCCESS;
-}
-
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
-{
- ks_dht_message_t *message = NULL;
- struct bencode *a = NULL;
-
- ks_assert(dht);
- ks_assert(raddr);
- ks_assert(targetid);
-
- if (ks_dht_setup_query(dht, ep, raddr, "find_node", ks_dht_process_response_findnode, &message, &a) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
-
- ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
- ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
-
- ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
- ks_q_push(dht->send_q, (void *)message);
-
- return KS_STATUS_SUCCESS;
-}
-
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
-{
- ks_dht_message_t *message = NULL;
- struct bencode *a = NULL;
-
- ks_assert(dht);
- ks_assert(raddr);
- ks_assert(targetid);
-
- if (ks_dht_setup_query(dht, ep, raddr, "get", ks_dht_process_response_get, &message, &a) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
-
- ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
- // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
- ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
-
- ks_log(KS_LOG_DEBUG, "Sending message query get\n");
- ks_q_push(dht->send_q, (void *)message);
-
- return KS_STATUS_SUCCESS;
-}
-
/**
*
*/
}
// @todo blacklist check for bad actor nodes
-
- if (ks_dht_message_prealloc(&message, dht->pool) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
- if (ks_dht_message_init(&message, ep, raddr, KS_FALSE) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
+ if (ks_dht_message_prealloc(&message, dht->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- if (ks_dht_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) {
- goto done;
- }
+ if (ks_dht_message_init(&message, ep, raddr, KS_FALSE) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- // @todo readlocking registry for calling from threadpool
- if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) {
- ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type);
- } else {
- ret = callback(dht, &message);
- }
+ if (ks_dht_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) goto done;
+
+ callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_READLOCKED);
+ ks_hash_read_unlock(dht->registry_type);
+
+ if (!callback) ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type);
+ else ret = callback(dht, &message);
done:
ks_dht_message_deinit(&message);
-
+
return ret;
}
ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n");
return KS_STATUS_FAIL;
}
-
+
qv = ben_str_val(q);
qv_len = ben_str_len(q);
if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) {
message->args = a;
- // @todo readlocking registry for calling from threadpool
- if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) {
- ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
- } else {
- ret = callback(dht, message);
- }
+ callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_READLOCKED);
+ ks_hash_read_unlock(dht->registry_query);
+
+ if (!callback) ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
+ else ret = callback(dht, message);
return ret;
}
transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
ks_hash_read_unlock(dht->transactions_hash);
-
- if (!transaction) {
- ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
- } else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
+
+ if (!transaction) ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
+ else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
ks_log(KS_LOG_DEBUG,
"Message response rejected due to spoofing from %s %d, expected %s %d\n",
message->raddr.host,
return ret;
}
+
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_nodeid_t *id) //, ks_dht_search_callback_t callback)
+{
+ ks_assert(dht);
+ ks_assert(id);
+
+ // @todo check hash for id to see if search already exists
+
+ // @todo if search does not exist, create new search and store in hash by id
+
+ // @todo queue callback into search, if multiple tasks are searching the same id they can all be notified of results
+
+ // @todo if search existed already and is already running then bail out and let it run
+
+ // @todo find closest nodes to id locally, store as closest results, and queue in search pending a find_node call for closer nodes
+
+ // @todo pop a pending find_node call from search queue and call ks_dht_send_find_node, track last popped for timeout
+
+
+ // @todo upon receiving response to find_node, check for an existing search by the id
+
+ // @todo keep track of the closest K(8) nodes found to the id
+
+ // @todo if there is closer node(s) in response, update furthest search result(s) and queue find_node calls for closer nodes
+
+ // @todo if search queue is empty, call callbacks
+
+ // @todo otherwise pop a pending find_node call from search queue and call ks_dht_send_find_node, track last popped for timeout
+
+
+ // @todo during pulse iterate searches and check for last popped timeout where find_node received no reply
+
+ // @todo pop a pending find_node call, or call callbacks if empty
+
+ return KS_STATUS_SUCCESS;
+}
+
+
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
+ ks_dht_endpoint_t *ep,
+ ks_sockaddr_t *raddr,
+ uint8_t *transactionid,
+ ks_size_t transactionid_length,
+ long long errorcode,
+ const char *errorstr)
+{
+ ks_dht_message_t *error = NULL;
+ struct bencode *e = NULL;
+ ks_status_t ret = KS_STATUS_FAIL;
+
+ ks_assert(dht);
+ ks_assert(raddr);
+ ks_assert(transactionid);
+ ks_assert(errorstr);
+
+ if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ if (ks_dht_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ if (ks_dht_message_init(error, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done;
+
+ if (ks_dht_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) goto done;
+
+ ben_list_append(e, ben_int(errorcode));
+ ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
+
+ ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
+ ks_q_push(dht->send_q, (void *)error);
+
+ ret = KS_STATUS_SUCCESS;
+
+ done:
+ if (ret != KS_STATUS_SUCCESS && error) {
+ ks_dht_message_deinit(error);
+ ks_dht_message_free(&error);
+ }
+ return ret;
+}
+
/**
*
*/
}
errorcode = ben_int_val(ec);
et = ben_str_val(es);
-
+
memcpy(error, et, es_len);
error[es_len] = '\0';
// @todo end of ks_dht_message_parse_error
transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
ks_hash_read_unlock(dht->transactions_hash);
-
+
if (!transaction) {
ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid);
} else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
ks_dht_message_callback_t callback;
transaction->finished = KS_TRUE;
- // @todo readlock on registry
- if ((callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) {
- ret = callback(dht, message);
- } else {
+ callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED);
+ ks_hash_read_unlock(dht->registry_error);
+
+ if (callback) ret = callback(dht, message);
+ else {
ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error);
ret = KS_STATUS_SUCCESS;
}
return ret;
}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr)
+{
+ ks_dht_message_t *message = NULL;
+ struct bencode *a = NULL;
+
+ ks_assert(dht);
+ ks_assert(raddr);
+
+ if (ks_dht_setup_query(dht, ep, raddr, "ping", ks_dht_process_response_ping, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+
+ ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
+ ks_q_push(dht->send_q, (void *)message);
+
+ return KS_STATUS_SUCCESS;
+}
+
/**
*
*/
ks_dht_nodeid_t *id;
ks_dht_message_t *response = NULL;
struct bencode *r = NULL;
+ ks_dhtrt_routetable_t *routetable = NULL;
+ ks_dht_node_t *node = NULL;
ks_assert(dht);
ks_assert(message);
ks_assert(message->args);
- if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
+ if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- // @todo add/touch bucket entry for remote node
+ routetable = message->endpoint->node->table;
+
+ // @todo touch here, or only create if not exists?
+ if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+ }
ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
return KS_STATUS_SUCCESS;
}
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message)
+{
+ ks_dht_nodeid_t *id;
+ ks_dhtrt_routetable_t *routetable = NULL;
+ ks_dht_node_t *node = NULL;
+
+ ks_assert(dht);
+ ks_assert(message);
+
+ if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ routetable = message->endpoint->node->table;
+
+ if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+ }
+
+ ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
+
+ return KS_STATUS_SUCCESS;
+}
+
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
+{
+ ks_dht_message_t *message = NULL;
+ struct bencode *a = NULL;
+
+ ks_assert(dht);
+ ks_assert(raddr);
+ ks_assert(targetid);
+
+ if (ks_dht_setup_query(dht, ep, raddr, "find_node", ks_dht_process_response_findnode, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+ ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
+
+ ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
+ ks_q_push(dht->send_q, (void *)message);
+
+ return KS_STATUS_SUCCESS;
+}
+
/**
*
*/
uint8_t buffer6[1000];
ks_size_t buffer4_length = 0;
ks_size_t buffer6_length = 0;
+ ks_dhtrt_routetable_t *routetable = NULL;
+ ks_dht_node_t *node = NULL;
+ ks_dhtrt_querynodes_t query;
+ char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_assert(dht);
ks_assert(message);
ks_assert(message->args);
- if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
+ if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
+ if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
want = ben_dict_get_by_str(message->args, "want");
if (want) {
size_t want_len = ben_list_len(want);
for (size_t i = 0; i < want_len; ++i) {
struct bencode *iv = ben_list_get(want, i);
- if (!ben_cmp_with_str(iv, "n4")) {
- want4 = KS_TRUE;
- }
- if (!ben_cmp_with_str(iv, "n6")) {
- want6 = KS_TRUE;
- }
+ if (!ben_cmp_with_str(iv, "n4")) want4 = KS_TRUE;
+ if (!ben_cmp_with_str(iv, "n6")) want6 = KS_TRUE;
}
}
want6 = message->raddr.family == AF_INET6;
}
- // @todo add/touch bucket entry for remote node
-
+ routetable = message->endpoint->node->table;
+
+ if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+ }
+
ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
+ query.nodeid = *target;
+ query.type = ks_dht_remote_t;
+ query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE
if (want4) {
- // @todo get closest nodes to target from ipv4 route table
- // @todo compact nodes into buffer4
+ query.family = AF_INET;
+ ks_dhtrt_findclosest_nodes(routetable, &query);
+
+ for (int32_t i = 0; i < query.count; ++i) {
+ if (ks_dht_utility_compact_nodeinfo(&query.nodes[i]->nodeid,
+ &query.nodes[i]->addr,
+ buffer4,
+ &buffer4_length,
+ sizeof(buffer4)) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+ ks_log(KS_LOG_DEBUG,
+ "Compacted ipv4 nodeinfo for %s (%s %d)\n",
+ ks_dht_hexid(&query.nodes[i]->nodeid, id_buf),
+ query.nodes[i]->addr.host,
+ query.nodes[i]->addr.port);
+ }
}
if (want6) {
- // @todo get closest nodes to target from ipv6 route table
- // @todo compact nodes into buffer6
- }
-
- // @todo remove this, testing only
- if (ks_dht_utility_compact_node(id,
- &message->raddr,
- message->raddr.family == AF_INET ? buffer4 : buffer6,
- message->raddr.family == AF_INET ? &buffer4_length : &buffer6_length,
- message->raddr.family == AF_INET ? sizeof(buffer4) : sizeof(buffer6)) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
+ query.family = AF_INET6;
+ ks_dhtrt_findclosest_nodes(routetable, &query);
+
+ for (int32_t i = 0; i < query.count; ++i) {
+ if (ks_dht_utility_compact_nodeinfo(&query.nodes[i]->nodeid,
+ &query.nodes[i]->addr,
+ buffer6,
+ &buffer6_length,
+ sizeof(buffer6)) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+ ks_log(KS_LOG_DEBUG,
+ "Compacted ipv6 nodeinfo for %s (%s %d)\n",
+ ks_dht_hexid(&query.nodes[i]->nodeid, id_buf),
+ query.nodes[i]->addr.host,
+ query.nodes[i]->addr.port);
+ }
}
if (ks_dht_setup_response(dht,
}
ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
- if (want4) {
- ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
- }
- if (want6) {
- ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
- }
+ if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
+ if (want6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
ks_q_push(dht->send_q, (void *)response);
return KS_STATUS_SUCCESS;
}
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message)
+{
+ ks_dht_nodeid_t *id;
+ struct bencode *n;
+ const uint8_t *nodes = NULL;
+ const uint8_t *nodes6 = NULL;
+ size_t nodes_size = 0;
+ size_t nodes6_size = 0;
+ size_t nodes_len = 0;
+ size_t nodes6_len = 0;
+ ks_dhtrt_routetable_t *routetable = NULL;
+ ks_dht_node_t *node = NULL;
+ char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
+
+ ks_assert(dht);
+ ks_assert(message);
+
+ if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ n = ben_dict_get_by_str(message->args, "nodes");
+ if (n) {
+ nodes = (const uint8_t *)ben_str_val(n);
+ nodes_size = ben_str_len(n);
+ }
+ n = ben_dict_get_by_str(message->args, "nodes6");
+ if (n) {
+ nodes6 = (const uint8_t *)ben_str_val(n);
+ nodes6_size = ben_str_len(n);
+ }
+
+ routetable = message->endpoint->node->table;
+
+ if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+ }
+
+ while (nodes_len < nodes_size) {
+ ks_dht_nodeid_t nid;
+ ks_sockaddr_t addr;
+
+ addr.family = AF_INET;
+ if (ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ ks_log(KS_LOG_DEBUG,
+ "Expanded ipv4 nodeinfo for %s (%s %d)\n",
+ ks_dht_hexid(&nid, id_buf),
+ addr.host,
+ addr.port);
+
+ if (ks_dhtrt_touch_node(dht->rt_ipv4, nid) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_create_node(dht->rt_ipv4, nid, ks_dht_remote_t, addr.host, addr.port, &node);
+ }
+ }
+
+ while (nodes6_len < nodes6_size) {
+ ks_dht_nodeid_t nid;
+ ks_sockaddr_t addr;
+
+ addr.family = AF_INET6;
+ if (ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ ks_log(KS_LOG_DEBUG,
+ "Expanded ipv6 nodeinfo for %s (%s %d)\n",
+ ks_dht_hexid(&nid, id_buf),
+ addr.host,
+ addr.port);
+
+ if (ks_dhtrt_touch_node(dht->rt_ipv6, nid) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_create_node(dht->rt_ipv6, nid, ks_dht_remote_t, addr.host, addr.port, &node);
+ }
+ }
+ // @todo repeat above for ipv6 table
+
+ ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
+
+ return KS_STATUS_SUCCESS;
+}
+
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
+{
+ ks_dht_message_t *message = NULL;
+ struct bencode *a = NULL;
+
+ ks_assert(dht);
+ ks_assert(raddr);
+ ks_assert(targetid);
+
+ if (ks_dht_setup_query(dht, ep, raddr, "get", ks_dht_process_response_get, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+ // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
+ ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
+
+ ks_log(KS_LOG_DEBUG, "Sending message query get\n");
+ ks_q_push(dht->send_q, (void *)message);
+
+ return KS_STATUS_SUCCESS;
+}
+
/**
*
*/
ks_dht_storageitem_t *item = NULL;
ks_dht_message_t *response = NULL;
struct bencode *r = NULL;
+ ks_dhtrt_routetable_t *routetable = NULL;
+ ks_dht_node_t *node = NULL;
ks_assert(dht);
ks_assert(message);
ks_assert(message->args);
- if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
+ if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
- if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
-
seq = ben_dict_get_by_str(message->args, "seq");
- if (seq) {
- sequence = ben_int_val(seq);
- }
+ if (seq) sequence = ben_int_val(seq);
- // @todo add/touch bucket entry for remote node
+ routetable = message->endpoint->node->table;
+
+ if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+ }
ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
-
+
item = ks_hash_search(dht->storage_hash, (void *)target, KS_READLOCKED);
ks_hash_read_unlock(dht->storage_hash);
// @todo find closest ipv4 and ipv6 nodes to target
// @todo compact ipv4 and ipv6 nodes into separate buffers
-
+
if (ks_dht_setup_response(dht,
message->endpoint,
&message->raddr,
}
ben_dict_set(r, ben_blob("seq", 3), ben_int(item->seq));
}
- if (!sequence_snuffed) {
- ben_dict_set(r, ben_blob("v", 1), ben_clone(item->v));
- }
+ if (!sequence_snuffed) ben_dict_set(r, ben_blob("v", 1), ben_clone(item->v));
}
// @todo nodes, nodes6
return KS_STATUS_SUCCESS;
}
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message)
+{
+ ks_dht_nodeid_t *id;
+ ks_dht_token_t *token;
+ ks_dhtrt_routetable_t *routetable = NULL;
+ ks_dht_node_t *node = NULL;
+
+ ks_assert(dht);
+ ks_assert(message);
+
+ // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
+ if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ if (ks_dht_utility_extract_token(message->args, "token", &token) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ // @todo add extract function for mutable ks_dht_storageitem_key_t
+ // @todo add extract function for mutable ks_dht_storageitem_signature_t
+
+ routetable = message->endpoint->node->table;
+
+ if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+ }
+ // @todo add/touch bucket entries for other nodes/nodes6 returned
+
+ ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
+
+ return KS_STATUS_SUCCESS;
+}
+
+
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message)
{
+ ks_dht_nodeid_t *id;
ks_dht_message_t *response = NULL;
struct bencode *r = NULL;
+ ks_dhtrt_routetable_t *routetable = NULL;
+ ks_dht_node_t *node = NULL;
ks_assert(dht);
ks_assert(message);
ks_assert(message->args);
- // @todo add/touch bucket entry for remote node
+ if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+
+ routetable = message->endpoint->node->table;
+
+ if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
+ }
ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
return KS_STATUS_SUCCESS;
}
-
/**
*
*/
-KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message)
+KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message)
{
- ks_assert(dht);
- ks_assert(message);
-
- // @todo add/touch bucket entry for remote node
-
- ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
-
- return KS_STATUS_SUCCESS;
-}
+ ks_dht_nodeid_t *id;
+ ks_dhtrt_routetable_t *routetable = NULL;
+ ks_dht_node_t *node = NULL;
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message)
-{
ks_assert(dht);
ks_assert(message);
- // @todo add/touch bucket entry for remote node and other nodes returned
-
- ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
-
- return KS_STATUS_SUCCESS;
-}
+ if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
-/**
- *
- */
-KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message)
-{
- ks_dht_nodeid_t *id;
- ks_dht_token_t *token;
-
- ks_assert(dht);
- ks_assert(message);
+ routetable = message->endpoint->node->table;
- // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
- if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
+ if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) {
+ ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node);
}
-
- if (ks_dht_utility_extract_token(message->args, "token", &token) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
-
- // @todo add extract function for mutable ks_dht_storageitem_key_t
- // @todo add extract function for mutable ks_dht_storageitem_signature_t
-
- // @todo add/touch bucket entry for remote node and other nodes returned
- ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
+ ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
return KS_STATUS_SUCCESS;
}