]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/defer: cleanup heads of queues after idle docs-develop-rrl-8r8r8r/deployments/5334
authorLukáš Ondráček <lukas.ondracek@nic.cz>
Mon, 14 Oct 2024 16:36:15 +0000 (18:36 +0200)
committerLukáš Ondráček <lukas.ondracek@nic.cz>
Mon, 14 Oct 2024 16:36:15 +0000 (18:36 +0200)
daemon/defer.c

index 2d78786adc82f4d9a4fb1332aaf159f807a3a722..b0375ceec1b4cd5cffdce394c96132950d229449 100644 (file)
@@ -210,6 +210,21 @@ static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, boo
        }
 }
 
+/// Pop and return query from the specified queue, deactivate idle if not needed.
+static inline struct protolayer_iter_ctx *pop_query_queue(int priority)
+{
+       kr_assert(queue_len(queues[priority]) > 0);
+       struct protolayer_iter_ctx *ctx = queue_head(queues[priority]);
+       queue_pop(queues[priority]);
+       if (--waiting_requests <= 0) {
+               kr_assert(waiting_requests == 0);
+               uv_idle_stop(&idle_handle);
+               VERBOSE_LOG("  deactivating idle\n");
+       }
+       return ctx;
+}
+
+
 /// Pop and return the query with the highest priority, UDP or non-UDP based on current phase,
 /// deactivate idle if not needed.
 static inline struct protolayer_iter_ctx *pop_query(void)
@@ -238,17 +253,28 @@ static inline struct protolayer_iter_ctx *pop_query(void)
                i = PRIORITY_UDP;
        }
 
-       struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
-       queue_pop(queues[i]);
-       if (--waiting_requests <= 0) {
-               kr_assert(waiting_requests == 0);
-               uv_idle_stop(&idle_handle);
-               VERBOSE_LOG("  deactivating idle\n");
-       }
-       return ctx;
+       return pop_query_queue(i);
 }
 
 
+// Break the given query; for streams break also all follow-up queries and force-close the stream.
+static inline void break_query(struct protolayer_iter_ctx *ctx, int err)
+{
+       if (ctx->session->stream) {
+               struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
+               if (!ctx->session->closing) {
+                       session2_force_close(ctx->session); // session is not freed here as iter contexts exist
+               }
+               queue_pop(sdata->queue);
+               while (queue_len(sdata->queue) > 0) {
+                       protolayer_break(ctx, kr_error(err)); // session is not freed here as other contexts exist
+                       ctx = queue_head(sdata->queue);
+                       queue_pop(sdata->queue);
+               }
+       }
+       protolayer_break(ctx, kr_error(err));
+}
+
 /// Process a single deferred query (or defer again) if there is any.
 /// Time accounting should have been just started, the stamp is used, accounted address is set.
 static inline void process_single_deferred(void) {
@@ -269,30 +295,13 @@ static inline void process_single_deferred(void) {
 
        if (ctx->session->closing) {
                VERBOSE_LOG("    BREAK (session is closing)\n");
-               if (ctx->session->stream) {
-                       queue_pop(sdata->queue);
-                       while (queue_len(sdata->queue) > 0) {
-                               protolayer_break(ctx, kr_error(ECANCELED)); // session is not freed here as other contexts exist
-                               ctx = queue_head(sdata->queue);
-                               queue_pop(sdata->queue);
-                       }
-               }
-               protolayer_break(ctx, kr_error(ECANCELED));
+               break_query(ctx, ECANCELED);
                return;
        }
 
        if (age_ns >= REQ_TIMEOUT) {
                VERBOSE_LOG("    BREAK (timeout)\n");
-               if (ctx->session->stream) {
-                       while (queue_len(sdata->queue) > 0) {
-                               ctx = queue_head(sdata->queue);
-                               queue_pop(sdata->queue);
-                               protolayer_break(ctx, kr_error(ETIME)); // session is not freed here as it is not closing yet
-                       }
-                       session2_force_close(ctx->session);
-               } else {
-                       protolayer_break(ctx, kr_error(ETIME));
-               }
+               break_query(ctx, ETIME);
                return;
        }
 
@@ -316,6 +325,25 @@ static inline void process_single_deferred(void) {
        protolayer_continue(ctx);
 }
 
+/// Break expired requests at the beginning of queues, uses current stamp.
+static inline void cleanup_queues(void) {
+       for (int i = 0; i < QUEUES_CNT; i++) {
+               int cnt = 0;
+               while (queue_len(queues[i]) > 0) {
+                       struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
+                       struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
+                       uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp;
+                       if (age_ns < REQ_TIMEOUT) break;
+                       pop_query_queue(i);
+                       break_query(ctx, ETIME);
+                       cnt++;
+               }
+               if (cnt > 0) {
+                       VERBOSE_LOG("  BREAK %d queries from %d\n", cnt, i);
+               }
+       }
+}
+
 /// Unwrap: defer or process the query synchronously.
 /// Time accounting should have been started, the stamp is used, accounted address is set.
 static enum protolayer_iter_cb_result pl_defer_unwrap(
@@ -372,6 +400,7 @@ static void defer_queues_idle(uv_idle_t *handle) {
                process_single_deferred();
                defer_sample_restart();
        }
+       cleanup_queues();
        defer_sample_stop();  // TODO skip calling and use just restart elsewhere?
        udp_queue_send_all();