From: Vsevolod Stakhov Date: Sat, 18 Apr 2026 09:49:41 +0000 (+0100) Subject: [Feature] task: show what stalled at timeout X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a5e92ae110b32bece651546f472697afbefe52e8;p=thirdparty%2Frspamd.git [Feature] task: show what stalled at timeout When a task hits the timeout the log now includes a grouped summary of the still-pending async events (DNS / Redis / Lua HTTP / fuzzy check / ...) and the list of symbols that started but have not finished. This gives operators an immediate picture of which subsystem or rule stalled the scan, instead of only seeing "forced processing". The per-event "forced removed event on destroy" lines that previously printed at info level during rspamd_session_cleanup are demoted to debug — the new summary replaces them and keeps the log compact. --- diff --git a/src/libserver/async_session.c b/src/libserver/async_session.c index 9f87ffd33c..2df6ca664b 100644 --- a/src/libserver/async_session.c +++ b/src/libserver/async_session.c @@ -265,32 +265,19 @@ void rspamd_session_cleanup(struct rspamd_async_session *session, bool forced_cl int ret; if (ev->fin != NULL) { - if (forced_cleanup) { - msg_info_session("forced removed event on destroy: %p, subsystem: %s, scheduled from: %s", - ev->user_data, - ev->subsystem, - ev->event_source); - } - else { - msg_debug_session("removed event on destroy: %p, subsystem: %s", - ev->user_data, - ev->subsystem); - } + msg_debug_session("%sremoved event on destroy: %p, subsystem: %s, scheduled from: %s", + forced_cleanup ? "forced " : "", + ev->user_data, + ev->subsystem, + ev->event_source); ev->fin(ev->user_data); } else { - if (forced_cleanup) { - msg_info_session("NOT forced removed event on destroy - uncancellable: " - "%p, subsystem: %s, scheduled from: %s", - ev->user_data, - ev->subsystem, - ev->event_source); - } - else { - msg_debug_session("NOT removed event on destroy - uncancellable: %p, subsystem: %s", - ev->user_data, - ev->subsystem); - } + msg_debug_session("NOT %sremoved event on destroy - uncancellable: %p, subsystem: %s, scheduled from: %s", + forced_cleanup ? "forced " : "", + ev->user_data, + ev->subsystem, + ev->event_source); /* Assume an event is uncancellable, move it to a new hash table */ kh_put(rspamd_events_hash, uncancellable_events, ev, &ret); } @@ -298,8 +285,9 @@ void rspamd_session_cleanup(struct rspamd_async_session *session, bool forced_cl kh_destroy(rspamd_events_hash, session->events); session->events = uncancellable_events; - if (forced_cleanup) { - msg_info_session("pending %d uncancellable events", kh_size(uncancellable_events)); + if (forced_cleanup && kh_size(uncancellable_events) > 0) { + msg_info_session("pending %d uncancellable events after forced cleanup", + kh_size(uncancellable_events)); } else { msg_debug_session("pending %d uncancellable events", kh_size(uncancellable_events)); @@ -347,6 +335,151 @@ unsigned int rspamd_session_events_pending(struct rspamd_async_session *session) return npending; } +#define RSPAMD_DUMP_MAX_SUBSYSTEMS 16 +#define RSPAMD_DUMP_MAX_SOURCES_PER_SUB 4 + +void rspamd_session_describe_pending(struct rspamd_async_session *session, + GString **summary_out, + GString **details_out) +{ + struct rspamd_async_event *ev; + GString *summary, *details; + unsigned int total = 0; + unsigned int n_subsystems = 0; + unsigned int overflow_subsystems = 0; + unsigned int i, j; + + struct dump_source { + const char *source; + unsigned int count; + }; + struct dump_subsystem { + const char *name; + unsigned int count; + unsigned int distinct_sources; + unsigned int overflow_sources; + struct dump_source sources[RSPAMD_DUMP_MAX_SOURCES_PER_SUB]; + } subsystems[RSPAMD_DUMP_MAX_SUBSYSTEMS]; + + if (summary_out) { + *summary_out = NULL; + } + if (details_out) { + *details_out = NULL; + } + + if (session == NULL || kh_size(session->events) == 0) { + return; + } + + kh_foreach_key(session->events, ev, { + const char *sub = ev->subsystem ? ev->subsystem : "(null)"; + const char *src = ev->event_source ? ev->event_source : "(null)"; + struct dump_subsystem *s = NULL; + struct dump_source *src_e = NULL; + + total++; + + for (i = 0; i < n_subsystems; i++) { + if (strcmp(subsystems[i].name, sub) == 0) { + s = &subsystems[i]; + break; + } + } + + if (s == NULL) { + if (n_subsystems < RSPAMD_DUMP_MAX_SUBSYSTEMS) { + s = &subsystems[n_subsystems++]; + s->name = sub; + s->count = 0; + s->distinct_sources = 0; + s->overflow_sources = 0; + } + else { + overflow_subsystems++; + } + } + + if (s != NULL) { + s->count++; + + for (j = 0; j < s->distinct_sources; j++) { + if (strcmp(s->sources[j].source, src) == 0) { + src_e = &s->sources[j]; + break; + } + } + + if (src_e == NULL) { + if (s->distinct_sources < RSPAMD_DUMP_MAX_SOURCES_PER_SUB) { + src_e = &s->sources[s->distinct_sources++]; + src_e->source = src; + src_e->count = 0; + } + else { + s->overflow_sources++; + } + } + + if (src_e != NULL) { + src_e->count++; + } + } + }); + + if (total == 0) { + return; + } + + summary = g_string_sized_new(128); + rspamd_printf_gstring(summary, "total=%ud; by subsystem: ", total); + for (i = 0; i < n_subsystems; i++) { + if (i > 0) { + g_string_append(summary, ", "); + } + rspamd_printf_gstring(summary, "%s=%ud", + subsystems[i].name, subsystems[i].count); + } + if (overflow_subsystems > 0) { + rspamd_printf_gstring(summary, ", (+%ud more subsystems)", + overflow_subsystems); + } + + details = g_string_sized_new(256); + for (i = 0; i < n_subsystems; i++) { + if (i > 0) { + g_string_append(details, "; "); + } + rspamd_printf_gstring(details, "[%s:", subsystems[i].name); + for (j = 0; j < subsystems[i].distinct_sources; j++) { + rspamd_printf_gstring(details, " %s x%ud", + subsystems[i].sources[j].source, + subsystems[i].sources[j].count); + } + if (subsystems[i].overflow_sources > 0) { + rspamd_printf_gstring(details, " (+%ud more sources)", + subsystems[i].overflow_sources); + } + g_string_append_c(details, ']'); + } + + if (summary_out) { + *summary_out = summary; + } + else { + g_string_free(summary, TRUE); + } + if (details_out) { + *details_out = details; + } + else { + g_string_free(details, TRUE); + } +} + +#undef RSPAMD_DUMP_MAX_SUBSYSTEMS +#undef RSPAMD_DUMP_MAX_SOURCES_PER_SUB + rspamd_mempool_t * rspamd_session_mempool(struct rspamd_async_session *session) { diff --git a/src/libserver/async_session.h b/src/libserver/async_session.h index e4b9feb080..02aa9a5cdc 100644 --- a/src/libserver/async_session.h +++ b/src/libserver/async_session.h @@ -106,6 +106,23 @@ gboolean rspamd_session_pending(struct rspamd_async_session *session); */ unsigned int rspamd_session_events_pending(struct rspamd_async_session *session); +/** + * Builds human-readable descriptions of currently-pending async events grouped + * by subsystem. Produces two newly-allocated GStrings written to the out-params: + * - *summary_out : compact counts, e.g. "total=10; by subsystem: rspamd dns=7, fuzzy_check=3" + * - *details_out : distinct call-sites per subsystem, e.g. "[rspamd dns: /path/a.c:45 x5, /path/b.c:12 x2]; [fuzzy_check: /path/c.c:88 x3]" + * The caller owns both strings and MUST free them with g_string_free(..., TRUE). + * If there are no pending events, both out-params are set to NULL. + * Intended to be called from timeout handlers so the caller can log with the + * proper task module tag (msg_info_task). + * @param session session to dump + * @param summary_out receives the summary GString (may be NULL on return) + * @param details_out receives the detail GString (may be NULL on return) + */ +void rspamd_session_describe_pending(struct rspamd_async_session *session, + GString **summary_out, + GString **details_out); + /** * Returns TRUE if an async session is currently destroying diff --git a/src/libserver/rspamd_symcache.h b/src/libserver/rspamd_symcache.h index 2d63016d9a..06e265076c 100644 --- a/src/libserver/rspamd_symcache.h +++ b/src/libserver/rspamd_symcache.h @@ -544,6 +544,17 @@ rspamd_symcache_item_stat(struct rspamd_symcache_item *item); */ void rspamd_symcache_enable_profile(struct rspamd_task *task); +/** + * Builds a human-readable description of symbols that have been started but + * have not yet finished (i.e. are blocked on async events). Intended for use + * from timeout handlers so operators can see which rules stalled the task. + * Returns NULL if there are no inflight symbols. Caller owns the returned + * GString and MUST free it with g_string_free(..., TRUE). + * @param task + * @return + */ +GString *rspamd_symcache_describe_inflight_symbols(struct rspamd_task *task); + struct rspamd_symcache_timeout_item { double timeout; const struct rspamd_symcache_item *item; diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx index a185305f9b..886246e8a1 100644 --- a/src/libserver/symcache/symcache_c.cxx +++ b/src/libserver/symcache/symcache_c.cxx @@ -327,6 +327,18 @@ rspamd_symcache_item_stat(struct rspamd_symcache_item *item) return real_item->st; } +GString * +rspamd_symcache_describe_inflight_symbols(struct rspamd_task *task) +{ + if (task == nullptr || task->symcache_runtime == nullptr) { + return nullptr; + } + + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + + return cache_runtime->describe_inflight_symbols(); +} + void rspamd_symcache_get_symbol_details(struct rspamd_symcache *cache, const char *symbol, ucl_object_t *this_sym_ucl) diff --git a/src/libserver/symcache/symcache_runtime.cxx b/src/libserver/symcache/symcache_runtime.cxx index c608298ea4..73f93b562d 100644 --- a/src/libserver/symcache/symcache_runtime.cxx +++ b/src/libserver/symcache/symcache_runtime.cxx @@ -974,4 +974,56 @@ auto symcache_runtime::get_item_by_dynamic_item(cache_dynamic_item *dyn_item) co return nullptr; } +auto symcache_runtime::describe_inflight_symbols() const -> GString * +{ + GString *out = nullptr; + unsigned int nshown = 0; + unsigned int nsuppressed = 0; + constexpr unsigned int MAX_ITEMS_SHOWN = 32; + + if (items_inflight == 0) { + return nullptr; + } + + for (auto [i, item]: rspamd::enumerate(order->d)) { + auto *dyn_item = &dynamic_items[i]; + + if (dyn_item->status != cache_item_status::started) { + continue; + } + + if (nshown >= MAX_ITEMS_SHOWN) { + nsuppressed++; + continue; + } + + if (out == nullptr) { + out = g_string_sized_new(128); + } + else { + g_string_append(out, ", "); + } + + const auto &name = item->get_name(); + if (dyn_item->start_msec > 0) { + rspamd_printf_gstring(out, "%*s (async=%ud, started=%ud ms)", + (int) name.size(), name.data(), + (unsigned int) dyn_item->async_events, + (unsigned int) dyn_item->start_msec); + } + else { + rspamd_printf_gstring(out, "%*s (async=%ud)", + (int) name.size(), name.data(), + (unsigned int) dyn_item->async_events); + } + nshown++; + } + + if (nsuppressed > 0 && out != nullptr) { + rspamd_printf_gstring(out, " (+%ud more)", nsuppressed); + } + + return out; +} + }// namespace rspamd::symcache diff --git a/src/libserver/symcache/symcache_runtime.hxx b/src/libserver/symcache/symcache_runtime.hxx index c8a9f7440c..a8f1a4d3c7 100644 --- a/src/libserver/symcache/symcache_runtime.hxx +++ b/src/libserver/symcache/symcache_runtime.hxx @@ -236,6 +236,16 @@ public: slow_status = slow_status::disabled; } } + + /** + * Builds a human-readable description of symbols that have been started but + * have not yet finished (i.e. are waiting on async events: DNS, Redis, HTTP, + * etc.). Intended to be used from timeout handlers to surface which rules + * stalled the task. + * @return newly allocated GString (caller must g_string_free) or nullptr if + * no inflight symbols + */ + auto describe_inflight_symbols() const -> GString *; }; diff --git a/src/libserver/task.c b/src/libserver/task.c index 768d306c30..26fed92756 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -1936,6 +1936,29 @@ rspamd_task_stage_name(enum rspamd_task_stage stg) return ret; } +static void +rspamd_task_timeout_log_state(struct rspamd_task *task) +{ + GString *evt_summary = NULL, *evt_details = NULL, *inflight = NULL; + + rspamd_session_describe_pending(task->s, &evt_summary, &evt_details); + + if (evt_summary != NULL) { + msg_info_task("pending async events at timeout: %v", evt_summary); + g_string_free(evt_summary, TRUE); + } + if (evt_details != NULL) { + msg_info_task("pending async event sources: %v", evt_details); + g_string_free(evt_details, TRUE); + } + + inflight = rspamd_symcache_describe_inflight_symbols(task); + if (inflight != NULL) { + msg_info_task("inflight symbols at timeout: %v", inflight); + g_string_free(inflight, TRUE); + } +} + void rspamd_task_timeout(EV_P_ ev_timer *w, int revents) { struct rspamd_task *task = (struct rspamd_task *) w->data; @@ -1946,6 +1969,7 @@ void rspamd_task_timeout(EV_P_ ev_timer *w, int revents) "forced processing", ev_now(task->event_loop) - task->task_timestamp, w->repeat); + rspamd_task_timeout_log_state(task); if (task->cfg->soft_reject_on_timeout) { struct rspamd_action *action, *soft_reject; @@ -1975,6 +1999,7 @@ void rspamd_task_timeout(EV_P_ ev_timer *w, int revents) /* Postprocessing timeout */ msg_info_task("post-processing of task time out: %.1f second spent; forced processing", ev_now(task->event_loop) - task->task_timestamp); + rspamd_task_timeout_log_state(task); if (task->cfg->soft_reject_on_timeout) { struct rspamd_action *action, *soft_reject;