]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/defer: add alternate UDP and non-UDP phases docs-develop-rrl-8r8r8r/deployments/5284
authorLukáš Ondráček <lukas.ondracek@nic.cz>
Mon, 7 Oct 2024 16:28:49 +0000 (18:28 +0200)
committerLukáš Ondráček <lukas.ondracek@nic.cz>
Mon, 7 Oct 2024 17:06:50 +0000 (19:06 +0200)
daemon/defer.c
daemon/defer.h

index d86cd83776da3a4b2c9d13ec6ba1ee8799da71cd..38e97074e83b5e1864c147fc10b35383abc368d6 100644 (file)
 #define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
 
 #define LOADS_THRESHOLDS     (uint16_t[])  {1<<4, 1<<8, 1<<11, -1}    // the last one should be UINT16_MAX
-#define QUEUES_CNT           (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS))  // -1 for synchronous, +1 for unverified
-#define UNVERIFIED_PRIORITY  1  // -1 synchronous, 1 async UDP, {0, 2, 3} other async
+#define QUEUES_CNT           (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) + 1)  // +1 for unverified
+#define PRIORITY_SYNC        (-1)              // no queue
+#define PRIORITY_UDP         (QUEUES_CNT - 1)  // last queue
 
 #define KRU_CAPACITY         (1<<10)
 #define MAX_DECAY            (KRU_LIMIT * 0.0006929)  // -> halving counters in 1s of accounted time
 #define BASE_PRICE(nsec)     ((uint64_t)MAX_DECAY * nsec / 1000000ll)
 // TODO reconsider time flow speed in KRU (currently sum of all-processes accounted time)
 
-#define REQ_TIMEOUT        5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped
-#define IDLE_TIMEOUT       1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
-#define MAX_WAITING_REQS     10000 // if exceeded, process single deferred request immediatelly in poll phase
+#define REQ_TIMEOUT           5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped
+#define IDLE_TIMEOUT          1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
+#define PHASE_UDP_TIMEOUT      400000 // ns (THREAD_CPUTIME); switch between udp, non-udp phases
+#define PHASE_NON_UDP_TIMEOUT  400000 // ns (THREAD_CPUTIME);    after timeout or emptying queue
+#define MAX_WAITING_REQS        10000 // if exceeded, process single deferred request immediatelly in poll phase
 
 #define VERBOSE_LOG(...) kr_log_debug(DEFER, " | " __VA_ARGS__)
 
@@ -53,6 +56,30 @@ protolayer_iter_ctx_queue_t queues[QUEUES_CNT];
 int waiting_requests = 0;
 int queue_ix = QUEUES_CNT;  // MIN( last popped queue, first non-empty queue )
 
+enum phase {
+       PHASE_UDP      = 1,
+       PHASE_NON_UDP  = 2,
+       PHASE_ANY      = PHASE_UDP | PHASE_NON_UDP
+} phase = PHASE_ANY;
+uint64_t phase_elapsed = 0;    // ns
+bool phase_accounting = false; // add accounted time to phase_elapsed on next call of defer_account
+
+static inline void phase_set(enum phase p) {
+       if (phase != p) {
+               phase_elapsed = 0;
+               phase = p;
+       }
+}
+static inline void phase_account(uint64_t nsec) {
+       kr_assert(phase != PHASE_ANY);
+       phase_elapsed += nsec;
+       if ((phase == PHASE_UDP) && (phase_elapsed > PHASE_UDP_TIMEOUT)) {
+               phase_set(PHASE_NON_UDP);
+       } else if ((phase == PHASE_NON_UDP) && (phase_elapsed > PHASE_NON_UDP_TIMEOUT)) {
+               phase_set(PHASE_UDP);
+       }
+}
+
 struct pl_defer_iter_data {
        struct protolayer_data h;
        uint64_t req_stamp;   // time when request was received, uses get_stamp()
@@ -68,7 +95,14 @@ static bool using_avx2(void)
 }
 
 /// Increment KRU counters by given time.
-void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
+void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream) {
+       if (phase_accounting) {
+               phase_account(nsec);
+               phase_accounting = false;
+       }
+
+       if (!stream) return;  // UDP is not accounted in KRU
+
        _Alignas(16) uint8_t key[16] = {0, };
        uint16_t max_load = 0;
        uint8_t prefix = 0;
@@ -113,11 +147,17 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
 
 /// Determine priority of the request in [-1, QUEUES_CNT - 1].
 /// Lower value has higher priority, -1 should be synchronous.
+/// Both UDP and non-UDP may end up with synchronous priority
+/// if the phase is active and no requests can be scheduled before them.
 static inline int classify(const union kr_sockaddr *addr, bool stream)
 {
-       if (!stream) {
+       if (!stream) { // UDP
                VERBOSE_LOG("    unverified address\n");
-               return UNVERIFIED_PRIORITY; // UDP
+               if ((phase & PHASE_UDP) && (queue_len(queues[PRIORITY_UDP]) == 0)) {
+                       phase_set(PHASE_UDP);
+                       return PRIORITY_SYNC;
+               }
+               return PRIORITY_UDP;
        }
 
        uint32_t time_now = atomic_load_explicit(&defer->time_now, memory_order_relaxed);
@@ -134,15 +174,15 @@ static inline int classify(const union kr_sockaddr *addr, bool stream)
                                0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix);
        }
 
-       int threshold_index = 0;  // 0: synchronous
-       for (; LOADS_THRESHOLDS[threshold_index] < max_load; threshold_index++);
+       int priority = 0;
+       for (; LOADS_THRESHOLDS[priority] < max_load; priority++);
 
        VERBOSE_LOG("    load %d on /%d\n", max_load, prefix);
 
-       int priority = threshold_index - 1;
-       if (priority >= UNVERIFIED_PRIORITY)
-               priority++;
-
+       if ((phase & PHASE_NON_UDP) && (priority == 0) && (queue_len(queues[0]) == 0)) {
+               phase_set(PHASE_NON_UDP);
+               return PRIORITY_SYNC;
+       }
        return priority;
 }
 
@@ -160,14 +200,36 @@ static inline void push_query(struct protolayer_iter_ctx *ctx, int priority)
        }
 }
 
-/// Pop and return the query with the highest priority, deactivate idle if not needed.
+/// Pop and return the query with the highest priority, UDP or non-UDP based on current phase,
+/// deactivate idle if not needed.
 static inline struct protolayer_iter_ctx *pop_query(void)
 {
-       for (; queue_ix < QUEUES_CNT && queue_len(queues[queue_ix]) == 0; queue_ix++);
-       if (queue_ix >= QUEUES_CNT) return NULL;
+       const int waiting_udp = queue_len(queues[PRIORITY_UDP]);
+       const int waiting_non_udp = waiting_requests - waiting_udp;
+
+       enum phase new_phase;
+       if ((phase & PHASE_NON_UDP) && (waiting_non_udp > 0)) {
+               new_phase = PHASE_NON_UDP;  // maybe changing from PHASE_ANY
+       } else if ((phase & PHASE_UDP) && (waiting_udp > 0)) {
+               new_phase = PHASE_UDP;      // maybe changing from PHASE_ANY
+       } else if (waiting_non_udp > 0) {
+               new_phase = PHASE_NON_UDP;  // change from PHASE_UDP, no UDP queries
+       } else {
+               new_phase = PHASE_UDP;      // change from PHASE_NON_UDP, no non-UDP queries
+       }
+       phase_set(new_phase);
 
-       struct protolayer_iter_ctx *ctx = queue_head(queues[queue_ix]);
-       queue_pop(queues[queue_ix]);
+       int i;
+       if (phase == PHASE_NON_UDP) {
+               for (; queue_ix < QUEUES_CNT && queue_len(queues[queue_ix]) == 0; queue_ix++);
+               if (queue_ix >= PRIORITY_UDP) kr_assert(false);
+               i = queue_ix;
+       } else {
+               i = PRIORITY_UDP;
+       }
+
+       struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
+       queue_pop(queues[i]);
        if (--waiting_requests <= 0) {
                kr_assert(waiting_requests == 0);
                uv_idle_stop(&idle_handle);
@@ -184,6 +246,7 @@ static inline void process_single_deferred(void) {
        if (ctx == NULL) return;
 
        defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
+       phase_accounting = true;
 
        struct pl_defer_iter_data *iter_data = protolayer_iter_data_get_current(ctx);
        uint64_t age_ns = defer_sample_state.stamp - iter_data->req_stamp;
@@ -234,6 +297,7 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
 
        if (priority == -1) {
                VERBOSE_LOG("    CONTINUE\n");
+               phase_accounting = true;
                return protolayer_continue(ctx);
        }
 
@@ -260,10 +324,12 @@ static void defer_queues_idle(uv_idle_t *handle) {
                defer_sample_restart();
        }
        defer_sample_stop();  // TODO skip calling and use just restart elsewhere?
-       udp_queue_send_all(); // TODO keep here or call after processing each priority level?
-                             //      (or after UNVERIFIED_PRIORITY but beware future QUIC)
+       udp_queue_send_all();
+
        if (waiting_requests > 0) {
                VERBOSE_LOG("  %d waiting\n", waiting_requests);
+       } else {
+               phase_set(PHASE_ANY);
        }
        VERBOSE_LOG("POLL\n");
 }
index 3d7980779c27184e8ca7ba680a7558fd26bdcdfc..b110151d84282821f864ba0c7a621a634947c4ea 100644 (file)
@@ -10,11 +10,12 @@ int defer_init(uv_loop_t *loop);
 void defer_deinit(void);
 
 /// Increment KRU counters by the given time.
-void defer_account(uint64_t nsec, union kr_sockaddr *addr);
+void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream);
 
 typedef struct {
        int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1
        union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet
+       bool stream;
        uint64_t stamp; /// monotonic nanoseconds, probably won't wrap
 } defer_sample_state_t;
 extern defer_sample_state_t defer_sample_state;
@@ -47,7 +48,6 @@ static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream)
 {
        if (!defer || kr_fails_assert(addr)) return;
        if (!defer_sample_state.is_accounting) return;
-       if (!stream) return;  // UDP is not counted
 
        if (defer_sample_state.addr.ip.sa_family != AF_UNSPEC) {
                // TODO: this costs performance, so only in some debug mode?
@@ -66,6 +66,7 @@ static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream)
                defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
                break;
        }
+       defer_sample_state.stream = stream;
 }
 
 /// Stop accounting work - and change the source if applicable.
@@ -84,7 +85,7 @@ static inline void defer_sample_stop(void)
        // TODO: some queries of internal origin have suspicioiusly high numbers.
        // We won't be really accounting those, but it might suggest some other issue.
 
-       defer_account(elapsed, &defer_sample_state.addr);
+       defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
 }
 
 /// Stop accounting if active, then start again. Uses just one stamp.
@@ -96,7 +97,7 @@ static inline void defer_sample_restart(void)
 
        if (defer_sample_state.is_accounting > 0) {
                const uint64_t elapsed = stamp - defer_sample_state.stamp;
-               defer_account(elapsed, &defer_sample_state.addr);
+               defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
        }
 
        defer_sample_state.stamp = stamp;