ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
ks_dht2_register_type(dht, "q", ks_dht2_process_query);
ks_dht2_register_type(dht, "r", ks_dht2_process_response);
- // @todo ks_hash_insert the r/e callbacks into type registry
+ ks_dht2_register_type(dht, "e", ks_dht2_process_error);
ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
ks_dht2_register_query(dht, "ping", ks_dht2_process_query_ping);
+
+ ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
+ // @todo register 301 error for internal get/put CAS hash mismatch retry handler
dht->bind_ipv4 = KS_FALSE;
dht->bind_ipv6 = KS_FALSE;
dht->recv_buffer_length = 0;
for (int32_t i = 0; i < dht->endpoints_size; ++i) {
ks_dht2_endpoint_t *ep = dht->endpoints[i];
- //ks_hash_remove(dht->endpoints_hash, ep->addr.host);
ks_dht2_endpoint_deinit(ep);
ks_dht2_endpoint_free(ep);
}
ks_hash_destroy(&dht->registry_query);
dht->registry_query = NULL;
}
+ if (dht->registry_error) {
+ ks_hash_destroy(&dht->registry_error);
+ dht->registry_error = NULL;
+ }
ks_dht2_nodeid_deinit(&dht->nodeid);
return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
}
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_register_error(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback)
+{
+ ks_assert(dht);
+ ks_assert(value);
+ ks_assert(callback);
+
+ return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
+}
+
/**
*
*/
}
}
+ ks_dht2_idle(dht);
+
return KS_STATUS_SUCCESS;
}
/**
*
*/
-KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
+KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht)
{
- // @todo lookup standard def for IPV6 max size
- char ip[48];
- ks_dht2_endpoint_t *ep;
- // @todo calculate max IPV6 payload size?
- char buf[1000];
- ks_size_t buf_len;
-
ks_assert(dht);
- ks_assert(raddr);
- ks_assert(message);
- ks_assert(message->data);
-
- // @todo blacklist check
-
- ks_ip_route(ip, sizeof(ip), raddr->host);
-
- if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) {
- ks_sockaddr_t addr;
- ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family);
- if (ks_dht2_bind(dht, &addr, &ep) != KS_STATUS_SUCCESS) {
- return KS_STATUS_FAIL;
- }
- }
-
- if (!ep) {
- ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host);
- return KS_STATUS_FAIL;
- }
-
- buf_len = ben_encode2(buf, sizeof(buf), message->data);
-
- ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", raddr->host, raddr->port);
- ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
-
- if (ks_socket_sendto(ep->sock, (void *)buf, &buf_len, raddr) != KS_STATUS_SUCCESS) {
- ks_log(KS_LOG_DEBUG, "Socket error\n");
- return KS_STATUS_FAIL;
- }
return KS_STATUS_SUCCESS;
}
/**
*
*/
-KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht)
+KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht)
{
ks_assert(dht);
+ if (ks_dht2_idle_expirations(dht) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+
return KS_STATUS_SUCCESS;
}
/**
*
*/
-KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht)
+KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht)
{
+ ks_hash_iterator_t *it = NULL;
+ ks_time_t now = ks_time_now_sec();
+
ks_assert(dht);
+ // @todo add delay between checking expirations, every 10 seconds?
+
+ ks_hash_write_lock(dht->transactions_hash);
+ for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+ const void *key = NULL;
+ ks_dht2_transaction_t *value = NULL;
+ ks_bool_t remove = KS_FALSE;
+
+ ks_hash_this(it, &key, NULL, (void **)&value);
+ if (value->finished) {
+ remove = KS_TRUE;
+ } else if (value->expiration <= now) {
+ ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
+ remove = KS_TRUE;
+ }
+ if (remove) {
+ ks_hash_remove(dht->transactions_hash, (char *)key);
+ ks_pool_free(value->pool, value);
+ }
+ }
+ ks_hash_write_unlock(dht->transactions_hash);
+
return KS_STATUS_SUCCESS;
}
return ret;
}
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
+{
+ // @todo lookup standard def for IPV6 max size
+ char ip[48];
+ ks_dht2_endpoint_t *ep;
+ // @todo calculate max IPV6 payload size?
+ char buf[1000];
+ ks_size_t buf_len;
+
+ ks_assert(dht);
+ ks_assert(raddr);
+ ks_assert(message);
+ ks_assert(message->data);
+
+ // @todo blacklist check
+
+ ks_ip_route(ip, sizeof(ip), raddr->host);
+
+ if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) {
+ ks_sockaddr_t addr;
+ ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family);
+ if (ks_dht2_bind(dht, &addr, &ep) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+ }
+
+ if (!ep) {
+ ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host);
+ return KS_STATUS_FAIL;
+ }
+
+ buf_len = ben_encode2(buf, sizeof(buf), message->data);
+
+ ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", raddr->host, raddr->port);
+ ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
+
+ if (ks_socket_sendto(ep->sock, (void *)buf, &buf_len, raddr) != KS_STATUS_SUCCESS) {
+ ks_log(KS_LOG_DEBUG, "Socket error\n");
+ return KS_STATUS_FAIL;
+ }
+
+ return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht,
+ ks_sockaddr_t *raddr,
+ uint8_t *transactionid,
+ ks_size_t transactionid_length,
+ long long errorcode,
+ const char *errorstr)
+{
+ ks_dht2_message_t error;
+ struct bencode *e;
+ ks_status_t ret = KS_STATUS_FAIL;
+
+ ks_assert(dht);
+ ks_assert(raddr);
+ ks_assert(transactionid);
+ ks_assert(errorstr);
+
+ if (ks_dht2_message_prealloc(&error, dht->pool) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+
+ if (ks_dht2_message_init(&error, KS_TRUE) != KS_STATUS_SUCCESS) {
+ return KS_STATUS_FAIL;
+ }
+
+ if (ks_dht2_message_error(&error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) {
+ goto done;
+ }
+
+ // @note e joins response.data and will be freed with it
+ ben_list_append(e, ben_int(errorcode));
+ ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
+
+ ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
+ ret = ks_dht2_send(dht, raddr, &error);
+
+ done:
+ ks_dht2_message_deinit(&error);
+ return ret;
+}
+
/**
*
*/
if (!transaction) {
ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
+ } else if (!ks_addr_cmp(raddr, &transaction->raddr)) {
+ ks_log(KS_LOG_DEBUG,
+ "Message response rejected due to spoofing from %s %d, expected %s %d\n",
+ raddr->host,
+ raddr->port,
+ transaction->raddr.host,
+ transaction->raddr.port);
} else {
+ // @todo mark transaction for later removal
+ transaction->finished = KS_TRUE;
ret = transaction->callback(dht, raddr, message);
}
return ret;
}
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
+{
+ struct bencode *e;
+ struct bencode *ec;
+ struct bencode *es;
+ const char *et;
+ ks_size_t es_len;
+ long long errorcode;
+ char error[KS_DHT_MESSAGE_ERROR_MAX_SIZE];
+ ks_dht2_transaction_t *transaction;
+ uint32_t *tid;
+ uint32_t transactionid;
+ ks_status_t ret = KS_STATUS_FAIL;
+
+ ks_assert(dht);
+ ks_assert(raddr);
+ ks_assert(message);
+
+ // @todo start of ks_dht2_message_parse_error
+ e = ben_dict_get_by_str(message->data, "e");
+ if (!e) {
+ ks_log(KS_LOG_DEBUG, "Message error missing required key 'e'\n");
+ return KS_STATUS_FAIL;
+ }
+ ec = ben_list_get(e, 0);
+ es = ben_list_get(e, 1);
+ es_len = ben_str_len(es);
+ if (es_len >= KS_DHT_MESSAGE_ERROR_MAX_SIZE) {
+ ks_log(KS_LOG_DEBUG, "Message error value has an unexpectedly large size of %d\n", es_len);
+ return KS_STATUS_FAIL;
+ }
+ errorcode = ben_int_val(ec);
+ et = ben_str_val(es);
+
+ memcpy(error, et, es_len);
+ error[es_len] = '\0';
+ // todo end of ks_dht2_message_parse_error
+
+ message->args = e;
+
+ tid = (uint32_t *)message->transactionid;
+ transactionid = ntohl(*tid);
+
+ transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
+ ks_hash_read_unlock(dht->transactions_hash);
+
+ if (!transaction) {
+ ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid);
+ } else if (!ks_addr_cmp(raddr, &transaction->raddr)) {
+ ks_log(KS_LOG_DEBUG,
+ "Message error rejected due to spoofing from %s %d, expected %s %d\n",
+ raddr->host,
+ raddr->port,
+ transaction->raddr.host,
+ transaction->raddr.port);
+ } else {
+ // @todo mark transaction for later removal
+ ks_dht2_message_callback_t callback;
+ transaction->finished = KS_TRUE;
+
+ if ((callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) {
+ ret = callback(dht, raddr, message);
+ } else {
+ ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error);
+ ret = KS_STATUS_SUCCESS;
+ }
+ }
+
+ return ret;
+}
+
/**
*
*/
goto done;
}
- if (ks_dht2_transaction_init(transaction, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) {
+ if (ks_dht2_transaction_init(transaction, raddr, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) {
goto done;
}
ks_assert(dht);
ks_assert(raddr);
+ ks_assert(transactionid);
if (ks_dht2_message_prealloc(&response, dht->pool) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;