d->transactions_hash = NULL;
d->rt_ipv4 = NULL;
d->rt_ipv6 = NULL;
+ d->token_secret_current = 0;
+ d->token_secret_previous = 0;
+ d->token_secret_expiration = 0;
+ d->storage_hash = NULL;
return KS_STATUS_SUCCESS;
}
dht->transactions_hash = NULL;
dht->rt_ipv4 = NULL;
dht->rt_ipv6 = NULL;
+ dht->token_secret_current = 0;
+ dht->token_secret_previous = 0;
+ dht->token_secret_expiration = 0;
+ dht->storage_hash = NULL;
return KS_STATUS_SUCCESS;
}
ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
ks_dht_register_query(dht, "ping", ks_dht_process_query_ping);
ks_dht_register_query(dht, "find_node", ks_dht_process_query_findnode);
+ ks_dht_register_query(dht, "get", ks_dht_process_query_get);
+ ks_dht_register_query(dht, "put", ks_dht_process_query_put);
ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
// @todo register 301 error for internal get/put CAS hash mismatch retry handler
dht->rt_ipv4 = NULL;
dht->rt_ipv6 = NULL;
+
+ dht->token_secret_current = dht->token_secret_previous = rand();
+ dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION;
+
+ ks_hash_create(&dht->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
+ ks_hash_set_keysize(dht->storage_hash, KS_DHT_NODEID_SIZE);
return KS_STATUS_SUCCESS;
}
{
ks_assert(dht);
+ // @todo free storage_hash entries
+ if (dht->storage_hash) {
+ ks_hash_destroy(&dht->storage_hash);
+ dht->storage_hash = NULL;
+ }
+ dht->token_secret_current = 0;
+ dht->token_secret_previous = 0;
+ dht->token_secret_expiration = 0;
if (dht->rt_ipv4) {
ks_dhtrt_deinitroute(dht->rt_ipv4);
dht->rt_ipv4 = NULL;
return ks_dht_utility_compact_address(address, buffer, buffer_length, buffer_size);
}
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_utility_extract_nodeid(struct bencode *args, const char *key, ks_dht_nodeid_t **nodeid)
+{
+ struct bencode *id;
+ const char *idv;
+ ks_size_t idv_len;
+
+ ks_assert(args);
+ ks_assert(key);
+ ks_assert(nodeid);
+
+ *nodeid = NULL;
+
+ id = ben_dict_get_by_str(args, key);
+ if (!id) {
+ ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
+ return KS_STATUS_FAIL;
+ }
+
+ idv = ben_str_val(id);
+ idv_len = ben_str_len(id);
+ if (idv_len != KS_DHT_NODEID_SIZE) {
+ ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, idv_len);
+ return KS_STATUS_FAIL;
+ }
+
+ *nodeid = (ks_dht_nodeid_t *)idv;
+
+ return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const char *key, ks_dht_token_t **token)
+{
+ struct bencode *tok;
+ const char *tokv;
+ ks_size_t tokv_len;
+
+ ks_assert(args);
+ ks_assert(key);
+ ks_assert(token);
+
+ *token = NULL;
+
+ tok = ben_dict_get_by_str(args, key);
+ if (!tok) {
+ ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
+ return KS_STATUS_FAIL;
+ }
+
+ tokv = ben_str_val(tok);
+ tokv_len = ben_str_len(tok);
+ if (tokv_len != KS_DHT_TOKEN_SIZE) {
+ ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, tokv_len);
+ return KS_STATUS_FAIL;
+ }
+
+ *token = (ks_dht_token_t *)tokv;
+
+ return KS_STATUS_SUCCESS;
+}
+
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
+{
+ SHA_CTX sha;
+ uint16_t port = 0;
+
+ ks_assert(raddr);
+ ks_assert(raddr->family == AF_INET || raddr->family == AF_INET6);
+ ks_assert(target);
+ ks_assert(token);
+
+ secret = htonl(secret);
+ port = htons(raddr->port);
+
+ SHA1_Init(&sha);
+ SHA1_Update(&sha, &secret, sizeof(uint32_t));
+ SHA1_Update(&sha, raddr->host, strlen(raddr->host));
+ SHA1_Update(&sha, &port, sizeof(uint16_t));
+ SHA1_Update(&sha, target->id, KS_DHT_NODEID_SIZE);
+ SHA1_Final(token->token, &sha);
+
+ return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
+{
+ ks_dht_token_t tok;
+
+ ks_dht_token_generate(dht->token_secret_current, raddr, target, &tok);
+
+ if (!memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE)) {
+ return KS_TRUE;
+ }
+
+ ks_dht_token_generate(dht->token_secret_previous, raddr, target, &tok);
+
+ return memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0;
+}
+
/**
*
*/
}
}
ks_hash_write_unlock(dht->transactions_hash);
+
+ if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
+ dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION;
+ dht->token_secret_previous = dht->token_secret_current;
+ dht->token_secret_current = rand();
+ }
}
/**
return KS_STATUS_SUCCESS;
}
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid)
+{
+ ks_dht_message_t *message = NULL;
+ struct bencode *a = NULL;
+
+ ks_assert(dht);
+ ks_assert(raddr);
+ ks_assert(targetid);
+
+ if (ks_dht_setup_query(dht, ep, raddr, "get", ks_dht_process_response_get, &message, &a) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+
+ ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+ // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
+ ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE));
+
+ ks_log(KS_LOG_DEBUG, "Sending message query get\n");
+ ks_q_push(dht->send_q, (void *)message);
+
+ return KS_STATUS_SUCCESS;
+}
+
/**
*
*/
*/
KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message)
{
- struct bencode *id;
- //const char *idv;
- ks_size_t idv_len;
+ ks_dht_nodeid_t *id;
ks_dht_message_t *response = NULL;
struct bencode *r = NULL;
ks_assert(message);
ks_assert(message->args);
- id = ben_dict_get_by_str(message->args, "id");
- if (!id) {
- ks_log(KS_LOG_DEBUG, "Message args missing required key 'id'\n");
- return KS_STATUS_FAIL;
- }
-
- //idv = ben_str_val(id);
- idv_len = ben_str_len(id);
- if (idv_len != KS_DHT_NODEID_SIZE) {
- ks_log(KS_LOG_DEBUG, "Message args 'id' value has an unexpected size of %d\n", idv_len);
+ if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
*/
KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message)
{
- struct bencode *id;
- struct bencode *target;
+ ks_dht_nodeid_t *id;
+ ks_dht_nodeid_t *target;
struct bencode *want;
- const char *idv;
- //const char *targetv;
- ks_size_t idv_len;
- ks_size_t targetv_len;
ks_bool_t want4 = KS_FALSE;
ks_bool_t want6 = KS_FALSE;
ks_dht_message_t *response = NULL;
ks_assert(message);
ks_assert(message->args);
-
- id = ben_dict_get_by_str(message->args, "id");
- if (!id) {
- ks_log(KS_LOG_DEBUG, "Message args missing required key 'id'\n");
- return KS_STATUS_FAIL;
- }
-
- idv = ben_str_val(id);
- idv_len = ben_str_len(id);
- if (idv_len != KS_DHT_NODEID_SIZE) {
- ks_log(KS_LOG_DEBUG, "Message args 'id' value has an unexpected size of %d\n", idv_len);
- return KS_STATUS_FAIL;
- }
-
-
- target = ben_dict_get_by_str(message->args, "target");
- if (!target) {
- ks_log(KS_LOG_DEBUG, "Message args missing required key 'target'\n");
+ if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
- //targetv = ben_str_val(target);
- targetv_len = ben_str_len(target);
- if (targetv_len != KS_DHT_NODEID_SIZE) {
- ks_log(KS_LOG_DEBUG, "Message args 'target' value has an unexpected size of %d\n", targetv_len);
+ if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
-
want = ben_dict_get_by_str(message->args, "want");
if (want) {
size_t want_len = ben_list_len(want);
}
// @todo remove this, testing only
- if (ks_dht_utility_compact_node((ks_dht_nodeid_t *)idv,
+ if (ks_dht_utility_compact_node(id,
&message->raddr,
message->raddr.family == AF_INET ? buffer4 : buffer6,
message->raddr.family == AF_INET ? &buffer4_length : &buffer6_length,
return KS_STATUS_SUCCESS;
}
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message)
+{
+ ks_dht_nodeid_t *id;
+ ks_dht_nodeid_t *target;
+ struct bencode *seq;
+ int64_t sequence = -1;
+ ks_bool_t sequence_snuffed = KS_FALSE;
+ ks_dht_token_t token;
+ ks_dht_storageitem_t *item = NULL;
+ ks_dht_message_t *response = NULL;
+ struct bencode *r = NULL;
+
+ ks_assert(dht);
+ ks_assert(message);
+ ks_assert(message->args);
+
+ if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+
+ if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+
+ seq = ben_dict_get_by_str(message->args, "seq");
+ if (seq) {
+ sequence = ben_int_val(seq);
+ }
+
+ // @todo add/touch bucket entry for remote node
+
+ ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
+
+ ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
+
+ item = ks_hash_search(dht->storage_hash, (void *)target, KS_READLOCKED);
+ ks_hash_read_unlock(dht->storage_hash);
+
+ sequence_snuffed = item && sequence >= 0 && item->seq <= sequence;
+ // @todo if sequence is provided then requester has the data so if the local sequence is lower, maybe create job to update local data from the requester?
+
+ // @todo find closest ipv4 and ipv6 nodes to target
+
+ // @todo compact ipv4 and ipv6 nodes into separate buffers
+
+ if (ks_dht_setup_response(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ &response,
+ &r) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+
+ ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+ ben_dict_set(r, ben_blob("token", 5), ben_blob(token.token, KS_DHT_TOKEN_SIZE));
+ if (item) {
+ if (item->mutable) {
+ if (!sequence_snuffed) {
+ ben_dict_set(r, ben_blob("k", 1), ben_blob(item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE));
+ ben_dict_set(r, ben_blob("sig", 3), ben_blob(item->sig.sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE));
+ }
+ ben_dict_set(r, ben_blob("seq", 3), ben_int(item->seq));
+ }
+ if (!sequence_snuffed) {
+ ben_dict_set(r, ben_blob("v", 1), ben_clone(item->v));
+ }
+ }
+ // @todo nodes, nodes6
+
+ ks_log(KS_LOG_DEBUG, "Sending message response get\n");
+ ks_q_push(dht->send_q, (void *)response);
+
+ return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message)
+{
+ ks_dht_message_t *response = NULL;
+ struct bencode *r = NULL;
+
+ ks_assert(dht);
+ ks_assert(message);
+ ks_assert(message->args);
+
+ // @todo add/touch bucket entry for remote node
+
+ ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
+
+ if (ks_dht_setup_response(dht,
+ message->endpoint,
+ &message->raddr,
+ message->transactionid,
+ message->transactionid_length,
+ &response,
+ &r) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+
+ //ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
+
+ ks_log(KS_LOG_DEBUG, "Sending message response put\n");
+ ks_q_push(dht->send_q, (void *)response);
+
+ return KS_STATUS_SUCCESS;
+}
+
+
/**
*
*/
return KS_STATUS_SUCCESS;
}
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message)
+{
+ ks_dht_nodeid_t *id;
+ ks_dht_token_t *token;
+
+ ks_assert(dht);
+ ks_assert(message);
+
+ // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
+ if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+
+ if (ks_dht_utility_extract_token(message->args, "token", &token) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+
+ // @todo add extract function for mutable ks_dht_storageitem_key_t
+ // @todo add extract function for mutable ks_dht_storageitem_signature_t
+
+ // @todo add/touch bucket entry for remote node and other nodes returned
+
+ ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
+
+ return KS_STATUS_SUCCESS;
+}
+
/* For Emacs:
* Local Variables:
* mode:c
--- /dev/null
+#include "ks_dht.h"
+#include "ks_dht-int.h"
+#include "sodium.h"
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitem_alloc(ks_dht_storageitem_t **item, ks_pool_t *pool)
+{
+ ks_dht_storageitem_t *si;
+
+ ks_assert(item);
+ ks_assert(pool);
+
+ *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t));
+ si->pool = pool;
+ si->v = NULL;
+ si->mutable = KS_FALSE;
+ si->salt_length = 0;
+ si->seq = 0;
+
+ return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitem_prealloc(ks_dht_storageitem_t *item, ks_pool_t *pool)
+{
+ ks_assert(item);
+ ks_assert(pool);
+
+ item->pool = pool;
+ item->v = NULL;
+ item->mutable = KS_FALSE;
+ item->salt_length = 0;
+ item->seq = 0;
+
+ return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitem_free(ks_dht_storageitem_t *item)
+{
+ ks_assert(item);
+
+ ks_dht_storageitem_deinit(item);
+ ks_pool_free(item->pool, item);
+
+ return KS_STATUS_SUCCESS;
+}
+
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitem_init(ks_dht_storageitem_t *item, struct bencode *v)
+{
+ ks_assert(item);
+ ks_assert(item->pool);
+ ks_assert(v);
+ ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
+
+ item->v = ben_clone(v);
+
+ return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitem_deinit(ks_dht_storageitem_t *item)
+{
+ ks_assert(item);
+
+ if (item->v) {
+ ben_free(item->v);
+ item->v = NULL;
+ }
+
+ return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitem_create(ks_dht_storageitem_t *item, ks_bool_t mutable)
+{
+ SHA_CTX sha;
+
+ ks_assert(item);
+ ks_assert(item->pool);
+ ks_assert(item->v);
+
+ item->mutable = mutable;
+
+ if (!mutable) {
+ size_t enc_len = 0;
+ uint8_t *enc = ben_encode(&enc_len, item->v);
+ SHA1_Init(&sha);
+ SHA1_Update(&sha, enc, enc_len);
+ SHA1_Final(item->id.id, &sha);
+ free(enc);
+ } else {
+ size_t enc_len = 0;
+ uint8_t *enc = NULL;
+ struct bencode *sig = ben_dict();
+
+ crypto_sign_keypair(item->pk.key, item->sk.key);
+ randombytes_buf(item->salt, KS_DHT_STORAGEITEM_SALT_MAX_SIZE);
+ item->salt_length = KS_DHT_STORAGEITEM_SALT_MAX_SIZE;
+ item->seq = 1;
+
+ ben_dict_set(sig, ben_blob("salt", 4), ben_blob(item->salt, item->salt_length));
+ ben_dict_set(sig, ben_blob("seq", 3), ben_int(item->seq));
+ ben_dict_set(sig, ben_blob("v", 1), ben_clone(item->v));
+ enc = ben_encode(&enc_len, sig);
+ ben_free(sig);
+
+ SHA1_Init(&sha);
+ SHA1_Update(&sha, enc, enc_len);
+ SHA1_Final(item->sig.sig, &sha);
+
+ free(enc);
+
+ SHA1_Init(&sha);
+ SHA1_Update(&sha, item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE);
+ SHA1_Update(&sha, item->salt, item->salt_length);
+ SHA1_Final(item->id.id, &sha);
+ }
+
+ return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitem_immutable(ks_dht_storageitem_t *item)
+{
+ SHA_CTX sha;
+ size_t enc_len = 0;
+ uint8_t *enc = NULL;
+
+ ks_assert(item);
+ ks_assert(item->v);
+
+ item->mutable = KS_FALSE;
+
+ enc = ben_encode(&enc_len, item->v);
+ SHA1_Init(&sha);
+ SHA1_Update(&sha, enc, enc_len);
+ SHA1_Final(item->id.id, &sha);
+ free(enc);
+
+ return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht_storageitem_mutable(ks_dht_storageitem_t *item,
+ ks_dht_storageitem_key_t *k,
+ uint8_t *salt,
+ ks_size_t salt_length,
+ int64_t sequence,
+ ks_dht_storageitem_signature_t *signature)
+{
+ SHA_CTX sha;
+
+ ks_assert(item);
+ ks_assert(item->v);
+ ks_assert(k);
+ ks_assert(!(!salt && salt_length > 0));
+ ks_assert(salt_length > KS_DHT_STORAGEITEM_SIGNATURE_SIZE);
+ ks_assert(signature);
+
+ item->mutable = KS_TRUE;
+
+ memcpy(item->pk.key, k->key, KS_DHT_STORAGEITEM_KEY_SIZE);
+ if (salt && salt_length > 0) {
+ memcpy(item->salt, salt, salt_length);
+ item->salt_length = salt_length;
+ }
+ item->seq = sequence;
+ memcpy(item->sig.sig, signature->sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE);
+
+ SHA1_Init(&sha);
+ SHA1_Update(&sha, item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE);
+ if (item->salt && item->salt_length > 0) {
+ SHA1_Update(&sha, item->salt, item->salt_length);
+ }
+ SHA1_Final(item->id.id, &sha);
+
+ return KS_STATUS_SUCCESS;
+}
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */