]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/defer: redesign UDP and non-UDP phase transition docs-develop-defe-x6j6qe/deployments/6011
authorLukáš Ondráček <lukas.ondracek@nic.cz>
Fri, 10 Jan 2025 17:04:41 +0000 (18:04 +0100)
committerLukáš Ondráček <lukas.ondracek@nic.cz>
Fri, 10 Jan 2025 17:04:41 +0000 (18:04 +0100)
daemon/defer.c

index b1abcacd57deec602e2a64b3b16d1c11fc76c2f0..223a2d615e4bfff8fee521e9cb5ecd0d56f29de1 100644 (file)
@@ -85,31 +85,43 @@ ptrdiff_t waiting_requests_size = 0;  // signed for non-negativeness asserts
 int queue_ix = QUEUES_CNT;  // MIN( last popped queue, first non-empty queue )
 
 enum phase {
-       PHASE_UDP      = 1,
-       PHASE_NON_UDP  = 2,
-       PHASE_ANY      = PHASE_UDP | PHASE_NON_UDP
-} phase = PHASE_ANY;
-uint64_t phase_elapsed = 0;    // ns
-bool phase_accounting = false; // add accounted time to phase_elapsed in defer_account
-
-static inline void phase_set(enum phase p)
+       PHASE_NONE,
+       PHASE_UDP,
+       PHASE_NON_UDP
+} phase = PHASE_NONE;
+uint64_t phase_elapsed[3] = { 0 };  // ns; [PHASE_NONE] value is being incremented but never used
+const uint64_t phase_limits[3] = {0, PHASE_UDP_TIMEOUT, PHASE_NON_UDP_TIMEOUT};
+uint64_t phase_stamp = 0;
+
+static inline bool phase_over_limit(enum phase p)
 {
-       if (phase != p) {
-               phase_elapsed = 0;
-               phase = p;
-       }
+       return phase_elapsed[p] >= phase_limits[p];
+}
+
+/// Reset elapsed times of phases and set phase to UDP, NON_UDP, or NONE.
+static inline void phase_reset(enum phase p)
+{
+       phase_elapsed[PHASE_UDP] = 0;
+       phase_elapsed[PHASE_NON_UDP] = 0;
+       phase_stamp = defer_sample_state.stamp;
+       phase = p;
 }
-static inline void phase_charge(uint64_t nsec)
+
+/// Set phase to UDP or NON_UDP if it is not over limit or both are over limit (reset them).
+static inline bool phase_try_set(enum phase p)
 {
-       kr_assert(phase != PHASE_ANY);
-       phase_elapsed += nsec;
-       if ((phase == PHASE_UDP) && (phase_elapsed > PHASE_UDP_TIMEOUT)) {
-               phase_set(PHASE_NON_UDP);
-               phase_accounting = false;
-       } else if ((phase == PHASE_NON_UDP) && (phase_elapsed > PHASE_NON_UDP_TIMEOUT)) {
-               phase_set(PHASE_UDP);
-               phase_accounting = false;
+       phase_elapsed[phase] += defer_sample_state.stamp - phase_stamp;
+       phase_stamp = defer_sample_state.stamp;
+
+       if (!phase_over_limit(p)) {
+               phase = p;
+               return true;
+       } else if (phase_over_limit(PHASE_UDP) && phase_over_limit(PHASE_NON_UDP)) {
+               phase_reset(p);
+               return true;
        }
+
+       return false;
 }
 
 struct pl_defer_sess_data {
@@ -247,11 +259,7 @@ static inline int kru_charge_classify(const struct kru_conf *kru_conf, uint8_t *
 /// Increment KRU counters by given time.
 void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)
 {
-       if (phase_accounting) {
-               phase_charge(nsec);
-       }
-
-       if (!stream) return;  // UDP is not accounted in KRU
+       if (!stream) return;  // UDP is not accounted in KRU; TODO remove !stream invocations?
 
        _Alignas(16) uint8_t key[16] = {0, };
        const struct kru_conf *kru_conf;
@@ -312,7 +320,7 @@ static inline int classify(const union kr_sockaddr *addr, bool stream)
 }
 
 
-/// Push query to a queue according to its priority and activate idle.
+/// Push query to a queue according to its priority.
 static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, bool to_head_end)
 {
        if (to_head_end) {
@@ -321,46 +329,30 @@ static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, boo
                queue_push(queues[priority], ctx);
        }
        queue_ix = MIN(queue_ix, priority);
-       if (waiting_requests++ <= 0) {
-               kr_assert(waiting_requests == 1);
-               uv_idle_start(&idle_handle, defer_queues_idle);
-               VERBOSE_LOG("  activating idle\n");
-       }
+       waiting_requests++;
 }
 
-/// Pop and return query from the specified queue, deactivate idle if not needed.
+/// Pop and return query from the specified queue..
 static inline struct protolayer_iter_ctx *pop_query_queue(int priority)
 {
        kr_assert(queue_len(queues[priority]) > 0);
        struct protolayer_iter_ctx *ctx = queue_head(queues[priority]);
        queue_pop(queues[priority]);
-       if (--waiting_requests <= 0) {
-               kr_assert(waiting_requests == 0);
-               uv_idle_stop(&idle_handle);
-               VERBOSE_LOG("  deactivating idle\n");
-       }
+       waiting_requests--;
+       kr_assert(waiting_requests >= 0);
        return ctx;
 }
 
 
-/// Pop and return the query with the highest priority, UDP or non-UDP based on current phase,
-/// deactivate idle if not needed.
+/// Pop and return the query with the highest priority, UDP or non-UDP based on the current phase.
 static inline struct protolayer_iter_ctx *pop_query(void)
 {
        const int waiting_udp = queue_len(queues[PRIORITY_UDP]);
        const int waiting_non_udp = waiting_requests - waiting_udp;
 
-       enum phase new_phase;
-       if ((phase & PHASE_NON_UDP) && (waiting_non_udp > 0)) {
-               new_phase = PHASE_NON_UDP;  // maybe changing from PHASE_ANY
-       } else if ((phase & PHASE_UDP) && (waiting_udp > 0)) {
-               new_phase = PHASE_UDP;      // maybe changing from PHASE_ANY
-       } else if (waiting_non_udp > 0) {
-               new_phase = PHASE_NON_UDP;  // change from PHASE_UDP, no UDP queries
-       } else {
-               new_phase = PHASE_UDP;      // change from PHASE_NON_UDP, no non-UDP queries
-       }
-       phase_set(new_phase);
+       if (!((waiting_non_udp > 0) && phase_try_set(PHASE_NON_UDP)) &&
+                 !((waiting_udp     > 0) && phase_try_set(PHASE_UDP)))
+               phase_reset(waiting_non_udp > 0 ? PHASE_NON_UDP : PHASE_UDP);
 
        int i;
        if (phase == PHASE_NON_UDP) {
@@ -501,13 +493,11 @@ static inline void process_deferred_over_size_limit(void) {
                defer_sample_state_t prev_sample_state;
                defer_sample_start(&prev_sample_state);
                do {
-                       phase_accounting = true;
                        process_single_deferred();  // possibly defers again without decreasing waiting_requests_size
                                // If the unwrapped query is to be processed here,
                                // it is the last iteration and the query is processed after returning.
                        defer_sample_restart();
                } while (waiting_requests_size > MAX_WAITING_REQS_SIZE);
-               phase_accounting = false;
                defer_sample_stop(&prev_sample_state, true);
        }
 }
@@ -538,7 +528,6 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
                void *sess_data, void *iter_data,
                struct protolayer_iter_ctx *ctx)
 {
-       phase_accounting = false;
        if (!defer || ctx->session->outgoing)
                return protolayer_continue(ctx);
 
@@ -550,6 +539,8 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
        VERBOSE_LOG("  %s UNWRAP\n",
                        kr_straddr(ctx->comm->src_addr));
 
+       uv_idle_start(&idle_handle, defer_queues_idle);
+
        if (queue_len(sdata->queue) > 0) {  // stream with preceding packet already deferred
                queue_push(sdata->queue, ctx);
                waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, false);
@@ -561,14 +552,10 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
 
        int priority = classify((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream);
 
-       // Process synchronously if the phase is active and no requests can be scheduled before.
-       if ((
-                               ((priority == PRIORITY_UDP) && (phase & PHASE_UDP)) ||
-                               ((priority == 0) && (phase & PHASE_NON_UDP))
-                        ) && (queue_len(queues[priority]) == 0)) {
+       // Process synchronously unless there may exist requests that has to be processed first
+       if (((priority == 0) || (priority == PRIORITY_UDP)) && (queue_len(queues[priority]) == 0) &&
+                       phase_try_set(priority == PRIORITY_UDP ? PHASE_UDP : PHASE_NON_UDP)) {
                VERBOSE_LOG("    CONTINUE\n");
-               phase_set(priority == PRIORITY_UDP ? PHASE_UDP : PHASE_NON_UDP);
-               phase_accounting = true;
                return protolayer_continue(ctx);
        }
 
@@ -590,10 +577,6 @@ static enum protolayer_event_cb_result pl_defer_event_unwrap(
                enum protolayer_event_type event, void **baton,
                struct session2 *session, void *sess_data)
 {
-       if ((event == PROTOLAYER_EVENT_EOF) || (event == PROTOLAYER_EVENT_GENERAL_TIMEOUT)) {
-               // disable accounting only for events that cannot occur during incoming data processing
-               phase_accounting = false;
-       }
        if (!defer || !session->stream || session->outgoing)
                return PROTOLAYER_EVENT_PROPAGATE;
 
@@ -620,25 +603,26 @@ static enum protolayer_event_cb_result pl_defer_event_unwrap(
 /// Idle: continue processing deferred requests.
 static void defer_queues_idle(uv_idle_t *handle)
 {
-       kr_assert(waiting_requests > 0);
        VERBOSE_LOG("IDLE\n");
-       VERBOSE_LOG("  %d waiting\n", waiting_requests);
-       defer_sample_start(NULL);
-       uint64_t idle_stamp = defer_sample_state.stamp;
-       do {
-               phase_accounting = true;
-               process_single_deferred();
-               defer_sample_restart();
-       } while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT));
-       phase_accounting = false;
-       defer_sample_stop(NULL, true);
-       cleanup_queues();
-       udp_queue_send_all();
+       if (waiting_requests > 0) {
+               VERBOSE_LOG("  %d waiting\n", waiting_requests);
+               defer_sample_start(NULL);
+               uint64_t idle_stamp = defer_sample_state.stamp;
+               do {
+                       process_single_deferred();
+                       defer_sample_restart();
+               } while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT));
+               defer_sample_stop(NULL, true);
+               cleanup_queues();
+               udp_queue_send_all();
+       }
 
        if (waiting_requests > 0) {
                VERBOSE_LOG("  %d waiting\n", waiting_requests);
        } else {
-               phase_set(PHASE_ANY);
+               phase_reset(PHASE_NONE);
+               VERBOSE_LOG("  deactivate idle\n");
+               uv_idle_stop(&idle_handle);
        }
        VERBOSE_LOG("POLL\n");
 }