return KS_STATUS_SUCCESS;
}
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint)
+{
+ // @todo lookup standard def for IPV6 max size
+ char ip[48];
+ ks_dht_endpoint_t *ep = NULL;
+
+ ks_assert(dht);
+ ks_assert(raddr);
+ ks_assert(endpoint);
+
+ *endpoint = NULL;
+
+ ks_ip_route(ip, sizeof(ip), raddr->host);
+
+ // @todo readlock hash
+ if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) {
+ ks_sockaddr_t addr;
+ ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family);
+ if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+ }
+
+ if (!ep) {
+ ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host);
+ return KS_STATUS_FAIL;
+ }
+
+ return KS_STATUS_SUCCESS;
+}
+
/**
*
*/
ks_assert(dht);
ks_assert(value);
ks_assert(callback);
-
+ // @todo writelock registry
return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
}
ks_assert(dht);
ks_assert(value);
ks_assert(callback);
-
+ // @todo writelock registry
return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
}
ks_assert(dht);
ks_assert(value);
ks_assert(callback);
-
+ // @todo writelock registry
return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
}
dht->bind_ipv4 |= addr->family == AF_INET;
dht->bind_ipv6 |= addr->family == AF_INET6;
- // @todo start of ks_dht_endpoint_bind
if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) {
return KS_STATUS_FAIL;
}
ks_socket_option(ep->sock, SO_REUSEADDR, KS_TRUE);
ks_socket_option(ep->sock, KS_SO_NONBLOCK, KS_TRUE);
- // @todo end of ks_dht_endpoint_bind
epindex = dht->endpoints_size++;
dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
// @todo initialize or add local nodeid to appropriate route table
if (ep->addr.family == AF_INET) {
if (!dht->rt_ipv4) {
- //ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, (ks_dhtrt_nodeid_t));
+ //ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, &ep->nodeid);
}
} else {
if (!dht->rt_ipv6) {
- //ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool);
+ //ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool, &ep->nodeid);
}
}
raddr.family = dht->endpoints[i]->addr.family;
if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) {
+ // @todo copy data to a ks_dht_frame then create job to call ks_dht_process from threadpool
ks_dht_process(dht, dht->endpoints[i], &raddr);
}
}
*/
KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
{
- // @todo lookup standard def for IPV6 max size
- char ip[48];
- ks_dht_endpoint_t *ep;
// @todo calculate max IPV6 payload size?
char buf[1000];
ks_size_t buf_len;
ks_assert(dht);
ks_assert(message);
+ ks_assert(message->endpoint);
ks_assert(message->data);
// @todo blacklist check
- ks_ip_route(ip, sizeof(ip), message->raddr.host);
-
- if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) {
- ks_sockaddr_t addr;
- ks_addr_set(&addr, ip, dht->autoroute_port, message->raddr.family);
- if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
- }
-
- if (!ep) {
- ks_log(KS_LOG_DEBUG, "No route available to %s\n", message->raddr.host);
- return KS_STATUS_FAIL;
- }
-
buf_len = ben_encode2(buf, sizeof(buf), message->data);
ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
- return ks_socket_sendto(ep->sock, (void *)buf, &buf_len, &message->raddr);
+ return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr);
}
/**
ks_assert(transactionid);
ks_assert(errorstr);
+ if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+
if (ks_dht_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
- ks_assert(ep);
ks_assert(raddr);
ks_assert(query);
ks_assert(callback);
ks_assert(message);
*message = NULL;
-
+
+ if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+
// @todo atomic increment or mutex
transactionid = dht->transactionid_next++;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
- ks_assert(ep);
ks_assert(raddr);
ks_assert(transactionid);
ks_assert(message);
*message = NULL;
-
+
+ if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+
if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) {
goto done;
}
return KS_STATUS_FAIL;
}
- ben_dict_set(a, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE));
+ ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_FAIL;
}
- ben_dict_set(a, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE));
+ ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
ks_q_push(dht->send_q, (void *)message);
- //ks_dht_send(dht, raddr, message);
return KS_STATUS_SUCCESS;
}
if (ks_dht_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) {
goto done;
}
-
+
+ // @todo readlocking registry for calling from threadpool
if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) {
ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type);
} else {
message->args = a;
+ // @todo readlocking registry for calling from threadpool
if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) {
ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
} else {
ks_log(KS_LOG_DEBUG, "Message response missing required key 'r'\n");
return KS_STATUS_FAIL;
}
- // todo end of ks_dht_message_parse_response
+ // @todo end of ks_dht_message_parse_response
message->args = r;
transaction->raddr.host,
transaction->raddr.port);
} else {
- // @todo mark transaction for later removal
transaction->finished = KS_TRUE;
ret = transaction->callback(dht, message);
}
memcpy(error, et, es_len);
error[es_len] = '\0';
- // todo end of ks_dht_message_parse_error
+ // @todo end of ks_dht_message_parse_error
message->args = e;
transaction->raddr.host,
transaction->raddr.port);
} else {
- // @todo mark transaction for later removal
ks_dht_message_callback_t callback;
transaction->finished = KS_TRUE;
+ // @todo readlock on registry
if ((callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) {
ret = callback(dht, message);
} else {
&r) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
-
- //if (ks_dht_message_alloc(&response, dht->pool) != KS_STATUS_SUCCESS) {
- // goto done;
- //}
- //if (ks_dht_message_init(response, message->endpoint, &message->raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
- // goto done;
- //}
-
- //if (ks_dht_message_response(response, message->transactionid, message->transactionid_length, &r) != KS_STATUS_SUCCESS) {
- // goto done;
- //}
-
- ben_dict_set(r, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+ ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
ks_q_push(dht->send_q, (void *)response);
ks_bool_t want6 = KS_FALSE;
ks_dht_message_t *response = NULL;
struct bencode *r = NULL;
- uint8_t buffer[1000];
- ks_size_t buffer_length = 0;
+ uint8_t buffer4[1000];
+ uint8_t buffer6[1000];
+ ks_size_t buffer4_length = 0;
+ ks_size_t buffer6_length = 0;
ks_assert(dht);
ks_assert(message);
ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
- // @todo get closest nodes to target from route table
- // @todo compact into buffer
- if (ks_dht_utility_compact_node((ks_dht_nodeid_t *)idv, &message->raddr, buffer, &buffer_length, sizeof(buffer)) != KS_STATUS_SUCCESS) {
+ if (want4) {
+ // @todo get closest nodes to target from ipv4 route table
+ // @todo compact nodes into buffer4
+ }
+ if (want6) {
+ // @todo get closest nodes to target from ipv6 route table
+ // @todo compact nodes into buffer6
+ }
+
+ // @todo remove this, testing only
+ if (ks_dht_utility_compact_node((ks_dht_nodeid_t *)idv,
+ &message->raddr,
+ message->raddr.family == AF_INET ? buffer4 : buffer6,
+ message->raddr.family == AF_INET ? &buffer4_length : &buffer6_length,
+ message->raddr.family == AF_INET ? sizeof(buffer4) : sizeof(buffer6)) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
return KS_STATUS_FAIL;
}
- //if (ks_dht_message_alloc(&response, dht->pool) != KS_STATUS_SUCCESS) {
- // goto done;
- //}
-
- //if (ks_dht_message_init(response, message->endpoint, &message->raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
- // goto done;
- //}
-
- //if (ks_dht_message_response(response, message->transactionid, message->transactionid_length, &r) != KS_STATUS_SUCCESS) {
- // goto done;
- //}
-
- ben_dict_set(r, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
- // @todo populate nodes/nodes6
- ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer, buffer_length));
+ ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+ if (want4) {
+ ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
+ }
+ if (want6) {
+ ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
+ }
ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
ks_q_push(dht->send_q, (void *)response);