*/
KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
{
+ struct bencode *message;
+ struct bencode *t;
+ struct bencode *y;
+ const char *tv;
+ const char *yv;
+ ks_size_t tv_len;
+ ks_size_t yv_len;
+ uint16_t transactionid;
+ char messagetype;
+
ks_assert(dht);
ks_assert(raddr);
+ ks_log(KS_LOG_DEBUG, "Received message from %s %d\n", raddr->host, raddr->port);
+ if (raddr->family != AF_INET && raddr->family != AF_INET6) {
+ ks_log(KS_LOG_DEBUG, "Message from unsupported address family\n");
+ return KS_STATUS_FAIL;
+ }
+
+ // @todo blacklist check for bad actor nodes
+
+ message = ben_decode((const void *)dht->recv_buffer, dht->recv_buffer_length);
+ if (!message) {
+ ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n");
+ return KS_STATUS_FAIL;
+ }
+
+ ks_log(KS_LOG_DEBUG, "Message decoded\n");
+ ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message));
+
+ t = ben_dict_get_by_str(message, "t");
+ if (!t) {
+ ks_log(KS_LOG_DEBUG, "Message missing required key 't'\n");
+ return KS_STATUS_FAIL;
+ }
+
+ tv = ben_str_val(t);
+ tv_len = ben_str_len(t);
+ if (tv_len != sizeof(uint16_t)) {
+ ks_log(KS_LOG_DEBUG, "Message 't' value has an unexpected size of %d\n", tv_len);
+ return KS_STATUS_FAIL;
+ }
+
+ transactionid = ntohs(*((uint16_t *)tv));
+ ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", transactionid);
+
+ y = ben_dict_get_by_str(message, "y");
+ if (!y) {
+ ks_log(KS_LOG_DEBUG, "Message missing required key 'y'\n");
+ return KS_STATUS_FAIL;
+ }
+
+ yv = ben_str_val(y);
+ yv_len = ben_str_len(y);
+ if (yv_len != 1) {
+ ks_log(KS_LOG_DEBUG, "Message 'y' value has an unexpected size of %d\n", yv_len);
+ return KS_STATUS_FAIL;
+ }
+
+ messagetype = (char)yv[0];
+ ks_log(KS_LOG_DEBUG, "Message type is '%c'\n", messagetype);
+
+ // @todo dispatch callback from the 'y' registry
+
return KS_STATUS_SUCCESS;
}
#include <ks.h>
#include <../dht/ks_dht.h>
+#include <../dht/ks_dht-int.h>
+#include <../dht/ks_dht_endpoint-int.h>
#include <tap.h>
+#define TEST_DHT1_PROCESS_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe"
+
int main() {
+ ks_size_t buflen = strlen(TEST_DHT1_PROCESS_BUFFER);
ks_status_t err;
int mask = 0;
ks_dht2_t *dht1 = NULL;
ks_bool_t have_v4, have_v6;
char v4[48] = {0}, v6[48] = {0};
ks_sockaddr_t addr;
+ ks_sockaddr_t raddr;
err = ks_init();
ok(!err);
+ ks_global_set_default_logger(7);
+
err = ks_find_local_ip(v4, sizeof(v4), &mask, AF_INET, NULL);
ok(err == KS_STATUS_SUCCESS);
have_v4 = !zstr_buf(v4);
err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 1, AF_INET);
ok(err == KS_STATUS_SUCCESS);
-
+
err = ks_dht2_bind(&dht2, &addr);
ok(err == KS_STATUS_SUCCESS);
+
+ raddr = addr;
}
if (have_v6) {
ok(err == KS_STATUS_SUCCESS);
}
+ // @todo populate dht1->recv_buffer and dht1->recv_buffer_length
+ memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_BUFFER, buflen);
+ dht1->recv_buffer_length = buflen;
-
+ err = ks_dht2_process(dht1, &raddr);
+ ok(err == KS_STATUS_SUCCESS);
/* Cleanup and shutdown */