From: Shane Bryldt Date: Mon, 5 Dec 2016 20:43:52 +0000 (+0000) Subject: FS-9775: Added support for removing finished transactions via latent purging while... X-Git-Tag: v1.8.0~893^2~77 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7ac7a7e75b05ff27ea7ee0960ea33c6afa0052cb;p=thirdparty%2Ffreeswitch.git FS-9775: Added support for removing finished transactions via latent purging while expiring Also added support to send error message responses and updated the test to confirm, errors still need to be updated to send an error responses --- diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 81ed2f558d..15fcb32548 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -8,12 +8,21 @@ KS_BEGIN_EXTERN_C /** * */ -KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht); +KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht); + KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr); +KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht, + ks_sockaddr_t *raddr, + uint8_t *transactionid, + ks_size_t transactionid_length, + long long errorcode, + const char *errorstr); KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index c82b1c49f1..d0460d5425 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -75,10 +75,13 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); ks_dht2_register_type(dht, "q", ks_dht2_process_query); ks_dht2_register_type(dht, "r", ks_dht2_process_response); - // @todo ks_hash_insert the r/e callbacks into type registry + ks_dht2_register_type(dht, "e", ks_dht2_process_error); ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); ks_dht2_register_query(dht, "ping", ks_dht2_process_query_ping); + + ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); + // @todo register 301 error for internal get/put CAS hash mismatch retry handler dht->bind_ipv4 = KS_FALSE; dht->bind_ipv6 = KS_FALSE; @@ -111,7 +114,6 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht) dht->recv_buffer_length = 0; for (int32_t i = 0; i < dht->endpoints_size; ++i) { ks_dht2_endpoint_t *ep = dht->endpoints[i]; - //ks_hash_remove(dht->endpoints_hash, ep->addr.host); ks_dht2_endpoint_deinit(ep); ks_dht2_endpoint_free(ep); } @@ -139,6 +141,10 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht) ks_hash_destroy(&dht->registry_query); dht->registry_query = NULL; } + if (dht->registry_error) { + ks_hash_destroy(&dht->registry_error); + dht->registry_error = NULL; + } ks_dht2_nodeid_deinit(&dht->nodeid); @@ -191,6 +197,18 @@ KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_register_error(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback) +{ + ks_assert(dht); + ks_assert(value); + ks_assert(callback); + + return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; +} + /** * */ @@ -297,52 +315,17 @@ KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout) } } + ks_dht2_idle(dht); + return KS_STATUS_SUCCESS; } /** * */ -KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht) { - // @todo lookup standard def for IPV6 max size - char ip[48]; - ks_dht2_endpoint_t *ep; - // @todo calculate max IPV6 payload size? - char buf[1000]; - ks_size_t buf_len; - ks_assert(dht); - ks_assert(raddr); - ks_assert(message); - ks_assert(message->data); - - // @todo blacklist check - - ks_ip_route(ip, sizeof(ip), raddr->host); - - if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) { - ks_sockaddr_t addr; - ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family); - if (ks_dht2_bind(dht, &addr, &ep) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - } - - if (!ep) { - ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host); - return KS_STATUS_FAIL; - } - - buf_len = ben_encode2(buf, sizeof(buf), message->data); - - ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", raddr->host, raddr->port); - ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data)); - - if (ks_socket_sendto(ep->sock, (void *)buf, &buf_len, raddr) != KS_STATUS_SUCCESS) { - ks_log(KS_LOG_DEBUG, "Socket error\n"); - return KS_STATUS_FAIL; - } return KS_STATUS_SUCCESS; } @@ -350,20 +333,49 @@ KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dh /** * */ -KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht) +KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht) { ks_assert(dht); + if (ks_dht2_idle_expirations(dht) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + return KS_STATUS_SUCCESS; } /** * */ -KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht) +KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht) { + ks_hash_iterator_t *it = NULL; + ks_time_t now = ks_time_now_sec(); + ks_assert(dht); + // @todo add delay between checking expirations, every 10 seconds? + + ks_hash_write_lock(dht->transactions_hash); + for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + const void *key = NULL; + ks_dht2_transaction_t *value = NULL; + ks_bool_t remove = KS_FALSE; + + ks_hash_this(it, &key, NULL, (void **)&value); + if (value->finished) { + remove = KS_TRUE; + } else if (value->expiration <= now) { + ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid); + remove = KS_TRUE; + } + if (remove) { + ks_hash_remove(dht->transactions_hash, (char *)key); + ks_pool_free(value->pool, value); + } + } + ks_hash_write_unlock(dht->transactions_hash); + return KS_STATUS_SUCCESS; } @@ -411,6 +423,96 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr) return ret; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +{ + // @todo lookup standard def for IPV6 max size + char ip[48]; + ks_dht2_endpoint_t *ep; + // @todo calculate max IPV6 payload size? + char buf[1000]; + ks_size_t buf_len; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(message); + ks_assert(message->data); + + // @todo blacklist check + + ks_ip_route(ip, sizeof(ip), raddr->host); + + if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) { + ks_sockaddr_t addr; + ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family); + if (ks_dht2_bind(dht, &addr, &ep) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + } + + if (!ep) { + ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host); + return KS_STATUS_FAIL; + } + + buf_len = ben_encode2(buf, sizeof(buf), message->data); + + ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", raddr->host, raddr->port); + ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data)); + + if (ks_socket_sendto(ep->sock, (void *)buf, &buf_len, raddr) != KS_STATUS_SUCCESS) { + ks_log(KS_LOG_DEBUG, "Socket error\n"); + return KS_STATUS_FAIL; + } + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht, + ks_sockaddr_t *raddr, + uint8_t *transactionid, + ks_size_t transactionid_length, + long long errorcode, + const char *errorstr) +{ + ks_dht2_message_t error; + struct bencode *e; + ks_status_t ret = KS_STATUS_FAIL; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(transactionid); + ks_assert(errorstr); + + if (ks_dht2_message_prealloc(&error, dht->pool) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + + if (ks_dht2_message_init(&error, KS_TRUE) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + + if (ks_dht2_message_error(&error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) { + goto done; + } + + // @note e joins response.data and will be freed with it + ben_list_append(e, ben_int(errorcode)); + ben_list_append(e, ben_blob(errorstr, strlen(errorstr))); + + ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode); + ret = ks_dht2_send(dht, raddr, &error); + + done: + ks_dht2_message_deinit(&error); + return ret; +} + /** * */ @@ -497,13 +599,96 @@ KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t * if (!transaction) { ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid); + } else if (!ks_addr_cmp(raddr, &transaction->raddr)) { + ks_log(KS_LOG_DEBUG, + "Message response rejected due to spoofing from %s %d, expected %s %d\n", + raddr->host, + raddr->port, + transaction->raddr.host, + transaction->raddr.port); } else { + // @todo mark transaction for later removal + transaction->finished = KS_TRUE; ret = transaction->callback(dht, raddr, message); } return ret; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +{ + struct bencode *e; + struct bencode *ec; + struct bencode *es; + const char *et; + ks_size_t es_len; + long long errorcode; + char error[KS_DHT_MESSAGE_ERROR_MAX_SIZE]; + ks_dht2_transaction_t *transaction; + uint32_t *tid; + uint32_t transactionid; + ks_status_t ret = KS_STATUS_FAIL; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(message); + + // @todo start of ks_dht2_message_parse_error + e = ben_dict_get_by_str(message->data, "e"); + if (!e) { + ks_log(KS_LOG_DEBUG, "Message error missing required key 'e'\n"); + return KS_STATUS_FAIL; + } + ec = ben_list_get(e, 0); + es = ben_list_get(e, 1); + es_len = ben_str_len(es); + if (es_len >= KS_DHT_MESSAGE_ERROR_MAX_SIZE) { + ks_log(KS_LOG_DEBUG, "Message error value has an unexpectedly large size of %d\n", es_len); + return KS_STATUS_FAIL; + } + errorcode = ben_int_val(ec); + et = ben_str_val(es); + + memcpy(error, et, es_len); + error[es_len] = '\0'; + // todo end of ks_dht2_message_parse_error + + message->args = e; + + tid = (uint32_t *)message->transactionid; + transactionid = ntohl(*tid); + + transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED); + ks_hash_read_unlock(dht->transactions_hash); + + if (!transaction) { + ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid); + } else if (!ks_addr_cmp(raddr, &transaction->raddr)) { + ks_log(KS_LOG_DEBUG, + "Message error rejected due to spoofing from %s %d, expected %s %d\n", + raddr->host, + raddr->port, + transaction->raddr.host, + transaction->raddr.port); + } else { + // @todo mark transaction for later removal + ks_dht2_message_callback_t callback; + transaction->finished = KS_TRUE; + + if ((callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) { + ret = callback(dht, raddr, message); + } else { + ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error); + ret = KS_STATUS_SUCCESS; + } + } + + return ret; +} + /** * */ @@ -585,7 +770,7 @@ KS_DECLARE(ks_status_t) ks_dht2_send_query_ping(ks_dht2_t *dht, ks_sockaddr_t *r goto done; } - if (ks_dht2_transaction_init(transaction, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) { + if (ks_dht2_transaction_init(transaction, raddr, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) { goto done; } @@ -634,6 +819,7 @@ KS_DECLARE(ks_status_t) ks_dht2_send_response_ping(ks_dht2_t *dht, ks_assert(dht); ks_assert(raddr); + ks_assert(transactionid); if (ks_dht2_message_prealloc(&response, dht->pool) != KS_STATUS_SUCCESS) { return KS_STATUS_FAIL; diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index b6ed56c3ae..8985387ef1 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -16,7 +16,9 @@ KS_BEGIN_EXTERN_C #define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20 #define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20 #define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20 +#define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256 +#define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30 typedef struct ks_dht2_s ks_dht2_t; typedef struct ks_dht2_nodeid_s ks_dht2_nodeid_t; @@ -54,9 +56,11 @@ struct ks_dht2_endpoint_s { struct ks_dht2_transaction_s { ks_pool_t *pool; + ks_sockaddr_t raddr; uint32_t transactionid; ks_dht2_message_callback_t callback; - // @todo expiration data + ks_time_t expiration; + ks_bool_t finished; }; @@ -71,6 +75,7 @@ struct ks_dht2_s { ks_hash_t *registry_type; ks_hash_t *registry_query; + ks_hash_t *registry_error; ks_bool_t bind_ipv4; ks_bool_t bind_ipv6; @@ -137,6 +142,10 @@ KS_DECLARE(ks_status_t) ks_dht2_message_response(ks_dht2_message_t *message, uint8_t *transactionid, ks_size_t transactionid_length, struct bencode **args); +KS_DECLARE(ks_status_t) ks_dht2_message_error(ks_dht2_message_t *message, + uint8_t *transactionid, + ks_size_t transactionid_length, + struct bencode **args); /** * @@ -150,6 +159,7 @@ KS_DECLARE(ks_status_t) ks_dht2_transaction_prealloc(ks_dht2_transaction_t *tras KS_DECLARE(ks_status_t) ks_dht2_transaction_free(ks_dht2_transaction_t *transaction); KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction, + ks_sockaddr_t *raddr, uint32_t transactionid, ks_dht2_message_callback_t callback); KS_DECLARE(ks_status_t) ks_dht2_transaction_deinit(ks_dht2_transaction_t *transaction); diff --git a/libs/libks/src/dht/ks_dht_message.c b/libs/libks/src/dht/ks_dht_message.c index a84eb60e77..ed88a0fe63 100644 --- a/libs/libks/src/dht/ks_dht_message.c +++ b/libs/libks/src/dht/ks_dht_message.c @@ -207,6 +207,33 @@ KS_DECLARE(ks_status_t) ks_dht2_message_response(ks_dht2_message_t *message, return KS_STATUS_SUCCESS; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_message_error(ks_dht2_message_t *message, + uint8_t *transactionid, + ks_size_t transactionid_length, + struct bencode **args) +{ + struct bencode *e; + + ks_assert(message); + ks_assert(transactionid); + + ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); + ben_dict_set(message->data, ben_blob("y", 1), ben_blob("e", 1)); + + // @note r joins message->data and will be freed with it + e = ben_list(); + ben_dict_set(message->data, ben_blob("e", 1), e); + + if (args) { + *args = e; + } + + return KS_STATUS_SUCCESS; +} + /* For Emacs: * Local Variables: diff --git a/libs/libks/src/dht/ks_dht_transaction.c b/libs/libks/src/dht/ks_dht_transaction.c index 9cfe883433..3b62f8eaf3 100644 --- a/libs/libks/src/dht/ks_dht_transaction.c +++ b/libs/libks/src/dht/ks_dht_transaction.c @@ -48,15 +48,20 @@ KS_DECLARE(ks_status_t) ks_dht2_transaction_free(ks_dht2_transaction_t *transact * */ KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction, + ks_sockaddr_t *raddr, uint32_t transactionid, ks_dht2_message_callback_t callback) { ks_assert(transaction); + ks_assert(raddr); ks_assert(transaction->pool); ks_assert(callback); + transaction->raddr = *raddr; transaction->transactionid = transactionid; transaction->callback = callback; + transaction->expiration = ks_time_now_sec() + KS_DHT_TRANSACTION_EXPIRATION_DELAY; + transaction->finished = KS_FALSE; return KS_STATUS_SUCCESS; } @@ -68,8 +73,11 @@ KS_DECLARE(ks_status_t) ks_dht2_transaction_deinit(ks_dht2_transaction_t *transa { ks_assert(transaction); + transaction->raddr = (const ks_sockaddr_t){ 0 }; transaction->transactionid = 0; transaction->callback = NULL; + transaction->expiration = 0; + transaction->finished = KS_FALSE; return KS_STATUS_SUCCESS; } diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index 482449c689..f54b776511 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -10,6 +10,7 @@ ks_status_t dht_z_callback(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message { diag("dht_z_callback\n"); ok(message->transactionid[0] == '4' && message->transactionid[1] == '2'); + ks_dht2_send_error(dht, raddr, message->transactionid, message->transactionid_length, 201, "Generic test error"); return KS_STATUS_SUCCESS; } @@ -97,6 +98,8 @@ int main() { err = ks_dht2_process(dht1, &raddr); ok(err == KS_STATUS_SUCCESS); + err = ks_dht2_pulse(&dht2, 1000); + ok(err == KS_STATUS_SUCCESS); //buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER); //memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_QUERY_PING_BUFFER, buflen);