while (time_now - log_time_orig + 1024 >= defer->log_period + 1024) {
if (atomic_compare_exchange_weak_explicit(&defer->log_time, &log_time_orig, time_now,
memory_order_relaxed, memory_order_relaxed)) {
- kr_log_notice(DEFER, "Data from %s too long in queue, dropping.\n",
- kr_straddr(ctx->comm->src_addr));
+ kr_log_notice(DEFER, "Data from %s too long in queue, dropping. (%0.3f MiB in queues)\n",
+ kr_straddr(ctx->comm->src_addr), waiting_requests_size / 1024.0 / 1024.0);
break;
}
}
}
}
+/// Process as many deferred requests as needed to get memory consumption under limit.
+static inline void process_deferred_over_size_limit(void) {
+ if (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
+ defer_sample_state_t prev_sample_state;
+ defer_sample_start(&prev_sample_state);
+ phase_accounting = true;
+ do {
+ process_single_deferred(); // possibly defers again without decreasing waiting_requests_size
+ // If the unwrapped query is to be processed here,
+ // it is the last iteration and the query is processed after returning.
+ defer_sample_restart();
+ } while (waiting_requests_size > MAX_WAITING_REQS_SIZE);
+ phase_accounting = false;
+ defer_sample_stop(&prev_sample_state, true);
+ }
+}
+
/// Break expired requests at the beginning of queues, uses current stamp.
static inline void cleanup_queues(void)
{
waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, false);
// payload counted in session wire buffer
VERBOSE_LOG(" PUSH as follow-up\n");
+ process_deferred_over_size_limit();
return protolayer_async();
}
waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, !ctx->session->stream);
// for stream, payload is counted in session wire buffer
- if (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
- defer_sample_state_t prev_sample_state;
- defer_sample_start(&prev_sample_state);
- phase_accounting = true;
- do {
- process_single_deferred(); // possibly defers again without decreasing waiting_requests_size
- // If the unwrapped query is to be processed here,
- // it is the last iteration and the query is processed after returning.
- defer_sample_restart();
- } while (waiting_requests_size > MAX_WAITING_REQS_SIZE);
- phase_accounting = false;
- defer_sample_stop(&prev_sample_state, true);
- }
-
+ process_deferred_over_size_limit();
return protolayer_async();
}