*/
if (d->storageitems_hash) {
for (it = ks_hash_first(d->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- ks_dht_storageitem_t *val;
-
- ks_hash_this_val(it, (void **)&val);
-
+ const void *key = NULL;
+ ks_dht_storageitem_t *val = NULL;
+ ks_hash_this(it, &key, NULL, (void **)&val);
ks_dht_storageitem_destroy(&val);
}
ks_hash_destroy(&d->storageitems_hash);
*/
if (d->searches6_hash) {
for (it = ks_hash_first(d->searches6_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- ks_dht_search_t *val;
-
- ks_hash_this_val(it, (void **)&val);
+ const void *key = NULL;
+ ks_dht_search_t *val = NULL;
+ ks_hash_this(it, &key, NULL, (void **)&val);
ks_dht_search_destroy(&val);
}
ks_hash_destroy(&d->searches6_hash);
}
if (d->searches4_hash) {
for (it = ks_hash_first(d->searches4_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
- ks_dht_search_t *val;
-
- ks_hash_this_val(it, (void **)&val);
+ const void *key = NULL;
+ ks_dht_search_t *val = NULL;
+ ks_hash_this(it, &key, NULL, (void **)&val);
ks_dht_search_destroy(&val);
}
ks_hash_destroy(&d->searches4_hash);
char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_log(KS_LOG_DEBUG,
"Search for %s pending find_node to %s has expired without response\n",
- ks_dht_hexid(&value->target, id_buf),
- ks_dht_hexid(&v->nodeid, id2_buf));
+ ks_dht_hex(value->target.id, id_buf, KS_DHT_NODEID_SIZE),
+ ks_dht_hex(v->nodeid.id, id2_buf, KS_DHT_NODEID_SIZE));
v->finished = KS_TRUE;
continue;
}
else if (value->expiration <= now) {
// if the transaction expires, so does the attached job, but the job may try again with a new transaction
value->job->state = KS_DHT_JOB_STATE_EXPIRING;
- ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d, %d %d\n", value->transactionid, now, value->expiration);
+ ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
remove = KS_TRUE;
}
if (remove) {
}
}
-KS_DECLARE(char *) ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer)
+KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len)
{
char *t = buffer;
- ks_assert(id);
+ ks_assert(data);
ks_assert(buffer);
- memset(buffer, 0, KS_DHT_NODEID_SIZE * 2 + 1);
+ memset(buffer, 0, len * 2 + 1);
- for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i, t += 2) sprintf(t, "%02X", id->id[i]);
+ for (int i = 0; i < len; ++i, t += 2) sprintf(t, "%02X", data[i]);
return buffer;
}
return KS_STATUS_SUCCESS;
}
-KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *args,
- ks_bool_t optional,
- const char *key,
- ks_dht_storageitem_key_t **sikey)
+KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_pkey(struct bencode *args,
+ ks_bool_t optional,
+ const char *key,
+ ks_dht_storageitem_pkey_t **pkey)
{
struct bencode *k;
const char *kv;
ks_assert(args);
ks_assert(key);
- ks_assert(sikey);
+ ks_assert(pkey);
- *sikey = NULL;
+ *pkey = NULL;
k = ben_dict_get_by_str(args, key);
if (!k) {
kv = ben_str_val(k);
kv_len = ben_str_len(k);
- if (kv_len != KS_DHT_STORAGEITEM_KEY_SIZE) {
+ if (kv_len != KS_DHT_STORAGEITEM_PKEY_SIZE) {
ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, kv_len);
return KS_STATUS_ARG_INVALID;
}
- *sikey = (ks_dht_storageitem_key_t *)kv;
+ *pkey = (ks_dht_storageitem_pkey_t *)kv;
done:
return ret;
return memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0;
}
-KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(struct bencode *value, ks_dht_nodeid_t *target)
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable_internal(struct bencode *value, ks_dht_nodeid_t *target)
{
SHA_CTX sha;
- const uint8_t *v;
+ uint8_t *v = NULL;
size_t v_len;
+ ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(value);
ks_assert(target);
- v = (const uint8_t *)ben_str_val(value);
- v_len = ben_str_len(value);
-
+ v = ben_encode(&v_len, value);
if (!SHA1_Init(&sha) ||
!SHA1_Update(&sha, v, v_len) ||
- !SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL;
+ !SHA1_Final(target->id, &sha)) {
+ ret = KS_STATUS_FAIL;
+ }
+ free(v);
- return KS_STATUS_SUCCESS;
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(const uint8_t *value, ks_size_t value_length, ks_dht_nodeid_t *target)
+{
+ struct bencode *v = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(value);
+ ks_assert(value_length > 0);
+ ks_assert(target);
+
+ v = ben_blob(value, value_length);
+ ret = ks_dht_storageitem_target_immutable_internal(v, target);
+ ben_free(v);
+
+ return ret;
}
-KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_key_t *k, struct bencode *salt, ks_dht_nodeid_t *target)
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable_internal(ks_dht_storageitem_pkey_t *pk, struct bencode *salt, ks_dht_nodeid_t *target)
{
SHA_CTX sha;
+ //char buf1[KS_DHT_NODEID_SIZE * 2 + 1];
- ks_assert(k);
+ ks_assert(pk);
ks_assert(target);
if (!SHA1_Init(&sha) ||
- !SHA1_Update(&sha, k->key, KS_DHT_STORAGEITEM_KEY_SIZE)) return KS_STATUS_FAIL;
+ !SHA1_Update(&sha, pk->key, KS_DHT_STORAGEITEM_PKEY_SIZE)) return KS_STATUS_FAIL;
if (salt) {
const uint8_t *s = (const uint8_t *)ben_str_val(salt);
size_t s_len = ben_str_len(salt);
}
if (!SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL;
+ //ks_log(KS_LOG_DEBUG, "Mutable ID: %s\n", ks_dht_hex(target->id, buf1, KS_DHT_NODEID_SIZE));
return KS_STATUS_SUCCESS;
}
+KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_pkey_t *pk, const uint8_t *salt, ks_size_t salt_length, ks_dht_nodeid_t *target)
+{
+ struct bencode *s = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(pk);
+ ks_assert(target);
+
+ if (salt && salt_length > 0) s = ben_blob(salt, salt_length);
+ ret = ks_dht_storageitem_target_mutable_internal(pk, s, target);
+ if (s) ben_free(s);
+
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_encode(uint8_t **encoded,
+ ks_size_t *encoded_length,
+ struct bencode *salt,
+ struct bencode *seq,
+ struct bencode *v)
+{
+ char *enc = NULL;
+ char *salt_enc = NULL;
+ size_t salt_enc_length = 0;
+ char *seq_enc = NULL;
+ size_t seq_enc_length = 0;
+ char *v_enc = NULL;
+ size_t v_enc_length = 0;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(encoded);
+ ks_assert(encoded_length);
+ ks_assert(seq);
+ ks_assert(v);
+
+ if (salt) salt_enc = ben_encode(&salt_enc_length, salt);
+ seq_enc = ben_encode(&seq_enc_length, seq);
+ v_enc = ben_encode(&v_enc_length, v);
+
+ *encoded_length = (salt ? 6 : 0) + // 4:salt
+ salt_enc_length +
+ 5 + // 3:seq
+ seq_enc_length +
+ 3 + // 1:v
+ v_enc_length;
+ enc = malloc((*encoded_length) + 1);
+ *encoded = (uint8_t *)enc;
+ enc[0] = '\0';
+
+ if (salt) {
+ strncat(enc, "4:salt", 6);
+ strncat(enc, salt_enc, salt_enc_length);
+ }
+ strncat(enc, "3:seq", 5);
+ strncat(enc, seq_enc, seq_enc_length);
+ strncat(enc, "1:v", 3);
+ strncat(enc, v_enc, v_enc_length);
+
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate_internal(ks_dht_storageitem_signature_t *sig,
+ ks_dht_storageitem_skey_t *sk,
+ struct bencode *salt,
+ struct bencode *seq,
+ struct bencode *v)
+{
+ uint8_t *tmpsig = NULL;
+ size_t tmpsig_len = 0;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+ //char buf1[KS_DHT_STORAGEITEM_SIGNATURE_SIZE * 2 + 1];
+
+ ks_assert(sig);
+ ks_assert(sk);
+ ks_assert(seq);
+ ks_assert(v);
+
+ if ((ret = ks_dht_storageitem_signature_encode(&tmpsig, &tmpsig_len, salt, seq, v)) != KS_STATUS_SUCCESS) goto done;
+
+ if (crypto_sign_detached(sig->sig, NULL, tmpsig, tmpsig_len, sk->key) != 0) {
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
+ //ks_log(KS_LOG_DEBUG, "Signed: %s\n", ks_dht_hex(sig->sig, buf1, KS_DHT_STORAGEITEM_SIGNATURE_SIZE));
+
+ done:
+ if (tmpsig) free(tmpsig);
+ return ret;
+}
+
+KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate(ks_dht_storageitem_signature_t *sig,
+ ks_dht_storageitem_skey_t *sk,
+ const uint8_t *salt,
+ ks_size_t salt_length,
+ int64_t sequence,
+ const uint8_t *value,
+ ks_size_t value_length)
+{
+ struct bencode *s = NULL;
+ struct bencode *seq = NULL;
+ struct bencode *v = NULL;
+ ks_status_t ret = KS_STATUS_SUCCESS;
+
+ ks_assert(sig);
+ ks_assert(sk);
+ ks_assert(sequence > 0);
+ ks_assert(value);
+ ks_assert(value_length > 0);
+
+ if (salt && salt_length > 0) s = ben_blob(salt, salt_length);
+ seq = ben_int(sequence);
+ v = ben_blob(value, value_length);
+
+ ret = ks_dht_storageitem_signature_generate_internal(sig, sk, s, seq, v);
+
+ if (s) ben_free(s);
+ ben_free(seq);
+ ben_free(v);
+
+ return ret;
+}
+
+KS_DECLARE(ks_bool_t) ks_dht_storageitem_signature_verify(ks_dht_storageitem_signature_t *sig,
+ ks_dht_storageitem_pkey_t *pk,
+ struct bencode *salt,
+ struct bencode *seq,
+ struct bencode *v)
+{
+ uint8_t *tmpsig = NULL;
+ size_t tmpsig_len = 0;
+ int32_t res = 0;
+
+ ks_assert(sig);
+ ks_assert(pk);
+ ks_assert(seq);
+ ks_assert(v);
+
+ if (ks_dht_storageitem_signature_encode(&tmpsig, &tmpsig_len, salt, seq, v) != KS_STATUS_SUCCESS) return KS_FALSE;
+
+ res = crypto_sign_verify_detached(sig->sig, tmpsig, tmpsig_len, pk->key);
+
+ if (tmpsig) free(tmpsig);
+
+ return res == 0;
+}
+
KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
{
char buf[KS_DHT_DATAGRAM_BUFFER_SIZE + 1];
memcpy(query, qv, qv_len);
query[qv_len] = '\0';
- ks_log(KS_LOG_DEBUG, "Message query is '%s'\n", query);
+ //ks_log(KS_LOG_DEBUG, "Message query is '%s'\n", query);
a = ben_dict_get_by_str(message->data, "a");
if (!a) {
if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
message->args_id = *id;
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+ ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
if ((ret = ks_dhtrt_create_node(message->endpoint->node->table,
*id,
KS_DHT_REMOTE,
if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
message->args_id = *id;
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
+ ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
if ((ret = ks_dhtrt_create_node(message->endpoint->node->table,
*id,
KS_DHT_REMOTE,
&node)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
- ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
+ ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE));
if ((ret = ks_dhtrt_touch_node(message->endpoint->node->table, *id)) != KS_STATUS_SUCCESS) goto done;
transaction->job->raddr.port);
} else {
transaction->job->response = message;
- transaction->job->state = KS_DHT_JOB_STATE_PROCESSING;
message->transaction = transaction;
if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING;
+ else transaction->job->state = KS_DHT_JOB_STATE_COMPLETING;
+
transaction->job->response = NULL; // message is destroyed after we return, stop using it
transaction->finished = KS_TRUE;
}
return ret;
}
+KS_DECLARE(void) ks_dht_storageitems_read_lock(ks_dht_t *dht)
+{
+ ks_assert(dht);
+ ks_hash_read_lock(dht->storageitems_hash);
+}
+
+KS_DECLARE(void) ks_dht_storageitems_read_unlock(ks_dht_t *dht)
+{
+ ks_assert(dht);
+ ks_hash_read_unlock(dht->storageitems_hash);
+}
+
+KS_DECLARE(void) ks_dht_storageitems_write_lock(ks_dht_t *dht)
+{
+ ks_assert(dht);
+ ks_hash_write_lock(dht->storageitems_hash);
+}
+
+KS_DECLARE(void) ks_dht_storageitems_write_unlock(ks_dht_t *dht)
+{
+ ks_assert(dht);
+ ks_hash_write_lock(dht->storageitems_hash);
+}
+
+KS_DECLARE(ks_dht_storageitem_t *) ks_dht_storageitems_find(ks_dht_t *dht, ks_dht_nodeid_t *target)
+{
+ ks_assert(dht);
+ ks_assert(target);
+
+ return ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED);
+}
+
+KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storageitem_t *item)
+{
+ ks_assert(dht);
+ ks_assert(item);
+
+ return ks_hash_insert(dht->storageitems_hash, item->id.id, item);
+}
+
KS_DECLARE(ks_status_t) ks_dht_error(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
{
+ ks_dht_job_t *first = NULL;
+ ks_dht_job_t *last = NULL;
+
ks_assert(dht);
ks_mutex_lock(dht->jobs_mutex);
for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) {
- ks_bool_t remove = KS_FALSE;
+ ks_bool_t done = KS_FALSE;
jobn = job->next;
- switch (job->state) {
- case KS_DHT_JOB_STATE_QUERYING:
+ if (job->state == KS_DHT_JOB_STATE_QUERYING) {
job->state = KS_DHT_JOB_STATE_RESPONDING;
if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
- break;
- case KS_DHT_JOB_STATE_RESPONDING:
- break;
- case KS_DHT_JOB_STATE_EXPIRING:
+ }
+ if (job->state == KS_DHT_JOB_STATE_EXPIRING) {
job->attempts--;
if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING;
- else {
- if (job->finish_callback) job->finish_callback(dht, job);
- remove = KS_TRUE;
- }
- break;
- case KS_DHT_JOB_STATE_PROCESSING:
- break;
- case KS_DHT_JOB_STATE_COMPLETING:
- if (job->finish_callback) job->finish_callback(dht, job);
- remove = KS_TRUE;
- break;
- default: break;
+ else done = KS_TRUE;
}
+ if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE;
- if (remove) {
+ if (done) {
if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL;
else if (!jobp) dht->jobs_first = jobn;
else if (!jobn) dht->jobs_last = jobp;
else jobp->next = jobn;
- ks_dht_job_destroy(&job);
+
+ job->next = NULL;
+ if (last) last = last->next = job;
+ else first = last = job;
} else jobp = job;
}
ks_mutex_unlock(dht->jobs_mutex);
+
+ for (ks_dht_job_t *job = first, *jobn = NULL; job; job = jobn) {
+ jobn = job->next;
+ // this cannot occur inside of the main loop, may add new jobs invalidating list pointers
+ if (job->finish_callback) job->finish_callback(dht, job);
+ ks_dht_job_destroy(&job);
+ }
}
KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback)
ks_assert(dht);
ks_assert(raddr);
+ //ks_log(KS_LOG_DEBUG, "Starting ping!\n");
+
if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
ks_dht_job_build_ping(job, ks_dht_query_ping, callback);
ks_dht_jobs_add(dht, job);
&message,
NULL)) != KS_STATUS_SUCCESS) goto done;
- ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
+ //ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
ks_q_push(dht->send_q, (void *)message);
done:
ks_assert(message);
ks_assert(message->args);
- ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
+ //ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
if ((ret = ks_dht_response_setup(dht,
message->endpoint,
&response,
NULL)) != KS_STATUS_SUCCESS) goto done;
- ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
+ //ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
ks_q_push(dht->send_q, (void *)response);
done:
ks_assert(dht);
ks_assert(job);
- ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
-
- job->state = KS_DHT_JOB_STATE_COMPLETING;
+ //ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
// done:
return ret;
ben_dict_set(a, ben_blob("want", 4), want);
}
- ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
+ //ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
ks_q_push(dht->send_q, (void *)message);
done:
want6 = message->raddr.family == AF_INET6;
}
- ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
+ //ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
query.nodeid = *target;
&buffer4_length,
sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
- ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
+ ks_log(KS_LOG_DEBUG,
+ "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
}
ks_dhtrt_release_querynodes(&query);
}
&buffer6_length,
sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
- ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
+ ks_log(KS_LOG_DEBUG,
+ "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
}
ks_dhtrt_release_querynodes(&query);
}
if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
if (want6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
- ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
+ //ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
ks_q_push(dht->send_q, (void *)response);
done:
ks_log(KS_LOG_DEBUG,
"Expanded ipv4 nodeinfo for %s (%s %d)\n",
- ks_dht_hexid(&nid, id_buf),
+ ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
addr.host,
addr.port);
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+ ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
job->response_nodes[job->response_nodes_count++] = node;
ks_log(KS_LOG_DEBUG,
"Set closer node id %s (%s) in search of target id %s at results index %d\n",
- ks_dht_hexid(&nid, id_buf),
- ks_dht_hexid(&distance, id2_buf),
- ks_dht_hexid(&search->target, id3_buf),
+ ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
+ ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
+ ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
results_index);
search->results[results_index] = nid;
search->distances[results_index] = distance;
ks_log(KS_LOG_DEBUG,
"Expanded ipv6 nodeinfo for %s (%s %d)\n",
- ks_dht_hexid(&nid, id_buf),
+ ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
addr.host,
addr.port);
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+ ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
job->response_nodes6[job->response_nodes6_count++] = node;
ks_log(KS_LOG_DEBUG,
"Set closer node id %s (%s) in search of target id %s at results index %d\n",
- ks_dht_hexid(&nid, id_buf),
- ks_dht_hexid(&distance, id2_buf),
- ks_dht_hexid(&search->target, id3_buf),
+ ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
+ ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE),
+ ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE),
results_index);
search->results[results_index] = nid;
search->distances[results_index] = distance;
}
}
- ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
-
- job->state = KS_DHT_JOB_STATE_COMPLETING;
+ //ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
done:
if(search) ks_mutex_unlock(search->mutex);
// @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
- ks_log(KS_LOG_DEBUG, "Sending message query get\n");
+ //ks_log(KS_LOG_DEBUG, "Sending message query get\n");
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
seq = ben_dict_get_by_str(message->args, "seq");
if (seq) sequence = ben_int_val(seq);
- ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
+ //ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
&buffer4_length,
sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
- ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
+ ks_log(KS_LOG_DEBUG,
+ "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
}
ks_dhtrt_release_querynodes(&query);
}
&buffer6_length,
sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
- ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
+ ks_log(KS_LOG_DEBUG,
+ "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port);
}
ks_dhtrt_release_querynodes(&query);
}
if (item) {
if (item->mutable) {
if (!sequence_snuffed) {
- ben_dict_set(r, ben_blob("k", 1), ben_blob(item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE));
+ ben_dict_set(r, ben_blob("k", 1), ben_blob(item->pk.key, KS_DHT_STORAGEITEM_PKEY_SIZE));
ben_dict_set(r, ben_blob("sig", 3), ben_blob(item->sig.sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE));
}
ben_dict_set(r, ben_blob("seq", 3), ben_int(item->seq));
if (dht->rt_ipv4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
if (dht->rt_ipv6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length));
- ks_log(KS_LOG_DEBUG, "Sending message response get\n");
+ //ks_log(KS_LOG_DEBUG, "Sending message response get\n");
ks_q_push(dht->send_q, (void *)response);
done:
{
ks_dht_storageitem_t *item = NULL;
ks_dht_token_t *token = NULL;
- ks_dht_storageitem_key_t *k = NULL;
+ ks_dht_storageitem_pkey_t *k = NULL;
ks_dht_storageitem_signature_t *sig = NULL;
struct bencode *seq;
int64_t sequence = -1;
if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
job->response_token = *token;
- if ((ret = ks_dht_utility_extract_storageitem_key(job->response->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_utility_extract_storageitem_pkey(job->response->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_utility_extract_storageitem_signature(job->response->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
seq = ben_dict_get_by_str(job->response->args, "seq");
nodes6_size = ben_str_len(n);
}
- ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
+ //ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
while (nodes_len < nodes_size) {
ks_dht_nodeid_t nid;
ks_log(KS_LOG_DEBUG,
"Expanded ipv4 nodeinfo for %s (%s %d)\n",
- ks_dht_hexid(&nid, id_buf),
+ ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
addr.host,
addr.port);
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+ ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
job->response_nodes[job->response_nodes_count++] = node;
}
ks_log(KS_LOG_DEBUG,
"Expanded ipv6 nodeinfo for %s (%s %d)\n",
- ks_dht_hexid(&nid, id_buf),
+ ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE),
addr.host,
addr.port);
- ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
+ ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE));
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
job->response_nodes6[job->response_nodes6_count++] = node;
}
if (!seq) {
// immutable
- if ((ret = ks_dht_storageitem_target_immutable(v, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_storageitem_target_immutable_internal(v, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) {
ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n");
ret = KS_STATUS_FAIL;
goto done;
}
if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
- else if ((ret = ks_dht_storageitem_create_immutable(&item,
- dht->pool,
- &tmptarget,
- v)) != KS_STATUS_SUCCESS) goto done;
+ else if ((ret = ks_dht_storageitem_create_immutable_internal(&item,
+ dht->pool,
+ &tmptarget,
+ v,
+ KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
} else {
// mutable
- struct bencode *tmp = NULL;
- uint8_t *tmpsig = NULL;
- size_t tmpsig_len = 0;
- int32_t res = 0;
-
- if ((ret = ks_dht_storageitem_target_mutable(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_storageitem_target_mutable_internal(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) {
- ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n");
+ ks_log(KS_LOG_DEBUG, "Mutable data hash does not match requested target id\n");
ret = KS_STATUS_FAIL;
goto done;
}
- tmp = ben_dict();
- if (job->query_salt) ben_dict_set(tmp, ben_blob("salt", 4), ben_clone(job->query_salt));
- ben_dict_set(tmp, ben_blob("seq", 3), ben_clone(seq));
- ben_dict_set(tmp, ben_blob("v", 1), ben_clone(v));
- tmpsig = ben_encode(&tmpsig_len, tmp);
- ben_free(tmp);
-
- res = crypto_sign_verify_detached(sig->sig, tmpsig, tmpsig_len, k->key);
-
- free(tmpsig);
-
- if (res) {
- ks_log(KS_LOG_DEBUG, "Immutable data signature failed to verify\n");
+ if (!ks_dht_storageitem_signature_verify(sig, k, job->query_salt, seq, v)) {
+ ks_log(KS_LOG_DEBUG, "Mutable data signature failed to verify\n");
ret = KS_STATUS_FAIL;
goto done;
}
-
+ ks_log(KS_LOG_DEBUG, "Signature verified for %s\n", ks_dht_hex(tmptarget.id, id_buf, KS_DHT_NODEID_SIZE));
if (olditem) {
- if (olditem->seq >= sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
- else {
- ks_hash_remove(dht->storageitems_hash, olditem->id.id);
- olditem = NULL;
+ if (olditem->seq > sequence) {
+ goto done;
}
+ if (olditem->seq == sequence) {
+ if (ben_cmp(olditem->v, v) != 0) {
+ goto done;
+ }
+ } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
+ olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
}
- if (!olditem && (ret = ks_dht_storageitem_create_mutable(&item,
- dht->pool,
- &tmptarget,
- v,
- k,
- job->query_salt,
- sequence,
- sig)) != KS_STATUS_SUCCESS) goto done;
+ else if ((ret = ks_dht_storageitem_create_mutable_internal(&item,
+ dht->pool,
+ &tmptarget,
+ v,
+ KS_TRUE,
+ k,
+ job->query_salt,
+ KS_TRUE,
+ sequence,
+ sig)) != KS_STATUS_SUCCESS) goto done;
}
if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
+ item = ks_hash_search(dht->storageitems_hash, item->id.id, KS_UNLOCKED);
} else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
if (item) job->response_storageitem = item;
else if (olditem) job->response_storageitem = olditem;
- job->state = KS_DHT_JOB_STATE_COMPLETING;
-
done:
if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
if (ret != KS_STATUS_SUCCESS) {
return ret;
}
+// @todo add a public function to add storageitem_t's to the store before calling this for authoring new data, reuse function in the "get" handlers
+// @todo add reference counting system to storageitem_t to know what to keep alive with reannouncements versus allowing to expire
KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
- ks_dht_nodeid_t *target,
- uint8_t *salt,
- ks_size_t salt_length)
+ ks_dht_token_t *token,
+ int64_t cas,
+ ks_dht_storageitem_t *item)
{
ks_dht_job_t *job = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(raddr);
- ks_assert(target);
+ ks_assert(token);
+ ks_assert(item);
if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
- ks_dht_job_build_put(job, ks_dht_query_put, callback, target, salt, salt_length);
+ ks_dht_job_build_put(job, ks_dht_query_put, callback, token, cas, item);
ks_dht_jobs_add(dht, job);
done:
&message,
&a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
+ if (job->query_storageitem->mutable) {
+ if (job->query_cas > 0) ben_dict_set(a, ben_blob("cas", 3), ben_int(job->query_cas));
+ ben_dict_set(a, ben_blob("k", 1), ben_blob(job->query_storageitem->pk.key, KS_DHT_STORAGEITEM_PKEY_SIZE));
+ if (job->query_storageitem->salt) ben_dict_set(a, ben_blob("salt", 4), ben_clone(job->query_storageitem->salt));
+ ben_dict_set(a, ben_blob("seq", 3), ben_int(job->query_storageitem->seq));
+ ben_dict_set(a, ben_blob("sig", 3), ben_blob(job->query_storageitem->sig.sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE));
+ }
+ ben_dict_set(a, ben_blob("token", 5), ben_blob(job->query_token.token, KS_DHT_TOKEN_SIZE));
+ ben_dict_set(a, ben_blob("v", 1), ben_clone(job->query_storageitem->v));
-
- ks_log(KS_LOG_DEBUG, "Sending message query put\n");
+ //ks_log(KS_LOG_DEBUG, "Sending message query put\n");
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message)
{
+ ks_dht_token_t *token = NULL;
+ ks_dht_storageitem_pkey_t *k = NULL;
+ ks_dht_storageitem_signature_t *sig = NULL;
+ struct bencode *salt = NULL;
+ struct bencode *seq = NULL;
+ int64_t sequence = -1;
+ struct bencode *cas = NULL;
+ int64_t cas_seq = -1;
+ struct bencode *v = NULL;
+ //ks_size_t v_len = 0;
+ ks_bool_t storageitems_locked = KS_FALSE;
+ ks_dht_storageitem_t *item = NULL;
+ ks_dht_storageitem_t *olditem = NULL;
+ ks_dht_nodeid_t target;
ks_dht_message_t *response = NULL;
struct bencode *r = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(message);
ks_assert(message->args);
- ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
+
+ if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
+
+ if ((ret = ks_dht_utility_extract_storageitem_pkey(message->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
+ if ((ret = ks_dht_utility_extract_storageitem_signature(message->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
+
+ salt = ben_dict_get_by_str(message->args, "salt");
+
+ seq = ben_dict_get_by_str(message->args, "seq");
+ if (seq) sequence = ben_int_val(seq);
+
+ cas = ben_dict_get_by_str(message->args, "cas");
+ if (cas) cas_seq = ben_int_val(cas);
+
+ if (seq && (!k || !sig)) {
+ ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data\n");
+ ret = KS_STATUS_ARG_INVALID;
+ goto done;
+ }
+
+ v = ben_dict_get_by_str(message->args, "v");
+ if (!v) {
+ ks_log(KS_LOG_DEBUG, "Must provide v\n");
+ ret = KS_STATUS_ARG_INVALID;
+ goto done;
+ }
+ //v_len = ben_str_len(v);
+
+ if (!seq) {
+ // immutable
+ if ((ret = ks_dht_storageitem_target_immutable_internal(v, &target)) != KS_STATUS_SUCCESS) goto done;
+ } else {
+ // mutable
+ if ((ret = ks_dht_storageitem_target_mutable_internal(k, salt, &target)) != KS_STATUS_SUCCESS) goto done;
+ }
+ olditem = ks_hash_search(dht->storageitems_hash, target.id, KS_UNLOCKED);
+
+ if (!ks_dht_token_verify(dht, &message->raddr, &target, token)) {
+ ks_log(KS_LOG_DEBUG, "Invalid token\n");
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
+
+ //ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
+
+ ks_hash_write_lock(dht->storageitems_hash);
+ storageitems_locked = KS_TRUE;
+
+ if (!seq) {
+ // immutable
+ if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+ else if ((ret = ks_dht_storageitem_create_immutable_internal(&item,
+ dht->pool,
+ &target,
+ v,
+ KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
+ } else {
+ // mutable
+ if (!ks_dht_storageitem_signature_verify(sig, k, salt, seq, v)) {
+ ks_log(KS_LOG_DEBUG, "Mutable data signature failed to verify\n");
+ ret = KS_STATUS_FAIL;
+ goto done;
+ }
+
+ if (olditem) {
+ if (cas && olditem->seq != cas_seq) {
+ // @todo send 301 error instead of the response
+ goto done;
+ }
+ if (olditem->seq > sequence) {
+ // @todo send 302 error instead of the response
+ goto done;
+ }
+ if (olditem->seq == sequence) {
+ if (ben_cmp(olditem->v, v) != 0) {
+ // @todo send 201? error instead of the response
+ goto done;
+ }
+ } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig);
+ olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
+ }
+ else if ((ret = ks_dht_storageitem_create_mutable_internal(&item,
+ dht->pool,
+ &target,
+ v,
+ KS_TRUE,
+ k,
+ salt,
+ KS_TRUE,
+ sequence,
+ sig)) != KS_STATUS_SUCCESS) goto done;
+ }
+ if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_response_setup(dht,
message->endpoint,
&response,
&r)) != KS_STATUS_SUCCESS) goto done;
- ks_log(KS_LOG_DEBUG, "Sending message response put\n");
+ //ks_log(KS_LOG_DEBUG, "Sending message response put\n");
ks_q_push(dht->send_q, (void *)response);
done:
+ if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
+ if (ret != KS_STATUS_SUCCESS) {
+ if (item) ks_dht_storageitem_destroy(&item);
+ }
return ret;
}
ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
- job->state = KS_DHT_JOB_STATE_COMPLETING;
-
// done:
return ret;
}
#include <../dht/ks_dht-int.h>
#include <tap.h>
-#define TEST_DHT1_REGISTER_TYPE_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze"
-#define TEST_DHT1_PROCESS_QUERY_PING_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe"
+ks_dht_storageitem_skey_t sk;
+ks_dht_storageitem_pkey_t pk;
-ks_status_t dht_z_callback(ks_dht_t *dht, ks_dht_message_t *message)
+ks_status_t dht2_put_callback(ks_dht_t *dht, ks_dht_job_t *job)
{
- diag("dht_z_callback\n");
- ok(message->transactionid[0] == '4' && message->transactionid[1] == '2');
- ks_dht_error(dht, message->endpoint, &message->raddr, message->transactionid, message->transactionid_length, 201, "Generic test error");
+ diag("dht2_put_callback\n");
+ return KS_STATUS_SUCCESS;
+}
+
+ks_status_t dht2_get_token_callback(ks_dht_t *dht, ks_dht_job_t *job)
+{
+ char buf[KS_DHT_TOKEN_SIZE * 2 + 1];
+ const char *v = "Hello World!";
+ size_t v_len = strlen(v);
+ ks_dht_storageitem_signature_t sig;
+ ks_dht_storageitem_t *mutable = NULL;
+
+ diag("dht2_get_token_callback %s\n", ks_dht_hex(job->response_token.token, buf, KS_DHT_TOKEN_SIZE));
+
+ ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len);
+ // @todo check if exists
+ ks_dht_storageitem_create_mutable(&mutable, dht->pool, &job->query_target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig);
+ mutable->sk = sk;
+ ks_dht_storageitems_insert(dht, mutable);
+
+ ks_dht_put(dht, &job->raddr, dht2_put_callback, &job->response_token, 0, mutable);
return KS_STATUS_SUCCESS;
}
ks_sockaddr_t raddr1;
//ks_sockaddr_t raddr2;
//ks_sockaddr_t raddr3;
+ ks_dht_nodeid_t target;
+ //ks_dht_storageitem_t *immutable = NULL;
+ //ks_dht_storageitem_t *mutable = NULL;
+ //const char *v = "Hello World!";
+ //size_t v_len = strlen(v);
+ //ks_dht_storageitem_skey_t sk; //= { { 0xe0, 0x6d, 0x31, 0x83, 0xd1, 0x41, 0x59, 0x22, 0x84, 0x33, 0xed, 0x59, 0x92, 0x21, 0xb8, 0x0b,
+ //0xd0, 0xa5, 0xce, 0x83, 0x52, 0xe4, 0xbd, 0xf0, 0x26, 0x2f, 0x76, 0x78, 0x6e, 0xf1, 0xc7, 0x4d,
+ //0xb7, 0xe7, 0xa9, 0xfe, 0xa2, 0xc0, 0xeb, 0x26, 0x9d, 0x61, 0xe3, 0xb3, 0x8e, 0x45, 0x0a, 0x22,
+ //0xe7, 0x54, 0x94, 0x1a, 0xc7, 0x84, 0x79, 0xd6, 0xc5, 0x4e, 0x1f, 0xaf, 0x60, 0x37, 0x88, 0x1d } };
+ //ks_dht_storageitem_pkey_t pk; //= { { 0x77, 0xff, 0x84, 0x90, 0x5a, 0x91, 0x93, 0x63, 0x67, 0xc0, 0x13, 0x60, 0x80, 0x31, 0x04, 0xf9,
+ //0x24, 0x32, 0xfc, 0xd9, 0x04, 0xa4, 0x35, 0x11, 0x87, 0x6d, 0xf5, 0xcd, 0xf3, 0xe7, 0xe5, 0x48 } };
+ //uint8_t sk1[KS_DHT_STORAGEITEM_SKEY_SIZE];
+ //uint8_t pk1[KS_DHT_STORAGEITEM_PKEY_SIZE];
+ //ks_dht_storageitem_signature_t sig;
+ //char sk_buf[KS_DHT_STORAGEITEM_SKEY_SIZE * 2 + 1];
+ //char pk_buf[KS_DHT_STORAGEITEM_PKEY_SIZE * 2 + 1];
+ //const char *test1vector = "3:seqi1e1:v12:Hello World!";
+ //const char *test1vector = "4:salt6:foobar3:seqi1e1:v12:Hello World!";
+ //size_t test1vector_len = strlen(test1vector);
+ //uint8_t test1vector_sig[KS_DHT_STORAGEITEM_SIGNATURE_SIZE];
+ //char test1vector_buf[KS_DHT_STORAGEITEM_SIGNATURE_SIZE * 2 + 1];
err = ks_init();
ok(!err);
err = ks_dht_create(&dht3, NULL, NULL);
ok(err == KS_STATUS_SUCCESS);
-
- ks_dht_register_type(dht1, "z", dht_z_callback);
-
if (have_v4) {
err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT, AF_INET);
ok(err == KS_STATUS_SUCCESS);
ok(err == KS_STATUS_SUCCESS);
}
- //diag("Custom type tests\n");
-
- //buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER);
- //memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen);
- //dht1->recv_buffer_length = buflen;
-
- //err = ks_dht_process(dht1, ep1, &raddr);
- //ok(err == KS_STATUS_SUCCESS);
-
- //ks_dht_pulse(dht1, 100);
-
- //ks_dht_pulse(&dht2, 100);
-
-
- //buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER);
- //memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_QUERY_PING_BUFFER, buflen);
- //dht1->recv_buffer_length = buflen;
-
- //err = ks_dht_process(dht1, &raddr);
- //ok(err == KS_STATUS_SUCCESS);
-
+ /*
diag("Ping test\n");
- //ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1
ks_dht_ping(dht2, &raddr1, NULL); // (QUERYING)
ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 (RESPONDING)
ok(ks_dhtrt_find_node(dht2->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good
- ks_dht_pulse(dht2, 100); // (COMPLETING)
+ ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING)
diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
for (int i = 0; i < 10; ++i) {
ks_dht_pulse(dht2, 100);
}
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+ */
- // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
+ //diag("Get test\n");
+
+
+ /*
+ ks_dht_storageitem_target_immutable((uint8_t *)v, v_len, &target);
+ ks_dht_storageitem_create_immutable(&immutable, dht1->pool, &target, (uint8_t *)v, v_len);
+ ks_dht_storageitems_insert(dht1, immutable);
+ */
+
+ /*
+ crypto_sign_keypair(pk.key, sk.key);
- diag("Find_Node test\n");
+ ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len);
+ ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
+ ks_dht_storageitem_create_mutable(&mutable, dht1->pool, &target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig);
+ mutable->sk = sk;
+ ks_dht_storageitems_insert(dht1, mutable);
- ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid);
+ ks_dht_get(dht2, &raddr1, dht2_get_callback, &target, NULL, 0);
+
+ ks_dht_pulse(dht2, 100); // send get query
- ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
+ ks_dht_pulse(dht1, 100); // receive get query and send get response
- ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
+ ks_dht_pulse(dht2, 100); // receive get response
- ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+ ok(ks_dht_storageitems_find(dht2, &target) != NULL); // item should be verified and stored
- ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
+ ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING)
+ */
- ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+ diag("Put test\n");
+
+ crypto_sign_keypair(pk.key, sk.key);
+
+ ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target);
+
+ ks_dht_get(dht2, &raddr1, dht2_get_token_callback, &target, NULL, 0); // create job
- diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+ ks_dht_pulse(dht2, 100); // send get query
+
+ ks_dht_pulse(dht1, 100); // receive get query and send get response
+
+ ks_dht_pulse(dht2, 100); // receive get response
+
+ ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING), send put query
+
+ ks_dht_pulse(dht1, 100); // receive put query and send put response
+
+ ks_dht_pulse(dht2, 100); // receive put response
+
+ ks_dht_pulse(dht2, 100); // Call finish callback and purse the job (COMPLETING)
+
for (int i = 0; i < 10; ++i) {
//diag("DHT 1\n");
ks_dht_pulse(dht1, 100);
//diag("DHT 2\n");
ks_dht_pulse(dht2, 100);
}
- ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
+
+ // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid
+
+ //diag("Find_Node test\n");
+
+ //ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid);
+
+ //ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1
+
+ //ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response
+
+ //ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+
+ //ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1
+
+ //ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
+
+ //diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
+ //for (int i = 0; i < 10; ++i) {
+ //diag("DHT 1\n");
+ //ks_dht_pulse(dht1, 100);
+ //diag("DHT 2\n");
+ //ks_dht_pulse(dht2, 100);
+ //}
+ //ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good
/* Cleanup and shutdown */