From: Lukáš Ondráček Date: Mon, 10 Jun 2024 18:19:10 +0000 (+0200) Subject: defer: add new KRU instance and async queues X-Git-Tag: v6.0.9~1^2~42 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ca4a97aa6c97e2ae99f320dea56c58a5300da430;p=thirdparty%2Fknot-resolver.git defer: add new KRU instance and async queues --- diff --git a/daemon/defer.c b/daemon/defer.c index 66b266708..5d0ad2229 100644 --- a/daemon/defer.c +++ b/daemon/defer.c @@ -1,28 +1,192 @@ #include "daemon/defer.h" +#include "daemon/mmapped.h" +#include "daemon/session2.h" #include "lib/kru.h" +#include "lib/utils.h" -// TODO: move kru_defer to another file +#define V4_PREFIXES (uint8_t[]) { 18, 20, 24, 32 } +#define V4_RATE_MULT (kru_price_t[]) { 768, 256, 32, 1 } -#include "daemon/session2.h" +#define V6_PREFIXES (uint8_t[]) { 32, 48, 56, 64, 128 } +#define V6_RATE_MULT (kru_price_t[]) { 64, 4, 3, 2, 1 } + +#define V4_PREFIXES_CNT (sizeof(V4_PREFIXES) / sizeof(*V4_PREFIXES)) +#define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES)) +#define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT) + +#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<11, -1} +#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) - 1) + +#define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s +#define TIME_MULT 1/1 // max fraction of rate limit filled by one cpu (multiplies large int) // TODO divide by #cpus? + +struct defer { + size_t capacity; + kru_price_t max_decay; + bool using_avx2; + uint8_t kru[] ALIGNED(64); +}; +struct defer *defer = NULL; +struct mmapped defer_mmapped = {0}; + +uv_check_t check_handle; +protolayer_iter_ctx_queue_t queues[QUEUES_CNT]; defer_sample_state_t defer_sample_state = { - .do_sample = true, // FIXME: start with false, set to true based on config when opening KRU .is_accounting = 0, }; +/// Return whether we're using optimized variant right now. 
+static bool using_avx2(void) +{ + bool result = (KRU.initialize == KRU_AVX2.initialize); + kr_require(result || KRU.initialize == KRU_GENERIC.initialize); + return result; +} + +/// Increment KRU counters by given time. +void defer_account(uint64_t nsec, union kr_sockaddr addr) { + uint8_t key[16] ALIGNED(16) = {0, }; + uint16_t max_load = 0; + if (defer_sample_state.addr.ip.sa_family == AF_INET6) { + struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)&defer_sample_state.addr.ip; + memcpy(key, &ipv6->sin6_addr, 16); + + kru_price_t prices[V6_PREFIXES_CNT]; + for (size_t i = 0; i < V6_PREFIXES_CNT; i++) { + prices[i] = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll / V6_RATE_MULT[i]; // TODO adjust + } + + max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), + 1, key, V6_PREFIXES, prices, V6_PREFIXES_CNT); + } else if (defer_sample_state.addr.ip.sa_family == AF_INET) { + struct sockaddr_in *ipv4 = (struct sockaddr_in *)&defer_sample_state.addr.ip; + memcpy(key, &ipv4->sin_addr, 4); // TODO append port? + + kru_price_t prices[V4_PREFIXES_CNT]; + for (size_t i = 0; i < V4_PREFIXES_CNT; i++) { + prices[i] = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll / V4_RATE_MULT[i]; // TODO adjust + } + + max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), + 0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT); + } + kr_log_notice(DEVEL, "%8.3f ms for %s, load: %d\n", nsec / 1000000.0, + kr_straddr(&defer_sample_state.addr.ip), max_load); +} + +/// Determine whether the request should be deferred during unwrapping. 
static enum protolayer_iter_cb_result pl_defer_unwrap( void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx) { + if (ctx->session->outgoing) + return protolayer_continue(ctx); + + defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr); + + uint8_t key[16] ALIGNED(16) = {0, }; + uint16_t max_load = 0; + if (ctx->comm->comm_addr->sa_family == AF_INET6) { + struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)ctx->comm->comm_addr; + memcpy(key, &ipv6->sin6_addr, 16); + + max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), + 1, key, V6_PREFIXES, NULL, V6_PREFIXES_CNT); + } else if (ctx->comm->comm_addr->sa_family == AF_INET) { + struct sockaddr_in *ipv4 = (struct sockaddr_in *)ctx->comm->comm_addr; + memcpy(key, &ipv4->sin_addr, 4); // TODO append port? - kr_log_notice(DEVEL, "DEFER: %s\n", - kr_straddr(&defer_sample_state.addr.ip)); + max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), + 0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT); + } - return protolayer_continue(ctx); - //return protolayer_async(); + int threshold_index = 0; // 0: synchronous + for (; LOADS_THRESHOLDS[threshold_index] < max_load; threshold_index++); + + kr_log_notice(DEVEL, "DEFER | addr: %s, load: %d, queue: %d\n", + kr_straddr(ctx->comm->src_addr), + max_load, threshold_index); + + if (threshold_index == 0) + return protolayer_continue(ctx); + + queue_push(queues[threshold_index - 1], ctx); + + return protolayer_async(); +} + +/// Continue processing deferred requests in libuv check phase. 
+static void defer_queues_check(uv_check_t *handle) { + // TODO drop too old requests and/or break processing if it lasts too long (keeping some work to another check phase) + for (size_t i = 0; i < QUEUES_CNT; i++) { + while (queue_len(queues[i]) > 0) { + defer_sample_start(); + struct protolayer_iter_ctx *ctx = queue_head(queues[i]); + queue_pop(queues[i]); + defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr); + kr_log_notice(DEVEL, "DEFER continue: %s\n", + kr_straddr(ctx->comm->comm_addr)); + protolayer_continue(ctx); + defer_sample_stop(); + } + } } -void defer_init(void) { +/// Initialize defer, incl. shared memory with KRU. +int defer_init(uv_loop_t *loop) { + struct defer header = { // TODO adjust hardcoded values + .capacity = 1 << 10, + .max_decay = MAX_DECAY, + .using_avx2 = using_avx2(), + }; + + size_t capacity_log = 0; + for (size_t c = header.capacity - 1; c > 0; c >>= 1) capacity_log++; + + size_t size = offsetof(struct defer, kru) + KRU.get_size(capacity_log); + size_t header_size = offsetof(struct defer, kru); + + int ret = mmapped_init(&defer_mmapped, "defer", size, &header, header_size); + if (ret == MMAPPED_WAS_FIRST) { + kr_log_info(SYSTEM, "Initializing prioritization...\n"); + + defer = defer_mmapped.mem; + + bool succ = KRU.initialize((struct kru *)defer->kru, capacity_log, header.max_decay); + if (!succ) { + defer = NULL; + ret = kr_error(EINVAL); + goto fail; + } + + ret = mmapped_init_continue(&defer_mmapped); + if (ret != 0) goto fail; + + kr_log_info(SYSTEM, "Prioritization initialized (%s).\n", (defer->using_avx2 ? "AVX2" : "generic")); + } else if (ret == 0) { + defer = defer_mmapped.mem; + kr_log_info(SYSTEM, "Using existing prioritization data (%s).\n", (defer->using_avx2 ? 
"AVX2" : "generic")); + } else goto fail; + + for (size_t i = 0; i < QUEUES_CNT; i++) + queue_init(queues[i]); + protolayer_globals[PROTOLAYER_TYPE_DEFER].unwrap = pl_defer_unwrap; + uv_check_init(loop, &check_handle); + uv_check_start(&check_handle, defer_queues_check); + return 0; + +fail: + + kr_log_crit(SYSTEM, "Initialization of shared prioritization data failed.\n"); + return ret; +} + +/// Deinitialize shared memory. +void defer_deinit(void) +{ + mmapped_deinit(&defer_mmapped); + defer = NULL; } diff --git a/daemon/defer.h b/daemon/defer.h index 32f7da7be..7ded12880 100644 --- a/daemon/defer.h +++ b/daemon/defer.h @@ -1,18 +1,29 @@ #include #include "lib/defines.h" #include "lib/utils.h" +#include "lib/kru.h" -// TODO: reconsider `static inline` cases below +/// Initialize defer, incl. shared memory with KRU. +int defer_init(uv_loop_t *loop); + +/// Deinitialize shared memory. +void defer_deinit(void); +/// Increment KRU counters by the given time. +void defer_account(uint64_t nsec, union kr_sockaddr addr); typedef struct { - bool do_sample; /// whether to sample; could be important if _COARSE isn't available int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1 union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet uint64_t stamp; /// monotonic nanoseconds, probably won't wrap } defer_sample_state_t; extern defer_sample_state_t defer_sample_state; +extern struct defer *defer; /// skip sampling/deferring if NULL + + +// TODO: reconsider `static inline` cases below + #include static inline uint64_t get_stamp(void) { @@ -24,7 +35,7 @@ static inline uint64_t get_stamp(void) /// Start accounting work, if not doing it already. 
static inline void defer_sample_start(void) { - if (!defer_sample_state.do_sample) return; + if (!defer) return; kr_assert(!defer_sample_state.is_accounting); ++defer_sample_state.is_accounting; defer_sample_state.stamp = get_stamp(); @@ -34,7 +45,7 @@ static inline void defer_sample_start(void) /// Annotate the work currently being accounted by an IP address. static inline void defer_sample_addr(const union kr_sockaddr *addr) { - if (!defer_sample_state.do_sample || kr_fails_assert(addr)) return; + if (!defer || kr_fails_assert(addr)) return; if (!defer_sample_state.is_accounting) return; if (defer_sample_state.addr.ip.sa_family != AF_UNSPEC) { @@ -59,7 +70,7 @@ static inline void defer_sample_addr(const union kr_sockaddr *addr) /// Stop accounting work - and change the source if applicable. static inline void defer_sample_stop(void) { - if (!defer_sample_state.do_sample) return; + if (!defer) return; if (kr_fails_assert(defer_sample_state.is_accounting > 0)) return; // weird if (--defer_sample_state.is_accounting) return; @@ -67,12 +78,9 @@ static inline void defer_sample_stop(void) const uint64_t elapsed = get_stamp() - defer_sample_state.stamp; // we accounted something - // FIXME: drop the log, add KRU, etc. - kr_log_notice(DEVEL, "%8.3f ms for %s\n", elapsed / 1000000.0, - kr_straddr(&defer_sample_state.addr.ip)); + // TODO: some queries of internal origin have suspiciously high numbers. // We won't be really accounting those, but it might suggest some other issue. 
-} - -void defer_init(void); + defer_account(elapsed, defer_sample_state.addr); +} diff --git a/daemon/main.c b/daemon/main.c index c21c2f4b9..95df2e5a3 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -589,7 +589,6 @@ int main(int argc, char **argv) io_protolayers_init(); tls_protolayers_init(); proxy_protolayers_init(); - defer_init(); #ifdef ENABLE_DOH2 http_protolayers_init(); #endif @@ -623,6 +622,11 @@ int main(int argc, char **argv) lua_settop(the_engine->L, 0); } + if (defer_init(loop) != 0) { + ret = EXIT_FAILURE; + goto cleanup; + } + ret = kr_rules_init_ensure(); if (ret) { kr_log_error(RULES, "failed to initialize policy rule engine: %s\n", @@ -657,6 +661,7 @@ cleanup:/* Cleanup. */ worker_deinit(); engine_deinit(); network_deinit(); + defer_deinit(); kr_rules_commit(false); kr_rules_deinit(); if (loop != NULL) { diff --git a/daemon/ratelimiting.c b/daemon/ratelimiting.c index 320f0e18f..e6b134308 100644 --- a/daemon/ratelimiting.c +++ b/daemon/ratelimiting.c @@ -4,15 +4,15 @@ #include "lib/utils.h" #include "lib/resolve.h" -#define RRL_V4_PREFIXES (uint8_t[]) { 18, 20, 24, 32 } -#define RRL_V4_RATE_MULT (kru_price_t[]) { 768, 256, 32, 1 } +#define V4_PREFIXES (uint8_t[]) { 18, 20, 24, 32 } +#define V4_RATE_MULT (kru_price_t[]) { 768, 256, 32, 1 } -#define RRL_V6_PREFIXES (uint8_t[]) { 32, 48, 56, 64, 128 } -#define RRL_V6_RATE_MULT (kru_price_t[]) { 64, 4, 3, 2, 1 } +#define V6_PREFIXES (uint8_t[]) { 32, 48, 56, 64, 128 } +#define V6_RATE_MULT (kru_price_t[]) { 64, 4, 3, 2, 1 } -#define RRL_V4_PREFIXES_CNT (sizeof(RRL_V4_PREFIXES) / sizeof(*RRL_V4_PREFIXES)) -#define RRL_V6_PREFIXES_CNT (sizeof(RRL_V6_PREFIXES) / sizeof(*RRL_V6_PREFIXES)) -#define RRL_MAX_PREFIXES_CNT ((RRL_V4_PREFIXES_CNT > RRL_V6_PREFIXES_CNT) ? 
RRL_V4_PREFIXES_CNT : RRL_V6_PREFIXES_CNT) +#define V4_PREFIXES_CNT (sizeof(V4_PREFIXES) / sizeof(*V4_PREFIXES)) +#define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES)) +#define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT) struct ratelimiting { size_t capacity; @@ -20,8 +20,8 @@ struct ratelimiting { uint32_t rate_limit; uint16_t tc_limit; bool using_avx2; - kru_price_t v4_prices[RRL_V4_PREFIXES_CNT]; - kru_price_t v6_prices[RRL_V6_PREFIXES_CNT]; + kru_price_t v4_prices[V4_PREFIXES_CNT]; + kru_price_t v6_prices[V6_PREFIXES_CNT]; uint8_t kru[] ALIGNED(64); }; struct ratelimiting *ratelimiting = NULL; @@ -63,14 +63,18 @@ int ratelimiting_init(const char *mmap_file, size_t capacity, uint32_t instant_l (uint64_t) base_price * rate_limit / 1000; bool succ = KRU.initialize((struct kru *)ratelimiting->kru, capacity_log, max_decay); - kr_require(succ); + if (!succ) { + ratelimiting = NULL; + ret = kr_error(EINVAL); + goto fail; + } - for (size_t i = 0; i < RRL_V4_PREFIXES_CNT; i++) { - ratelimiting->v4_prices[i] = base_price / RRL_V4_RATE_MULT[i]; + for (size_t i = 0; i < V4_PREFIXES_CNT; i++) { + ratelimiting->v4_prices[i] = base_price / V4_RATE_MULT[i]; } - for (size_t i = 0; i < RRL_V6_PREFIXES_CNT; i++) { - ratelimiting->v6_prices[i] = base_price / RRL_V6_RATE_MULT[i]; + for (size_t i = 0; i < V6_PREFIXES_CNT; i++) { + ratelimiting->v6_prices[i] = base_price / V6_RATE_MULT[i]; } ret = mmapped_init_continue(&ratelimiting_mmapped); @@ -87,14 +91,11 @@ int ratelimiting_init(const char *mmap_file, size_t capacity, uint32_t instant_l fail: kr_log_crit(SYSTEM, "Initialization of shared rate-limiting data failed.\n"); - return ret; } void ratelimiting_deinit(void) { - if (ratelimiting == NULL) return; - mmapped_deinit(&ratelimiting_mmapped); ratelimiting = NULL; } @@ -114,13 +115,13 @@ bool ratelimiting_request_begin(struct kr_request *req) memcpy(key, &ipv6->sin6_addr, 16); limited_prefix = 
KRU.limited_multi_prefix_or((struct kru *)ratelimiting->kru, kr_now(), - 1, key, RRL_V6_PREFIXES, ratelimiting->v6_prices, RRL_V6_PREFIXES_CNT, &max_final_load); + 1, key, V6_PREFIXES, ratelimiting->v6_prices, V6_PREFIXES_CNT, &max_final_load); } else { struct sockaddr_in *ipv4 = (struct sockaddr_in *)req->qsource.addr; memcpy(key, &ipv4->sin_addr, 4); // TODO append port? limited_prefix = KRU.limited_multi_prefix_or((struct kru *)ratelimiting->kru, kr_now(), - 0, key, RRL_V4_PREFIXES, ratelimiting->v4_prices, RRL_V4_PREFIXES_CNT, &max_final_load); + 0, key, V4_PREFIXES, ratelimiting->v4_prices, V4_PREFIXES_CNT, &max_final_load); } limited = (limited_prefix ? 2 : (max_final_load > ratelimiting->tc_limit ? 1 : 0)); } diff --git a/daemon/ratelimiting.test/tests.c b/daemon/ratelimiting.test/tests.c index 04e445ce6..eb65a7249 100644 --- a/daemon/ratelimiting.test/tests.c +++ b/daemon/ratelimiting.test/tests.c @@ -53,7 +53,7 @@ uint32_t _count_test(int expected_passing, int addr_family, char *addr_format, u static void the_tests(void **state) { /* IPv4 multi-prefix tests */ - static_assert(RRL_V4_PREFIXES_CNT == 4, + static_assert(V4_PREFIXES_CNT == 4, "There are no more IPv4 limited prefixes (/32, /24, /20, /18 will be tested)."); count_test("IPv4 instant limit /32", INST(V4, 32), 0, @@ -81,7 +81,7 @@ static void the_tests(void **state) AF_INET, "128.0.64.0", 0, 0); /* IPv6 multi-prefix tests */ - static_assert(RRL_V6_PREFIXES_CNT == 5, + static_assert(V6_PREFIXES_CNT == 5, "There are no more IPv6 limited prefixes (/128, /64, /56, /48, /32 will be tested)."); count_test("IPv6 instant limit /128, independent to IPv4", INST(V6, 128), 0, diff --git a/daemon/ratelimiting.test/tests.inc.c b/daemon/ratelimiting.test/tests.inc.c index 9e56bc67f..157da5587 100644 --- a/daemon/ratelimiting.test/tests.inc.c +++ b/daemon/ratelimiting.test/tests.inc.c @@ -40,7 +40,7 @@ uint64_t fakeclock_now(void); // Accessing RRL configuration of INSTANT/RATE limits for V4/V6 and specific 
prefix. #define LIMIT(type, Vx, prefix) (RRL_MULT(Vx, prefix) * RRL_ ## type ## _LIMIT) -#define RRL_CONFIG(Vx, name) RRL_ ## Vx ## _ ## name +#define RRL_CONFIG(Vx, name) Vx ## _ ## name #define RRL_MULT(Vx, prefix) get_mult(RRL_CONFIG(Vx, PREFIXES), RRL_CONFIG(Vx, RATE_MULT), RRL_CONFIG(Vx, PREFIXES_CNT), prefix) static inline kru_price_t get_mult(uint8_t prefixes[], kru_price_t mults[], size_t cnt, uint8_t wanted_prefix) { for (size_t i = 0; i < cnt; i++) diff --git a/daemon/session2.c b/daemon/session2.c index 13f1e52e8..39617b0ab 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -590,17 +590,13 @@ static int session2_submit( if (!had_comm_param) comm = &session->comm_storage; - // RRL: at this point we might start doing nontrivial work, + // DEFER: at this point we might start doing nontrivial work, // but we may not know the client's IP yet. // Note two cases: incoming session (new request) // vs. outgoing session (resuming work on some request) - if (direction == PROTOLAYER_UNWRAP) { + if (direction == PROTOLAYER_UNWRAP) defer_sample_start(); - // In particular we don't want to miss en/decryption work - // for regular connections from clients. - if (!session->outgoing && session->secure && !proxy_allowed(comm->comm_addr)) - defer_sample_addr((const union kr_sockaddr *)comm->comm_addr); - } + int ret; struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size); diff --git a/daemon/session2.h b/daemon/session2.h index c2ba5b85d..37a9ec9c5 100644 --- a/daemon/session2.h +++ b/daemon/session2.h @@ -241,7 +241,7 @@ static inline size_t wire_buf_free_space_length(const struct wire_buf *wb) * stream (may span multiple (un)wraps). */\ XX(DNS_SINGLE_STREAM) /**< Singular packet WITH prepended size in a * stream (may span multiple (un)wraps). */\ - /* Requests prioritization */\ + /* Prioritization of requests */\ XX(DEFER) /** The identifiers of protocol layer types. 
*/ diff --git a/daemon/worker.c b/daemon/worker.c index 3c39f423c..58ca58395 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -364,7 +364,6 @@ static struct request_ctx *request_create(struct session2 *session, /* We need to store a copy of peer address. */ memcpy(&ctx->source.addr.ip, src_addr, kr_sockaddr_len(src_addr)); req->qsource.addr = &ctx->source.addr.ip; - defer_sample_addr(&ctx->source.addr); if (!comm_addr) comm_addr = src_addr;