if (ks_dht2_nodeid_init(&dht->nodeid, nodeid) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
+
+ ks_hash_create(&dht->registry_y, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
+ // @todo ks_hash_insert the q/r/e callbacks into y registry
dht->bind_ipv4 = KS_FALSE;
dht->bind_ipv6 = KS_FALSE;
ks_assert(dht);
dht->recv_buffer_length = 0;
- // @todo dht->endpoints_poll deinit
- // @todo dht->endpoints deinit
+ for (int32_t i = 0; i < dht->endpoints_size; ++i) {
+ ks_dht2_endpoint_t *ep = dht->endpoints[i];
+ //ks_hash_remove(dht->endpoints_hash, ep->addr.host);
+ ks_dht2_endpoint_deinit(ep);
+ ks_dht2_endpoint_free(ep);
+ }
+ if (dht->endpoints) {
+ ks_pool_free(dht->pool, dht->endpoints);
+ dht->endpoints = NULL;
+ }
+ if (dht->endpoints_poll) {
+ ks_pool_free(dht->pool, dht->endpoints_poll);
+ dht->endpoints_poll = NULL;
+ }
ks_hash_destroy(&dht->endpoints_hash);
dht->bind_ipv4 = KS_FALSE;
dht->bind_ipv6 = KS_FALSE;
+
+ ks_hash_destroy(&dht->registry_y);
+
ks_dht2_nodeid_deinit(&dht->nodeid);
return KS_STATUS_SUCCESS;
}
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_register_y(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback)
+{
+ ks_assert(dht);
+ ks_assert(value);
+ ks_assert(callback);
+
+ return ks_hash_insert(dht->registry_y, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
+}
+
/**
*
*/
*/
KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
{
- struct bencode *message;
- struct bencode *t;
- struct bencode *y;
- const char *tv;
- const char *yv;
- ks_size_t tv_len;
- ks_size_t yv_len;
- uint16_t transactionid;
- char messagetype;
+ struct bencode *message = NULL;
+ uint8_t transactionid[KS_DHT_TRANSACTIONID_MAX_SIZE];
+ ks_size_t transactionid_len;
+ char messagetype[KS_DHT_MESSAGETYPE_MAX_SIZE];
+ ks_dht2_registry_callback_t callback;
+ ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
// @todo blacklist check for bad actor nodes
- message = ben_decode((const void *)dht->recv_buffer, dht->recv_buffer_length);
- if (!message) {
- ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n");
+ if (ks_dht2_parse(dht, &message, transactionid, &transactionid_len, messagetype) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
+
+ if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_y, messagetype, KS_UNLOCKED))) {
+ ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", messagetype);
+ } else {
+ ret = callback(dht, raddr, transactionid, transactionid_len, message);
+ }
+
+ ben_free(message);
+ return ret;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_parse(ks_dht2_t *dht,
+ struct bencode **message,
+ uint8_t *transactionid,
+ ks_size_t *transactionid_len,
+ char *messagetype)
+{
+ struct bencode *msg = NULL;
+ struct bencode *t;
+ struct bencode *y;
+ const char *tv;
+ const char *yv;
+ ks_size_t tv_len;
+ ks_size_t yv_len;
+
+ ks_assert(dht);
+ ks_assert(message);
+ ks_assert(transactionid);
+ ks_assert(messagetype);
+
+ msg = ben_decode((const void *)dht->recv_buffer, dht->recv_buffer_length);
+ if (!msg) {
+ ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n");
+ goto failure;
+ }
ks_log(KS_LOG_DEBUG, "Message decoded\n");
- ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message));
+ ks_log(KS_LOG_DEBUG, "%s\n", ben_print(msg));
- t = ben_dict_get_by_str(message, "t");
+ t = ben_dict_get_by_str(msg, "t");
if (!t) {
ks_log(KS_LOG_DEBUG, "Message missing required key 't'\n");
- return KS_STATUS_FAIL;
+ goto failure;
}
tv = ben_str_val(t);
tv_len = ben_str_len(t);
- if (tv_len != sizeof(uint16_t)) {
- ks_log(KS_LOG_DEBUG, "Message 't' value has an unexpected size of %d\n", tv_len);
- return KS_STATUS_FAIL;
+ if (tv_len > KS_DHT_TRANSACTIONID_MAX_SIZE) {
+ ks_log(KS_LOG_DEBUG, "Message 't' value has an unexpectedly large size of %d\n", tv_len);
+ goto failure;
}
- transactionid = ntohs(*((uint16_t *)tv));
- ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", transactionid);
+ memcpy(transactionid, tv, tv_len);
+ *transactionid_len = tv_len;
+ // @todo hex output of transactionid
+ //ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", *transactionid);
- y = ben_dict_get_by_str(message, "y");
+ y = ben_dict_get_by_str(msg, "y");
if (!y) {
ks_log(KS_LOG_DEBUG, "Message missing required key 'y'\n");
- return KS_STATUS_FAIL;
+ goto failure;
}
yv = ben_str_val(y);
yv_len = ben_str_len(y);
- if (yv_len != 1) {
- ks_log(KS_LOG_DEBUG, "Message 'y' value has an unexpected size of %d\n", yv_len);
- return KS_STATUS_FAIL;
+ if (yv_len >= KS_DHT_MESSAGETYPE_MAX_SIZE) {
+ ks_log(KS_LOG_DEBUG, "Message 'y' value has an unexpectedly large size of %d\n", yv_len);
+ goto failure;
}
- messagetype = (char)yv[0];
- ks_log(KS_LOG_DEBUG, "Message type is '%c'\n", messagetype);
-
- // @todo dispatch callback from the 'y' registry
+ memcpy(messagetype, yv, yv_len);
+ messagetype[yv_len] = '\0';
+ ks_log(KS_LOG_DEBUG, "Message type is '%s'\n", messagetype);
+ *message = msg;
return KS_STATUS_SUCCESS;
+
+ failure:
+ if (msg) {
+ ben_free(msg);
+ }
+ *message = NULL;
+ *transactionid_len = 0;
+ messagetype[0] = '\0';
+ return KS_STATUS_FAIL;
}
-
/* For Emacs:
* Local Variables:
* mode:c
#include <../dht/ks_dht_endpoint-int.h>
#include <tap.h>
+#define TEST_DHT1_REGISTER_Y_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze"
#define TEST_DHT1_PROCESS_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe"
+ks_status_t dht_z_callback(ks_dht2_t *dht, ks_sockaddr_t *raddr, uint8_t *transactionid, ks_size_t transactionid_len, struct bencode *message)
+{
+ diag("dht_z_callback\n");
+ ok(transactionid[0] == '4' && transactionid[1] == '2');
+ return KS_STATUS_SUCCESS;
+}
+
int main() {
- ks_size_t buflen = strlen(TEST_DHT1_PROCESS_BUFFER);
+ ks_size_t buflen;
ks_status_t err;
int mask = 0;
ks_dht2_t *dht1 = NULL;
ok(!err);
ks_global_set_default_logger(7);
-
+
err = ks_find_local_ip(v4, sizeof(v4), &mask, AF_INET, NULL);
ok(err == KS_STATUS_SUCCESS);
have_v4 = !zstr_buf(v4);
err = ks_dht2_init(&dht2, NULL);
ok(err == KS_STATUS_SUCCESS);
+
+ ks_dht2_register_y(dht1, "z", dht_z_callback);
if (have_v4) {
err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT, AF_INET);
ok(err == KS_STATUS_SUCCESS);
}
- // @todo populate dht1->recv_buffer and dht1->recv_buffer_length
- memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_BUFFER, buflen);
+ buflen = strlen(TEST_DHT1_REGISTER_Y_BUFFER);
+ memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_Y_BUFFER, buflen);
dht1->recv_buffer_length = buflen;
err = ks_dht2_process(dht1, &raddr);