From e74ac39a088b9bb8729d18526e19660c1227508b Mon Sep 17 00:00:00 2001 From: =?utf8?q?Luk=C3=A1=C5=A1=20Ondr=C3=A1=C4=8Dek?= Date: Mon, 6 Jan 2025 18:45:54 +0100 Subject: [PATCH] daemon/defer: fix memory consumption limit check --- daemon/defer.c | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/daemon/defer.c b/daemon/defer.c index 1336b3204..0e45a2520 100644 --- a/daemon/defer.c +++ b/daemon/defer.c @@ -439,8 +439,8 @@ static inline void process_single_deferred(void) while (time_now - log_time_orig + 1024 >= defer->log_period + 1024) { if (atomic_compare_exchange_weak_explicit(&defer->log_time, &log_time_orig, time_now, memory_order_relaxed, memory_order_relaxed)) { - kr_log_notice(DEFER, "Data from %s too long in queue, dropping.\n", - kr_straddr(ctx->comm->src_addr)); + kr_log_notice(DEFER, "Data from %s too long in queue, dropping. (%0.3f MiB in queues)\n", + kr_straddr(ctx->comm->src_addr), waiting_requests_size / 1024.0 / 1024.0); break; } } @@ -492,6 +492,23 @@ static inline void process_single_deferred(void) } } +/// Process as many deferred requests as needed to get memory consumption under limit. +static inline void process_deferred_over_size_limit(void) { + if (waiting_requests_size > MAX_WAITING_REQS_SIZE) { + defer_sample_state_t prev_sample_state; + defer_sample_start(&prev_sample_state); + phase_accounting = true; + do { + process_single_deferred(); // possibly defers again without decreasing waiting_requests_size + // If the unwrapped query is to be processed here, + // it is the last iteration and the query is processed after returning. + defer_sample_restart(); + } while (waiting_requests_size > MAX_WAITING_REQS_SIZE); + phase_accounting = false; + defer_sample_stop(&prev_sample_state, true); + } +} + /// Break expired requests at the beginning of queues, uses current stamp. static inline void cleanup_queues(void) { @@ -535,6 +552,7 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, false); // payload counted in session wire buffer VERBOSE_LOG(" PUSH as follow-up\n"); + process_deferred_over_size_limit(); return protolayer_async(); } @@ -560,20 +578,7 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, !ctx->session->stream); // for stream, payload is counted in session wire buffer - if (waiting_requests_size > MAX_WAITING_REQS_SIZE) { - defer_sample_state_t prev_sample_state; - defer_sample_start(&prev_sample_state); - phase_accounting = true; - do { - process_single_deferred(); // possibly defers again without decreasing waiting_requests_size - // If the unwrapped query is to be processed here, - // it is the last iteration and the query is processed after returning. - defer_sample_restart(); - } while (waiting_requests_size > MAX_WAITING_REQS_SIZE); - phase_accounting = false; - defer_sample_stop(&prev_sample_state, true); - } - + process_deferred_over_size_limit(); return protolayer_async(); } -- 2.47.2