]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
defer: add new KRU instance and async queues docs-develop-rrl-8r8r8r/deployments/4324
authorLukáš Ondráček <lukas.ondracek@nic.cz>
Mon, 10 Jun 2024 18:19:10 +0000 (20:19 +0200)
committerLukáš Ondráček <lukas.ondracek@nic.cz>
Mon, 10 Jun 2024 18:19:10 +0000 (20:19 +0200)
daemon/defer.c
daemon/defer.h
daemon/main.c
daemon/ratelimiting.c
daemon/ratelimiting.test/tests.c
daemon/ratelimiting.test/tests.inc.c
daemon/session2.c
daemon/session2.h
daemon/worker.c

index 66b266708573b4fa47b89162758c1a17b9f2009b..5d0ad222982eda98963fba5f9421522cc4b522c3 100644 (file)
 #include "daemon/defer.h"
+#include "daemon/mmapped.h"
+#include "daemon/session2.h"
 #include "lib/kru.h"
+#include "lib/utils.h"
 
-// TODO: move kru_defer to another file
+#define V4_PREFIXES  (uint8_t[])       {  18,  20, 24, 32 }
+#define V4_RATE_MULT (kru_price_t[])   { 768, 256, 32,  1 }
 
-#include "daemon/session2.h"
+#define V6_PREFIXES  (uint8_t[])       { 32, 48, 56, 64, 128 }
+#define V6_RATE_MULT (kru_price_t[])   { 64,  4,  3,  2,   1 }
+
+#define V4_PREFIXES_CNT (sizeof(V4_PREFIXES) / sizeof(*V4_PREFIXES))
+#define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES))
+#define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
+
+#define LOADS_THRESHOLDS  (uint16_t[])  {1<<4, 1<<8, 1<<11, -1}
+#define QUEUES_CNT        (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) - 1)
+
+#define MAX_DECAY  (KRU_LIMIT * 0.0006929)  // -> halving counters in 1s
+#define TIME_MULT  1/1   // max fraction of rate limit filled by one cpu (multiplies large int)  // TODO divide by #cpus?
+
+struct defer {
+       size_t capacity;
+       kru_price_t max_decay;
+       bool using_avx2;
+       uint8_t kru[] ALIGNED(64);
+};
+struct defer *defer = NULL;
+struct mmapped defer_mmapped = {0};
+
+uv_check_t check_handle;
+protolayer_iter_ctx_queue_t queues[QUEUES_CNT];
 
 defer_sample_state_t defer_sample_state = {
-       .do_sample = true, // FIXME: start with false, set to true based on config when opening KRU
        .is_accounting = 0,
 };
 
+/// Return whether we're using optimized variant right now.
+static bool using_avx2(void)
+{
+       bool result = (KRU.initialize == KRU_AVX2.initialize);
+       kr_require(result || KRU.initialize == KRU_GENERIC.initialize);
+       return result;
+}
+
+/// Increment KRU counters by given time.
+void defer_account(uint64_t nsec, union kr_sockaddr addr) {
+       uint8_t key[16] ALIGNED(16) = {0, };
+       uint16_t max_load = 0;
+       if (defer_sample_state.addr.ip.sa_family == AF_INET6) {
+               struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)&defer_sample_state.addr.ip;
+               memcpy(key, &ipv6->sin6_addr, 16);
+
+               kru_price_t prices[V6_PREFIXES_CNT];
+               for (size_t i = 0; i < V6_PREFIXES_CNT; i++) {
+                       prices[i] = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll / V6_RATE_MULT[i];  // TODO adjust
+               }
+
+               max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+                               1, key, V6_PREFIXES, prices, V6_PREFIXES_CNT);
+       } else if (defer_sample_state.addr.ip.sa_family == AF_INET) {
+               struct sockaddr_in *ipv4 = (struct sockaddr_in *)&defer_sample_state.addr.ip;
+               memcpy(key, &ipv4->sin_addr, 4);  // TODO append port?
+
+               kru_price_t prices[V4_PREFIXES_CNT];
+               for (size_t i = 0; i < V4_PREFIXES_CNT; i++) {
+                       prices[i] = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll / V4_RATE_MULT[i];   // TODO adjust
+               }
+
+               max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+                               0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT);
+       }
 
+       kr_log_notice(DEVEL, "%8.3f ms for %s, load: %d\n", nsec / 1000000.0,
+                       kr_straddr(&defer_sample_state.addr.ip), max_load);
+}
+
+/// Determine whether the request should be deferred during unwrapping.
 static enum protolayer_iter_cb_result pl_defer_unwrap(
                void *sess_data, void *iter_data,
                struct protolayer_iter_ctx *ctx)
 {
+       if (ctx->session->outgoing)
+               return protolayer_continue(ctx);
+
+       defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
+
+       uint8_t key[16] ALIGNED(16) = {0, };
+       uint16_t max_load = 0;
+       if (ctx->comm->comm_addr->sa_family == AF_INET6) {
+               struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)ctx->comm->comm_addr;
+               memcpy(key, &ipv6->sin6_addr, 16);
+
+               max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+                               1, key, V6_PREFIXES, NULL, V6_PREFIXES_CNT);
+       } else if (ctx->comm->comm_addr->sa_family == AF_INET) {
+               struct sockaddr_in *ipv4 = (struct sockaddr_in *)ctx->comm->comm_addr;
+               memcpy(key, &ipv4->sin_addr, 4);  // TODO append port?
 
-       kr_log_notice(DEVEL, "DEFER: %s\n",
-                       kr_straddr(&defer_sample_state.addr.ip));
+               max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
+                               0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT);
+       }
 
-       return protolayer_continue(ctx);
-       //return protolayer_async();
+       int threshold_index = 0;  // 0: synchronous
+       for (; LOADS_THRESHOLDS[threshold_index] < max_load; threshold_index++);
+
+       kr_log_notice(DEVEL, "DEFER | addr: %s, load: %d, queue: %d\n",
+                       kr_straddr(ctx->comm->src_addr),
+                       max_load, threshold_index);
+
+       if (threshold_index == 0)
+               return protolayer_continue(ctx);
+
+       queue_push(queues[threshold_index - 1], ctx);
+
+       return protolayer_async();
+}
+
+/// Continue processing deferred requests in libuv check phase.
+static void defer_queues_check(uv_check_t *handle) {
+       // TODO drop too old requests and/or break processing if it lasts too long (keeping some work to another check phase)
+       for (size_t i = 0; i < QUEUES_CNT; i++) {
+               while (queue_len(queues[i]) > 0) {
+                       defer_sample_start();
+                       struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
+                       queue_pop(queues[i]);
+                       defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
+                       kr_log_notice(DEVEL, "DEFER continue: %s\n",
+                                       kr_straddr(ctx->comm->comm_addr));
+                       protolayer_continue(ctx);
+                       defer_sample_stop();
+               }
+       }
 }
 
-void defer_init(void) {
+/// Initialize defer, incl. shared memory with KRU.
+int defer_init(uv_loop_t *loop) {
+       struct defer header = {  // TODO adjust hardcoded values
+               .capacity = 1 << 10,
+               .max_decay = MAX_DECAY,
+               .using_avx2 = using_avx2(),
+       };
+
+       size_t capacity_log = 0;
+       for (size_t c = header.capacity - 1; c > 0; c >>= 1) capacity_log++;
+
+       size_t size = offsetof(struct defer, kru) + KRU.get_size(capacity_log);
+       size_t header_size = offsetof(struct defer, kru);
+
+       int ret = mmapped_init(&defer_mmapped, "defer", size, &header, header_size);
+       if (ret == MMAPPED_WAS_FIRST) {
+               kr_log_info(SYSTEM, "Initializing prioritization...\n");
+
+               defer = defer_mmapped.mem;
+
+               bool succ = KRU.initialize((struct kru *)defer->kru, capacity_log, header.max_decay);
+               if (!succ) {
+                       defer = NULL;
+                       ret = kr_error(EINVAL);
+                       goto fail;
+               }
+
+               ret = mmapped_init_continue(&defer_mmapped);
+               if (ret != 0) goto fail;
+
+               kr_log_info(SYSTEM, "Prioritization initialized (%s).\n", (defer->using_avx2 ? "AVX2" : "generic"));
+       } else if (ret == 0) {
+               defer = defer_mmapped.mem;
+               kr_log_info(SYSTEM, "Using existing prioritization data (%s).\n", (defer->using_avx2 ? "AVX2" : "generic"));
+       } else goto fail;
+
+       for (size_t i = 0; i < QUEUES_CNT; i++)
+               queue_init(queues[i]);
+
        protolayer_globals[PROTOLAYER_TYPE_DEFER].unwrap = pl_defer_unwrap;
+       uv_check_init(loop, &check_handle);
+       uv_check_start(&check_handle, defer_queues_check);
+       return 0;
+
+fail:
+
+       kr_log_crit(SYSTEM, "Initialization of shared prioritization data failed.\n");
+       return ret;
+}
+
+/// Deinitialize shared memory.
+void defer_deinit(void)
+{
+       mmapped_deinit(&defer_mmapped);
+       defer = NULL;
 }
index 32f7da7be909a11e4d1c7976e3b41407129454f7..7ded12880e378fa0fe659d4d7925e51e00ad808b 100644 (file)
@@ -1,18 +1,29 @@
 #include <stdbool.h>
 #include "lib/defines.h"
 #include "lib/utils.h"
+#include "lib/kru.h"
 
-// TODO: reconsider `static inline` cases below
+/// Initialize defer, incl. shared memory with KRU.
+int defer_init(uv_loop_t *loop);
+
+/// Deinitialize shared memory.
+void defer_deinit(void);
 
+/// Increment KRU counters by the given time.
+void defer_account(uint64_t nsec, union kr_sockaddr addr);
 
 typedef struct {
-       bool do_sample; /// whether to sample; could be important if _COARSE isn't available
        int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1
        union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet
        uint64_t stamp; /// monotonic nanoseconds, probably won't wrap
 } defer_sample_state_t;
 extern defer_sample_state_t defer_sample_state;
 
+extern struct defer *defer;  /// skip sampling/deferring if NULL
+
+
+// TODO: reconsider `static inline` cases below
+
 #include <time.h>
 static inline uint64_t get_stamp(void)
 {
@@ -24,7 +35,7 @@ static inline uint64_t get_stamp(void)
 /// Start accounting work, if not doing it already.
 static inline void defer_sample_start(void)
 {
-       if (!defer_sample_state.do_sample) return;
+       if (!defer) return;
        kr_assert(!defer_sample_state.is_accounting);
        ++defer_sample_state.is_accounting;
        defer_sample_state.stamp = get_stamp();
@@ -34,7 +45,7 @@ static inline void defer_sample_start(void)
 /// Annotate the work currently being accounted by an IP address.
 static inline void defer_sample_addr(const union kr_sockaddr *addr)
 {
-       if (!defer_sample_state.do_sample || kr_fails_assert(addr)) return;
+       if (!defer || kr_fails_assert(addr)) return;
        if (!defer_sample_state.is_accounting) return;
 
        if (defer_sample_state.addr.ip.sa_family != AF_UNSPEC) {
@@ -59,7 +70,7 @@ static inline void defer_sample_addr(const union kr_sockaddr *addr)
 /// Stop accounting work - and change the source if applicable.
 static inline void defer_sample_stop(void)
 {
-       if (!defer_sample_state.do_sample) return;
+       if (!defer) return;
 
        if (kr_fails_assert(defer_sample_state.is_accounting > 0)) return; // weird
        if (--defer_sample_state.is_accounting) return;
@@ -67,12 +78,9 @@ static inline void defer_sample_stop(void)
        const uint64_t elapsed = get_stamp() - defer_sample_state.stamp;
 
        // we accounted something
-       // FIXME: drop the log, add KRU, etc.
-       kr_log_notice(DEVEL, "%8.3f ms for %s\n", elapsed / 1000000.0,
-                       kr_straddr(&defer_sample_state.addr.ip));
+
        // TODO: some queries of internal origin have suspicioiusly high numbers.
        // We won't be really accounting those, but it might suggest some other issue.
-}
 
-
-void defer_init(void);
+       defer_account(elapsed, defer_sample_state.addr);
+}
index c21c2f4b9bc45cb8299f59c26250536a98113bf3..95df2e5a3a80ce436150e46121d2c9d991141afd 100644 (file)
@@ -589,7 +589,6 @@ int main(int argc, char **argv)
        io_protolayers_init();
        tls_protolayers_init();
        proxy_protolayers_init();
-       defer_init();
 #ifdef ENABLE_DOH2
        http_protolayers_init();
 #endif
@@ -623,6 +622,11 @@ int main(int argc, char **argv)
                lua_settop(the_engine->L, 0);
        }
 
+       if (defer_init(loop) != 0) {
+               ret = EXIT_FAILURE;
+               goto cleanup;
+       }
+
        ret = kr_rules_init_ensure();
        if (ret) {
                kr_log_error(RULES, "failed to initialize policy rule engine: %s\n",
@@ -657,6 +661,7 @@ cleanup:/* Cleanup. */
        worker_deinit();
        engine_deinit();
        network_deinit();
+       defer_deinit();
        kr_rules_commit(false);
        kr_rules_deinit();
        if (loop != NULL) {
index 320f0e18f06b4e75acf68abbfe6193f4a9c6f92d..e6b134308ad867dfd7cb8251545545622907a7d2 100644 (file)
@@ -4,15 +4,15 @@
 #include "lib/utils.h"
 #include "lib/resolve.h"
 
-#define RRL_V4_PREFIXES  (uint8_t[])       {  18,  20, 24, 32 }
-#define RRL_V4_RATE_MULT (kru_price_t[])   { 768, 256, 32,  1 }
+#define V4_PREFIXES  (uint8_t[])       {  18,  20, 24, 32 }
+#define V4_RATE_MULT (kru_price_t[])   { 768, 256, 32,  1 }
 
-#define RRL_V6_PREFIXES  (uint8_t[])       { 32, 48, 56, 64, 128 }
-#define RRL_V6_RATE_MULT (kru_price_t[])   { 64,  4,  3,  2,   1 }
+#define V6_PREFIXES  (uint8_t[])       { 32, 48, 56, 64, 128 }
+#define V6_RATE_MULT (kru_price_t[])   { 64,  4,  3,  2,   1 }
 
-#define RRL_V4_PREFIXES_CNT (sizeof(RRL_V4_PREFIXES) / sizeof(*RRL_V4_PREFIXES))
-#define RRL_V6_PREFIXES_CNT (sizeof(RRL_V6_PREFIXES) / sizeof(*RRL_V6_PREFIXES))
-#define RRL_MAX_PREFIXES_CNT ((RRL_V4_PREFIXES_CNT > RRL_V6_PREFIXES_CNT) ? RRL_V4_PREFIXES_CNT : RRL_V6_PREFIXES_CNT)
+#define V4_PREFIXES_CNT (sizeof(V4_PREFIXES) / sizeof(*V4_PREFIXES))
+#define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES))
+#define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
 
 struct ratelimiting {
        size_t capacity;
@@ -20,8 +20,8 @@ struct ratelimiting {
        uint32_t rate_limit;
        uint16_t tc_limit;
        bool using_avx2;
-       kru_price_t v4_prices[RRL_V4_PREFIXES_CNT];
-       kru_price_t v6_prices[RRL_V6_PREFIXES_CNT];
+       kru_price_t v4_prices[V4_PREFIXES_CNT];
+       kru_price_t v6_prices[V6_PREFIXES_CNT];
        uint8_t kru[] ALIGNED(64);
 };
 struct ratelimiting *ratelimiting = NULL;
@@ -63,14 +63,18 @@ int ratelimiting_init(const char *mmap_file, size_t capacity, uint32_t instant_l
                        (uint64_t) base_price * rate_limit / 1000;
 
                bool succ = KRU.initialize((struct kru *)ratelimiting->kru, capacity_log, max_decay);
-               kr_require(succ);
+               if (!succ) {
+                       ratelimiting = NULL;
+                       ret = kr_error(EINVAL);
+                       goto fail;
+               }
 
-               for (size_t i = 0; i < RRL_V4_PREFIXES_CNT; i++) {
-                       ratelimiting->v4_prices[i] = base_price / RRL_V4_RATE_MULT[i];
+               for (size_t i = 0; i < V4_PREFIXES_CNT; i++) {
+                       ratelimiting->v4_prices[i] = base_price / V4_RATE_MULT[i];
                }
 
-               for (size_t i = 0; i < RRL_V6_PREFIXES_CNT; i++) {
-                       ratelimiting->v6_prices[i] = base_price / RRL_V6_RATE_MULT[i];
+               for (size_t i = 0; i < V6_PREFIXES_CNT; i++) {
+                       ratelimiting->v6_prices[i] = base_price / V6_RATE_MULT[i];
                }
 
                ret = mmapped_init_continue(&ratelimiting_mmapped);
@@ -87,14 +91,11 @@ int ratelimiting_init(const char *mmap_file, size_t capacity, uint32_t instant_l
 fail:
 
        kr_log_crit(SYSTEM, "Initialization of shared rate-limiting data failed.\n");
-
        return ret;
 }
 
 void ratelimiting_deinit(void)
 {
-       if (ratelimiting == NULL) return;
-
        mmapped_deinit(&ratelimiting_mmapped);
        ratelimiting = NULL;
 }
@@ -114,13 +115,13 @@ bool ratelimiting_request_begin(struct kr_request *req)
                        memcpy(key, &ipv6->sin6_addr, 16);
 
                        limited_prefix = KRU.limited_multi_prefix_or((struct kru *)ratelimiting->kru, kr_now(),
-                                       1, key, RRL_V6_PREFIXES, ratelimiting->v6_prices, RRL_V6_PREFIXES_CNT, &max_final_load);
+                                       1, key, V6_PREFIXES, ratelimiting->v6_prices, V6_PREFIXES_CNT, &max_final_load);
                } else {
                        struct sockaddr_in *ipv4 = (struct sockaddr_in *)req->qsource.addr;
                        memcpy(key, &ipv4->sin_addr, 4);  // TODO append port?
 
                        limited_prefix = KRU.limited_multi_prefix_or((struct kru *)ratelimiting->kru, kr_now(),
-                                       0, key, RRL_V4_PREFIXES, ratelimiting->v4_prices, RRL_V4_PREFIXES_CNT, &max_final_load);
+                                       0, key, V4_PREFIXES, ratelimiting->v4_prices, V4_PREFIXES_CNT, &max_final_load);
                }
                limited = (limited_prefix ? 2 : (max_final_load > ratelimiting->tc_limit ? 1 : 0));
        }
index 04e445ce67ab7c4d1884f772087ebf809890734f..eb65a72491346af48383120cc85bfc1680f580ef 100644 (file)
@@ -53,7 +53,7 @@ uint32_t _count_test(int expected_passing, int addr_family, char *addr_format, u
 static void the_tests(void **state)
 {
        /* IPv4 multi-prefix tests */
-       static_assert(RRL_V4_PREFIXES_CNT == 4,
+       static_assert(V4_PREFIXES_CNT == 4,
                        "There are no more IPv4 limited prefixes (/32, /24, /20, /18 will be tested).");
 
        count_test("IPv4 instant limit /32", INST(V4, 32), 0,
@@ -81,7 +81,7 @@ static void the_tests(void **state)
                        AF_INET, "128.0.64.0", 0, 0);
 
        /* IPv6 multi-prefix tests */
-       static_assert(RRL_V6_PREFIXES_CNT == 5,
+       static_assert(V6_PREFIXES_CNT == 5,
                        "There are no more IPv6 limited prefixes (/128, /64, /56, /48, /32 will be tested).");
 
        count_test("IPv6 instant limit /128, independent to IPv4", INST(V6, 128), 0,
index 9e56bc67f51020d2a9ed501f6a833d4cc7500fe0..157da55872af34753736dc5d5e4fc1043a184692 100644 (file)
@@ -40,7 +40,7 @@ uint64_t fakeclock_now(void);
 // Accessing RRL configuration of INSTANT/RATE limits for V4/V6 and specific prefix.
 #define LIMIT(type, Vx, prefix) (RRL_MULT(Vx, prefix) * RRL_ ## type ## _LIMIT)
 
-#define RRL_CONFIG(Vx, name) RRL_ ## Vx ## _ ## name
+#define RRL_CONFIG(Vx, name) Vx ## _ ## name
 #define RRL_MULT(Vx, prefix) get_mult(RRL_CONFIG(Vx, PREFIXES), RRL_CONFIG(Vx, RATE_MULT), RRL_CONFIG(Vx, PREFIXES_CNT), prefix)
 static inline kru_price_t get_mult(uint8_t prefixes[], kru_price_t mults[], size_t cnt, uint8_t wanted_prefix) {
        for (size_t i = 0; i < cnt; i++)
index 13f1e52e8b44d16ff109325d5032afb31aa7ca38..39617b0ab05cd132fc0950d52f71e9f3a0d9a06b 100644 (file)
@@ -590,17 +590,13 @@ static int session2_submit(
        if (!had_comm_param)
                comm = &session->comm_storage;
 
-       // RRL: at this point we might start doing nontrivial work,
+       // DEFER: at this point we might start doing nontrivial work,
        // but we may not know the client's IP yet.
        // Note two cases: incoming session (new request)
        // vs. outgoing session (resuming work on some request)
-       if (direction == PROTOLAYER_UNWRAP) {
+       if (direction == PROTOLAYER_UNWRAP)
                defer_sample_start();
-               // In particular we don't want to miss en/decryption work
-               // for regular connections from clients.
-               if (!session->outgoing && session->secure && !proxy_allowed(comm->comm_addr))
-                       defer_sample_addr((const union kr_sockaddr *)comm->comm_addr);
-       }
+
        int ret;
 
        struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size);
index c2ba5b85db6ecbfd24c9c9c44d7feca40b89368c..37a9ec9c549d8c40d492644cde39434f9e8de09b 100644 (file)
@@ -241,7 +241,7 @@ static inline size_t wire_buf_free_space_length(const struct wire_buf *wb)
                              * stream (may span multiple (un)wraps). */\
        XX(DNS_SINGLE_STREAM) /**< Singular packet WITH prepended size in a
                               * stream (may span multiple (un)wraps). */\
-       /* Requests prioritization */\
+       /* Prioritization of requests */\
        XX(DEFER)
 
 /** The identifiers of protocol layer types. */
index 3c39f423c39e8ab834ce97258bbe7f6ee9e8e28e..58ca5839571609d28ff3b5c0a9275f55614745dc 100644 (file)
@@ -364,7 +364,6 @@ static struct request_ctx *request_create(struct session2 *session,
                /* We need to store a copy of peer address. */
                memcpy(&ctx->source.addr.ip, src_addr, kr_sockaddr_len(src_addr));
                req->qsource.addr = &ctx->source.addr.ip;
-               defer_sample_addr(&ctx->source.addr);
 
                if (!comm_addr)
                        comm_addr = src_addr;