]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
defer: add request and idle timeouts, limit on waiting queries docs-develop-rrl-8r8r8r/deployments/4517
authorLukáš Ondráček <lukas.ondracek@nic.cz>
Wed, 10 Jul 2024 15:28:45 +0000 (17:28 +0200)
committerLukáš Ondráček <lukas.ondracek@nic.cz>
Wed, 10 Jul 2024 15:47:27 +0000 (17:47 +0200)
daemon/defer.c
daemon/defer.h
daemon/mmapped.h
daemon/session2.c
daemon/session2.h
daemon/udp_queue.c
daemon/udp_queue.h

index a4a58e5be61aa3187750336e8784c9d3d9bebc72..134ce3a1ee4e9c99b975d717b1e791c7e8ff2749 100644 (file)
@@ -1,6 +1,7 @@
 #include "daemon/defer.h"
 #include "daemon/mmapped.h"
 #include "daemon/session2.h"
+#include "daemon/udp_queue.h"
 #include "lib/kru.h"
 #include "lib/utils.h"
 
 #define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES))
 #define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
 
-#define LOADS_THRESHOLDS  (uint16_t[])  {1<<4, 1<<8, 1<<11, -1}
+#define LOADS_THRESHOLDS  (uint16_t[])  {1<<4, 1<<8, 1<<11, -1}    // the last one should be UINT16_MAX
 #define QUEUES_CNT        (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) - 1)
 
-#define MAX_DECAY  (KRU_LIMIT * 0.0006929)  // -> halving counters in 1s
-#define TIME_MULT  1/1   // max fraction of rate limit filled by one cpu (multiplies large int)  // TODO divide by #cpus?
+#define KRU_CAPACITY  (1<<10)
+#define MAX_DECAY     (KRU_LIMIT * 0.0006929)  // -> halving counters in 1s
+#define TIME_MULT     1/1   // max fraction of rate limit filled by one cpu (multiplies large int)
+       // TODO divide by #cpus?
+
+#define REQ_TIMEOUT        5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped
+#define IDLE_TIMEOUT       1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
+#define MAX_WAITING_REQS     10000 // if exceeded, process single deferred request immediatelly in poll phase
+
+#define VERBOSE_LOG(...) kr_log_notice(DEVEL, "defer |  " __VA_ARGS__)
+// #define VERBOSE_LOG(...)
 
 struct defer {
        size_t capacity;
@@ -29,13 +39,23 @@ struct defer {
 struct defer *defer = NULL;
 struct mmapped defer_mmapped = {0};
 
-uv_check_t check_handle;
-protolayer_iter_ctx_queue_t queues[QUEUES_CNT];
-
 defer_sample_state_t defer_sample_state = {
        .is_accounting = 0,
 };
 
+uv_idle_t idle_handle;
+static void defer_queues_idle(uv_idle_t *handle);
+
+protolayer_iter_ctx_queue_t queues[QUEUES_CNT];
+int waiting_requests = 0;
+int queue_ix = QUEUES_CNT;  // MIN( last popped queue, first non-empty queue )
+
+struct pl_defer_iter_data {
+       struct protolayer_data h;
+       uint64_t req_stamp;   // time when request was received, uses get_stamp()
+               // TODO use different clock than CLOCK_THREAD_CPUTIME_ID?
+};
+
 /// Return whether we're using optimized variant right now.
 static bool using_avx2(void)
 {
@@ -45,61 +65,51 @@ static bool using_avx2(void)
 }
 
 /// Increment KRU counters by given time.
-void defer_account(uint64_t nsec, union kr_sockaddr addr) {
+void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
        _Alignas(16) uint8_t key[16] = {0, };
        uint16_t max_load = 0;
        uint8_t prefix = 0;
-       if (defer_sample_state.addr.ip.sa_family == AF_INET6) {
-               struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)&defer_sample_state.addr.ip;
-               memcpy(key, &ipv6->sin6_addr, 16);
+       kru_price_t base_price = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll;  // TODO adjust
+
+       if (addr->ip.sa_family == AF_INET6) {
+               memcpy(key, &addr->ip6.sin6_addr, 16);
 
                kru_price_t prices[V6_PREFIXES_CNT];
                for (size_t i = 0; i < V6_PREFIXES_CNT; i++) {
-                       prices[i] = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll / V6_RATE_MULT[i];  // TODO adjust
+                       prices[i] = base_price / V6_RATE_MULT[i];
                }
 
                max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
                                1, key, V6_PREFIXES, prices, V6_PREFIXES_CNT, &prefix);
-       } else if (defer_sample_state.addr.ip.sa_family == AF_INET) {
-               struct sockaddr_in *ipv4 = (struct sockaddr_in *)&defer_sample_state.addr.ip;
-               memcpy(key, &ipv4->sin_addr, 4);  // TODO append port?
+       } else if (addr->ip.sa_family == AF_INET) {
+               memcpy(key, &addr->ip4.sin_addr, 4);
 
                kru_price_t prices[V4_PREFIXES_CNT];
                for (size_t i = 0; i < V4_PREFIXES_CNT; i++) {
-                       prices[i] = (uint64_t)MAX_DECAY * nsec * TIME_MULT / 1000000ll / V4_RATE_MULT[i];   // TODO adjust
+                       prices[i] = base_price / V4_RATE_MULT[i];
                }
 
                max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
                                0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix);
        }
 
-       kr_log_notice(DEVEL, "%8.3f ms for %s, load: %d on /%d\n", nsec / 1000000.0,
-                       kr_straddr(&defer_sample_state.addr.ip), max_load, prefix);
+       VERBOSE_LOG("  %s ADD %4.3f ms -> load: %d on /%d\n",
+                       kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix);
 }
 
-/// Determine whether the request should be deferred during unwrapping.
-static enum protolayer_iter_cb_result pl_defer_unwrap(
-               void *sess_data, void *iter_data,
-               struct protolayer_iter_ctx *ctx)
+/// Determine priority of the request in [-1, QUEUES_CNT - 1].
+/// Lower value has higher priority, -1 should be synchronous.
+static inline int classify(const union kr_sockaddr *addr)
 {
-       if (ctx->session->outgoing)
-               return protolayer_continue(ctx);
-
-       defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
-
        _Alignas(16) uint8_t key[16] = {0, };
        uint16_t max_load = 0;
        uint8_t prefix = 0;
-       if (ctx->comm->comm_addr->sa_family == AF_INET6) {
-               struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)ctx->comm->comm_addr;
-               memcpy(key, &ipv6->sin6_addr, 16);
-
+       if (addr->ip.sa_family == AF_INET6) {
+               memcpy(key, &addr->ip6.sin6_addr, 16);
                max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
                                1, key, V6_PREFIXES, NULL, V6_PREFIXES_CNT, &prefix);
-       } else if (ctx->comm->comm_addr->sa_family == AF_INET) {
-               struct sockaddr_in *ipv4 = (struct sockaddr_in *)ctx->comm->comm_addr;
-               memcpy(key, &ipv4->sin_addr, 4);  // TODO append port?
-
+       } else if (addr->ip.sa_family == AF_INET) {
+               memcpy(key, &addr->ip4.sin_addr, 4);
                max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
                                0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix);
        }
@@ -107,39 +117,133 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
        int threshold_index = 0;  // 0: synchronous
        for (; LOADS_THRESHOLDS[threshold_index] < max_load; threshold_index++);
 
-       kr_log_notice(DEVEL, "DEFER | addr: %s, load: %d on /%d, queue: %d\n",
-                       kr_straddr(ctx->comm->src_addr),
-                       max_load, prefix, threshold_index);
+       VERBOSE_LOG("    load %d on /%d\n", max_load, prefix);
+
+       return threshold_index - 1;
+}
+
+
+
+/// Push query to a queue according to its priority and activate idle.
+static inline void push_query(struct protolayer_iter_ctx *ctx, int priority)
+{
+       queue_push(queues[priority], ctx);
+       queue_ix = MIN(queue_ix, priority);
+       if (waiting_requests++ <= 0) {
+               kr_assert(waiting_requests == 1);
+               uv_idle_start(&idle_handle, defer_queues_idle);
+               VERBOSE_LOG("  activating idle\n");
+       }
+}
+
+/// Pop and return the query with the highest priority, deactivate idle if not needed.
+static inline struct protolayer_iter_ctx *pop_query(void)
+{
+       for (; queue_ix < QUEUES_CNT && queue_len(queues[queue_ix]) == 0; queue_ix++);
+       if (queue_ix >= QUEUES_CNT) return NULL;
+
+       struct protolayer_iter_ctx *ctx = queue_head(queues[queue_ix]);
+       queue_pop(queues[queue_ix]);
+       if (--waiting_requests <= 0) {
+               kr_assert(waiting_requests == 0);
+               uv_idle_stop(&idle_handle);
+               VERBOSE_LOG("  deactivating idle\n");
+       }
+       return ctx;
+}
+
+
+/// Process a single deferred query (or defer again) if there is any.
+/// Time accounting should have been just started, the stamp is used, accounted address is set.
+static inline void process_single_deferred(void) {
+       struct protolayer_iter_ctx *ctx = pop_query();
+       if (ctx == NULL) return;
+
+       defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
+
+       struct pl_defer_iter_data *iter_data = protolayer_iter_data_get_current(ctx);
+       uint64_t age_ns = defer_sample_state.stamp - iter_data->req_stamp;
+
+       VERBOSE_LOG("  %s POP from %d after %4.3f ms\n",
+                       kr_straddr(ctx->comm->comm_addr),
+                       queue_ix,
+                       age_ns / 1000000.0);
+
+       if (age_ns >= REQ_TIMEOUT) {
+               VERBOSE_LOG("    BREAK\n");
+               protolayer_break(ctx, kr_error(ETIME));
+               return;
+       }
+
+       int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr);
+       if (priority > queue_ix) {  // priority dropped (got higher value)
+               VERBOSE_LOG("    PUSH to %d\n", priority);
+               push_query(ctx, priority);
+               return;
+       }
+
+       VERBOSE_LOG("    CONTINUE\n");
+       protolayer_continue(ctx);
+}
+
+/// Unwrap: defer or process the query synchronously.
+/// Time accounting should have been started, the stamp is used, accounted address is set.
+static enum protolayer_iter_cb_result pl_defer_unwrap(
+               void *sess_data, void *iter_data,
+               struct protolayer_iter_ctx *ctx)
+{
+       if (ctx->session->outgoing)
+               return protolayer_continue(ctx);
+
+       defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
+       struct pl_defer_iter_data *data = iter_data;
+       data->req_stamp = defer_sample_state.stamp;
+
+       VERBOSE_LOG("  %s UNWRAP\n",
+                       kr_straddr(ctx->comm->comm_addr));
+       int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr);
 
-       if (threshold_index == 0)
+       if (priority == -1) {
+               VERBOSE_LOG("    CONTINUE\n");
                return protolayer_continue(ctx);
+       }
 
-       queue_push(queues[threshold_index - 1], ctx);
+       VERBOSE_LOG("    PUSH to %d\n", priority);
+       push_query(ctx, priority);
+       while (waiting_requests > MAX_WAITING_REQS) {
+               defer_sample_restart();
+               process_single_deferred();  // possibly defers again without decreasing waiting_requests
+               // defer_sample_stop should be called soon outside
+       }
 
        return protolayer_async();
 }
 
-/// Continue processing deferred requests in libuv check phase.
-static void defer_queues_check(uv_check_t *handle) {
-       // TODO drop too old requests and/or break processing if it lasts too long (keeping some work to another check phase)
-       for (size_t i = 0; i < QUEUES_CNT; i++) {
-               while (queue_len(queues[i]) > 0) {
-                       defer_sample_start();
-                       struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
-                       queue_pop(queues[i]);
-                       defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
-                       kr_log_notice(DEVEL, "DEFER continue: %s\n",
-                                       kr_straddr(ctx->comm->comm_addr));
-                       protolayer_continue(ctx);
-                       defer_sample_stop();
-               }
+/// Idle: continue processing deferred requests.
+static void defer_queues_idle(uv_idle_t *handle) {
+       kr_assert(waiting_requests > 0);
+       VERBOSE_LOG("IDLE\n");
+       VERBOSE_LOG("  %d waiting\n", waiting_requests);
+       defer_sample_start();
+       uint64_t idle_stamp = defer_sample_state.stamp;
+       while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT)) {
+               process_single_deferred();
+               defer_sample_restart();
+       }
+       defer_sample_stop();  // TODO skip calling and use just restart elsewhere?
+       udp_queue_send_all();
+       if (waiting_requests > 0) {
+               VERBOSE_LOG("  %d waiting\n", waiting_requests);
        }
+       VERBOSE_LOG("POLL\n");
 }
 
-/// Initialize defer, incl. shared memory with KRU.
-int defer_init(uv_loop_t *loop) {
-       struct defer header = {  // TODO adjust hardcoded values
-               .capacity = 1 << 10,
+
+/// Initialize shared memory, queues, idle.
+int defer_init(uv_loop_t *loop)
+{
+       struct defer header = {
+               .capacity = KRU_CAPACITY,
                .max_decay = MAX_DECAY,
                .using_avx2 = using_avx2(),
        };
@@ -175,9 +279,7 @@ int defer_init(uv_loop_t *loop) {
        for (size_t i = 0; i < QUEUES_CNT; i++)
                queue_init(queues[i]);
 
-       protolayer_globals[PROTOLAYER_TYPE_DEFER].unwrap = pl_defer_unwrap;
-       uv_check_init(loop, &check_handle);
-       uv_check_start(&check_handle, defer_queues_check);
+       uv_idle_init(loop, &idle_handle);
        return 0;
 
 fail:
@@ -192,3 +294,13 @@ void defer_deinit(void)
        mmapped_deinit(&defer_mmapped);
        defer = NULL;
 }
+
+/// Initialize protolayer.
+__attribute__((constructor))
+static void defer_protolayers_init(void)
+{
+       protolayer_globals[PROTOLAYER_TYPE_DEFER] = (struct protolayer_globals){
+               .iter_size = sizeof(struct pl_defer_iter_data),
+               .unwrap = pl_defer_unwrap,
+       };
+}
index 7ded12880e378fa0fe659d4d7925e51e00ad808b..1856972a8315c8fa3c6f53de825164e1e831bcfa 100644 (file)
@@ -10,7 +10,7 @@ int defer_init(uv_loop_t *loop);
 void defer_deinit(void);
 
 /// Increment KRU counters by the given time.
-void defer_account(uint64_t nsec, union kr_sockaddr addr);
+void defer_account(uint64_t nsec, union kr_sockaddr *addr);
 
 typedef struct {
        int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1
@@ -74,6 +74,7 @@ static inline void defer_sample_stop(void)
 
        if (kr_fails_assert(defer_sample_state.is_accounting > 0)) return; // weird
        if (--defer_sample_state.is_accounting) return;
+       if (defer_sample_state.addr.ip.sa_family == AF_UNSPEC) return;
 
        const uint64_t elapsed = get_stamp() - defer_sample_state.stamp;
 
@@ -82,5 +83,22 @@ static inline void defer_sample_stop(void)
        // TODO: some queries of internal origin have suspicioiusly high numbers.
        // We won't be really accounting those, but it might suggest some other issue.
 
-       defer_account(elapsed, defer_sample_state.addr);
+       defer_account(elapsed, &defer_sample_state.addr);
+}
+
+/// Stop accounting if active, then start again. Uses just one stamp.
+static inline void defer_sample_restart(void)
+{
+       if (!defer) return;
+
+       uint64_t stamp = get_stamp();
+
+       if (defer_sample_state.is_accounting > 0) {
+               const uint64_t elapsed = stamp - defer_sample_state.stamp;
+               defer_account(elapsed, &defer_sample_state.addr);
+       }
+
+       defer_sample_state.stamp = stamp;
+       defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
+       defer_sample_state.is_accounting = 1;
 }
index dd55903a570a3060a65595f506b17a1a97c14b7b..41414cba9446d849e2dbc6e644bc2083e11967eb 100644 (file)
@@ -22,5 +22,5 @@ int mmapped_init(struct mmapped *mmapped, const char *mmap_file, size_t size, vo
  * Returns zero on success and kr_error(errno) on system error. */
 int mmapped_init_continue(struct mmapped *mmapped);
 
-/* Free mmapped memory and truncate underlying file to zero size unless it is used by other processes. */
+/* Free mmapped memory and, unless the underlying file is used by other processes, truncate it to zero size. */
 void mmapped_deinit(struct mmapped *mmapped);
index 39617b0ab05cd132fc0950d52f71e9f3a0d9a06b..798d6f4d43aa5a6e1f5565b737c878878f685518 100644 (file)
@@ -348,6 +348,11 @@ static inline struct protolayer_data *protolayer_iter_data_get(
        return (struct protolayer_data *)(ctx->data + offset);
 }
 
+void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx)
+{
+       return protolayer_iter_data_get(ctx, ctx->layer_ix);
+}
+
 static inline ssize_t session2_get_protocol(
                struct session2 *s, enum protolayer_type protocol)
 {
index 37a9ec9c549d8c40d492644cde39434f9e8de09b..f10ed387b6f81fbfd25439589d8c46d64afe4066 100644 (file)
@@ -525,6 +525,11 @@ size_t protolayer_queue_count_payload(const protolayer_iter_ctx_queue_t *queue);
  * queue iterators, as it does not need to iterate through the whole queue. */
 bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue);
 
+/** Gets layer-specific iteration data for the last processed layer.
+ * To be used after returning from its callback for async continuation but before calling protolayer_continue. */
+void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx);
+
+
 /** Layer-specific data - the generic struct. To be added as the first member of
  * each specific struct. */
 struct protolayer_data {
index 68d67ec602fe2b4d71988f9ab2ea9d09f08a8b30..0e829d2d6bcc5281f13acf13c20e5b5c0bc0aa8e 100644 (file)
@@ -86,15 +86,20 @@ static void udp_queue_send(int fd)
        q->len = 0;
 }
 
-/** Periodical callback to send all queued packets. */
-static void udp_queue_check(uv_check_t *handle)
-{
+/** Send all queued packets. */
+void udp_queue_send_all(void) {
        for (int i = 0; i < state.waiting_fds.len; ++i) {
                udp_queue_send(state.waiting_fds.at[i]);
        }
        state.waiting_fds.len = 0;
 }
 
+/** Periodical callback to send all queued packets. */
+static void udp_queue_check(uv_check_t *handle)
+{
+       udp_queue_send_all();
+}
+
 int udp_queue_init_global(uv_loop_t *loop)
 {
        int ret = uv_check_init(loop, &state.check_handle);
index ed0a32699d37b838908624529a05aef4d4043c6e..cb01f90a29d2fffeeaa73c702d44060d421c9af6 100644 (file)
@@ -17,3 +17,5 @@ int udp_queue_init_global(uv_loop_t *loop);
 void udp_queue_push(int fd, const struct sockaddr *sa, char *buf, size_t buf_len,
                     udp_queue_cb cb, void *baton);
 
+/** Send all queued packets immediatelly. */
+void udp_queue_send_all(void);