From: Lukáš Ondráček Date: Mon, 14 Oct 2024 16:36:15 +0000 (+0200) Subject: daemon/defer: cleanup heads of queues after idle X-Git-Tag: v6.0.9~1^2~17 X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=refs%2Fenvironments%2Fdocs-develop-rrl-8r8r8r%2Fdeployments%2F5334;p=thirdparty%2Fknot-resolver.git daemon/defer: cleanup heads of queues after idle --- diff --git a/daemon/defer.c b/daemon/defer.c index 2d78786ad..b0375ceec 100644 --- a/daemon/defer.c +++ b/daemon/defer.c @@ -210,6 +210,21 @@ static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, boo } } +/// Pop and return query from the specified queue, deactivate idle if not needed. +static inline struct protolayer_iter_ctx *pop_query_queue(int priority) +{ + kr_assert(queue_len(queues[priority]) > 0); + struct protolayer_iter_ctx *ctx = queue_head(queues[priority]); + queue_pop(queues[priority]); + if (--waiting_requests <= 0) { + kr_assert(waiting_requests == 0); + uv_idle_stop(&idle_handle); + VERBOSE_LOG(" deactivating idle\n"); + } + return ctx; +} + + /// Pop and return the query with the highest priority, UDP or non-UDP based on current phase, /// deactivate idle if not needed. static inline struct protolayer_iter_ctx *pop_query(void) @@ -238,17 +253,28 @@ static inline struct protolayer_iter_ctx *pop_query(void) i = PRIORITY_UDP; } - struct protolayer_iter_ctx *ctx = queue_head(queues[i]); - queue_pop(queues[i]); - if (--waiting_requests <= 0) { - kr_assert(waiting_requests == 0); - uv_idle_stop(&idle_handle); - VERBOSE_LOG(" deactivating idle\n"); - } - return ctx; + return pop_query_queue(i); } +// Break the given query; for streams break also all follow-up queries and force-close the stream. +static inline void break_query(struct protolayer_iter_ctx *ctx, int err) +{ + if (ctx->session->stream) { + struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx); + if (!ctx->session->closing) { + session2_force_close(ctx->session); // session is not freed here as iter contexts exist + } + queue_pop(sdata->queue); + while (queue_len(sdata->queue) > 0) { + protolayer_break(ctx, kr_error(err)); // session is not freed here as other contexts exist + ctx = queue_head(sdata->queue); + queue_pop(sdata->queue); + } + } + protolayer_break(ctx, kr_error(err)); +} + /// Process a single deferred query (or defer again) if there is any. /// Time accounting should have been just started, the stamp is used, accounted address is set. static inline void process_single_deferred(void) { @@ -269,30 +295,13 @@ static inline void process_single_deferred(void) { if (ctx->session->closing) { VERBOSE_LOG(" BREAK (session is closing)\n"); - if (ctx->session->stream) { - queue_pop(sdata->queue); - while (queue_len(sdata->queue) > 0) { - protolayer_break(ctx, kr_error(ECANCELED)); // session is not freed here as other contexts exist - ctx = queue_head(sdata->queue); - queue_pop(sdata->queue); - } - } - protolayer_break(ctx, kr_error(ECANCELED)); + break_query(ctx, ECANCELED); return; } if (age_ns >= REQ_TIMEOUT) { VERBOSE_LOG(" BREAK (timeout)\n"); - if (ctx->session->stream) { - while (queue_len(sdata->queue) > 0) { - ctx = queue_head(sdata->queue); - queue_pop(sdata->queue); - protolayer_break(ctx, kr_error(ETIME)); // session is not freed here as it is not closing yet - } - session2_force_close(ctx->session); - } else { - protolayer_break(ctx, kr_error(ETIME)); - } + break_query(ctx, ETIME); return; } @@ -316,6 +325,25 @@ static inline void process_single_deferred(void) { protolayer_continue(ctx); } +/// Break expired requests at the beginning of queues, uses current stamp. +static inline void cleanup_queues(void) { + for (int i = 0; i < QUEUES_CNT; i++) { + int cnt = 0; + while (queue_len(queues[i]) > 0) { + struct protolayer_iter_ctx *ctx = queue_head(queues[i]); + struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx); + uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp; + if (age_ns < REQ_TIMEOUT) break; + pop_query_queue(i); + break_query(ctx, ETIME); + cnt++; + } + if (cnt > 0) { + VERBOSE_LOG(" BREAK %d queries from %d\n", cnt, i); + } + } +} + /// Unwrap: defer or process the query synchronously. /// Time accounting should have been started, the stamp is used, accounted address is set. static enum protolayer_iter_cb_result pl_defer_unwrap( @@ -372,6 +400,7 @@ static void defer_queues_idle(uv_idle_t *handle) { process_single_deferred(); defer_sample_restart(); } + cleanup_queues(); defer_sample_stop(); // TODO skip calling and use just restart elsewhere? udp_queue_send_all();