]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] task: show what stalled at timeout
authorVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 18 Apr 2026 09:49:41 +0000 (10:49 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 18 Apr 2026 09:49:41 +0000 (10:49 +0100)
When a task hits the timeout the log now includes a grouped summary of
the still-pending async events (DNS / Redis / Lua HTTP / fuzzy check /
...) and the list of symbols that started but have not finished. This
gives operators an immediate picture of which subsystem or rule stalled
the scan, instead of only seeing "forced processing".

The per-event "forced removed event on destroy" lines that previously
printed at info level during rspamd_session_cleanup are demoted to
debug — the new summary replaces them and keeps the log compact.

src/libserver/async_session.c
src/libserver/async_session.h
src/libserver/rspamd_symcache.h
src/libserver/symcache/symcache_c.cxx
src/libserver/symcache/symcache_runtime.cxx
src/libserver/symcache/symcache_runtime.hxx
src/libserver/task.c

index 9f87ffd33cf98945941aef4298c41cf7e144b69b..2df6ca664b9c88f89164caba0748c898d7363793 100644 (file)
@@ -265,32 +265,19 @@ void rspamd_session_cleanup(struct rspamd_async_session *session, bool forced_cl
                int ret;
 
                if (ev->fin != NULL) {
-                       if (forced_cleanup) {
-                               msg_info_session("forced removed event on destroy: %p, subsystem: %s, scheduled from: %s",
-                                                                ev->user_data,
-                                                                ev->subsystem,
-                                                                ev->event_source);
-                       }
-                       else {
-                               msg_debug_session("removed event on destroy: %p, subsystem: %s",
-                                                                 ev->user_data,
-                                                                 ev->subsystem);
-                       }
+                       msg_debug_session("%sremoved event on destroy: %p, subsystem: %s, scheduled from: %s",
+                                                         forced_cleanup ? "forced " : "",
+                                                         ev->user_data,
+                                                         ev->subsystem,
+                                                         ev->event_source);
                        ev->fin(ev->user_data);
                }
                else {
-                       if (forced_cleanup) {
-                               msg_info_session("NOT forced removed event on destroy - uncancellable: "
-                                                                "%p, subsystem: %s, scheduled from: %s",
-                                                                ev->user_data,
-                                                                ev->subsystem,
-                                                                ev->event_source);
-                       }
-                       else {
-                               msg_debug_session("NOT removed event on destroy - uncancellable: %p, subsystem: %s",
-                                                                 ev->user_data,
-                                                                 ev->subsystem);
-                       }
+                       msg_debug_session("NOT %sremoved event on destroy - uncancellable: %p, subsystem: %s, scheduled from: %s",
+                                                         forced_cleanup ? "forced " : "",
+                                                         ev->user_data,
+                                                         ev->subsystem,
+                                                         ev->event_source);
                        /* Assume an event is uncancellable, move it to a new hash table */
                        kh_put(rspamd_events_hash, uncancellable_events, ev, &ret);
                }
@@ -298,8 +285,9 @@ void rspamd_session_cleanup(struct rspamd_async_session *session, bool forced_cl
 
        kh_destroy(rspamd_events_hash, session->events);
        session->events = uncancellable_events;
-       if (forced_cleanup) {
-               msg_info_session("pending %d uncancellable events", kh_size(uncancellable_events));
+       if (forced_cleanup && kh_size(uncancellable_events) > 0) {
+               msg_info_session("pending %d uncancellable events after forced cleanup",
+                                                kh_size(uncancellable_events));
        }
        else {
                msg_debug_session("pending %d uncancellable events", kh_size(uncancellable_events));
@@ -347,6 +335,151 @@ unsigned int rspamd_session_events_pending(struct rspamd_async_session *session)
        return npending;
 }
 
+#define RSPAMD_DUMP_MAX_SUBSYSTEMS 16
+#define RSPAMD_DUMP_MAX_SOURCES_PER_SUB 4
+
+void rspamd_session_describe_pending(struct rspamd_async_session *session,
+                                                                        GString **summary_out,
+                                                                        GString **details_out)
+{
+       struct rspamd_async_event *ev;
+       GString *summary, *details;
+       unsigned int total = 0;
+       unsigned int n_subsystems = 0;
+       unsigned int overflow_subsystems = 0;
+       unsigned int i, j;
+
+       struct dump_source {
+               const char *source;
+               unsigned int count;
+       };
+       struct dump_subsystem {
+               const char *name;
+               unsigned int count;
+               unsigned int distinct_sources;
+               unsigned int overflow_sources;
+               struct dump_source sources[RSPAMD_DUMP_MAX_SOURCES_PER_SUB];
+       } subsystems[RSPAMD_DUMP_MAX_SUBSYSTEMS];
+
+       if (summary_out) {
+               *summary_out = NULL;
+       }
+       if (details_out) {
+               *details_out = NULL;
+       }
+
+       if (session == NULL || kh_size(session->events) == 0) {
+               return;
+       }
+
+       kh_foreach_key(session->events, ev, {
+               const char *sub = ev->subsystem ? ev->subsystem : "(null)";
+               const char *src = ev->event_source ? ev->event_source : "(null)";
+               struct dump_subsystem *s = NULL;
+               struct dump_source *src_e = NULL;
+
+               total++;
+
+               for (i = 0; i < n_subsystems; i++) {
+                       if (strcmp(subsystems[i].name, sub) == 0) {
+                               s = &subsystems[i];
+                               break;
+                       }
+               }
+
+               if (s == NULL) {
+                       if (n_subsystems < RSPAMD_DUMP_MAX_SUBSYSTEMS) {
+                               s = &subsystems[n_subsystems++];
+                               s->name = sub;
+                               s->count = 0;
+                               s->distinct_sources = 0;
+                               s->overflow_sources = 0;
+                       }
+                       else {
+                               overflow_subsystems++;
+                       }
+               }
+
+               if (s != NULL) {
+                       s->count++;
+
+                       for (j = 0; j < s->distinct_sources; j++) {
+                               if (strcmp(s->sources[j].source, src) == 0) {
+                                       src_e = &s->sources[j];
+                                       break;
+                               }
+                       }
+
+                       if (src_e == NULL) {
+                               if (s->distinct_sources < RSPAMD_DUMP_MAX_SOURCES_PER_SUB) {
+                                       src_e = &s->sources[s->distinct_sources++];
+                                       src_e->source = src;
+                                       src_e->count = 0;
+                               }
+                               else {
+                                       s->overflow_sources++;
+                               }
+                       }
+
+                       if (src_e != NULL) {
+                               src_e->count++;
+                       }
+               }
+       });
+
+       if (total == 0) {
+               return;
+       }
+
+       summary = g_string_sized_new(128);
+       rspamd_printf_gstring(summary, "total=%ud; by subsystem: ", total);
+       for (i = 0; i < n_subsystems; i++) {
+               if (i > 0) {
+                       g_string_append(summary, ", ");
+               }
+               rspamd_printf_gstring(summary, "%s=%ud",
+                                                         subsystems[i].name, subsystems[i].count);
+       }
+       if (overflow_subsystems > 0) {
+               rspamd_printf_gstring(summary, ", (+%ud more subsystems)",
+                                                         overflow_subsystems);
+       }
+
+       details = g_string_sized_new(256);
+       for (i = 0; i < n_subsystems; i++) {
+               if (i > 0) {
+                       g_string_append(details, "; ");
+               }
+               rspamd_printf_gstring(details, "[%s:", subsystems[i].name);
+               for (j = 0; j < subsystems[i].distinct_sources; j++) {
+                       rspamd_printf_gstring(details, " %s x%ud",
+                                                                 subsystems[i].sources[j].source,
+                                                                 subsystems[i].sources[j].count);
+               }
+               if (subsystems[i].overflow_sources > 0) {
+                       rspamd_printf_gstring(details, " (+%ud more sources)",
+                                                                 subsystems[i].overflow_sources);
+               }
+               g_string_append_c(details, ']');
+       }
+
+       if (summary_out) {
+               *summary_out = summary;
+       }
+       else {
+               g_string_free(summary, TRUE);
+       }
+       if (details_out) {
+               *details_out = details;
+       }
+       else {
+               g_string_free(details, TRUE);
+       }
+}
+
+#undef RSPAMD_DUMP_MAX_SUBSYSTEMS
+#undef RSPAMD_DUMP_MAX_SOURCES_PER_SUB
+
 rspamd_mempool_t *
 rspamd_session_mempool(struct rspamd_async_session *session)
 {
index e4b9feb08031a0c5409ca305b6be2a8c072a7b53..02aa9a5cdcc5a84870e91ba8332d4ecaaead7997 100644 (file)
@@ -106,6 +106,23 @@ gboolean rspamd_session_pending(struct rspamd_async_session *session);
  */
 unsigned int rspamd_session_events_pending(struct rspamd_async_session *session);
 
+/**
+ * Builds human-readable descriptions of currently-pending async events grouped
+ * by subsystem. Produces two newly-allocated GStrings written to the out-params:
+ *   - *summary_out : compact counts, e.g. "total=10; by subsystem: rspamd dns=7, fuzzy_check=3"
+ *   - *details_out : distinct call-sites per subsystem, e.g. "[rspamd dns: /path/a.c:45 x5, /path/b.c:12 x2]; [fuzzy_check: /path/c.c:88 x3]"
+ * The caller owns both strings and MUST free them with g_string_free(..., TRUE).
+ * If there are no pending events, both out-params are set to NULL.
+ * Intended to be called from timeout handlers so the caller can log with the
+ * proper task module tag (msg_info_task).
+ * @param session session to dump
+ * @param summary_out receives the summary GString (may be NULL on return)
+ * @param details_out receives the detail GString (may be NULL on return)
+ */
+void rspamd_session_describe_pending(struct rspamd_async_session *session,
+                                                                        GString **summary_out,
+                                                                        GString **details_out);
+
 
 /**
  * Returns TRUE if an async session is currently destroying
index 2d63016d9acd98f185d766e8b5bc8faee315e46d..06e265076c0068651b4db14ad2270c9d2c64592f 100644 (file)
@@ -544,6 +544,17 @@ rspamd_symcache_item_stat(struct rspamd_symcache_item *item);
  */
 void rspamd_symcache_enable_profile(struct rspamd_task *task);
 
+/**
+ * Builds a human-readable description of symbols that have been started but
+ * have not yet finished (i.e. are blocked on async events). Intended for use
+ * from timeout handlers so operators can see which rules stalled the task.
+ * Returns NULL if there are no inflight symbols. Caller owns the returned
+ * GString and MUST free it with g_string_free(..., TRUE).
+ * @param task
+ * @return
+ */
+GString *rspamd_symcache_describe_inflight_symbols(struct rspamd_task *task);
+
 struct rspamd_symcache_timeout_item {
        double timeout;
        const struct rspamd_symcache_item *item;
index a185305f9b59ef151a2c6ce064ada5baa28cce40..886246e8a1c7a0c89e17ab3c4e993a9c90c9ccd2 100644 (file)
@@ -327,6 +327,18 @@ rspamd_symcache_item_stat(struct rspamd_symcache_item *item)
        return real_item->st;
 }
 
+GString *
+rspamd_symcache_describe_inflight_symbols(struct rspamd_task *task)
+{
+       if (task == nullptr || task->symcache_runtime == nullptr) {
+               return nullptr;
+       }
+
+       auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime);
+
+       return cache_runtime->describe_inflight_symbols();
+}
+
 void rspamd_symcache_get_symbol_details(struct rspamd_symcache *cache,
                                                                                const char *symbol,
                                                                                ucl_object_t *this_sym_ucl)
index c608298ea4ebc606079dbaea6f21f860bec3e302..73f93b562da6f6268ef5ac596998ab899fa8c820 100644 (file)
@@ -974,4 +974,56 @@ auto symcache_runtime::get_item_by_dynamic_item(cache_dynamic_item *dyn_item) co
        return nullptr;
 }
 
+auto symcache_runtime::describe_inflight_symbols() const -> GString *
+{
+       GString *out = nullptr;
+       unsigned int nshown = 0;
+       unsigned int nsuppressed = 0;
+       constexpr unsigned int MAX_ITEMS_SHOWN = 32;
+
+       if (items_inflight == 0) {
+               return nullptr;
+       }
+
+       for (auto [i, item]: rspamd::enumerate(order->d)) {
+               auto *dyn_item = &dynamic_items[i];
+
+               if (dyn_item->status != cache_item_status::started) {
+                       continue;
+               }
+
+               if (nshown >= MAX_ITEMS_SHOWN) {
+                       nsuppressed++;
+                       continue;
+               }
+
+               if (out == nullptr) {
+                       out = g_string_sized_new(128);
+               }
+               else {
+                       g_string_append(out, ", ");
+               }
+
+               const auto &name = item->get_name();
+               if (dyn_item->start_msec > 0) {
+                       rspamd_printf_gstring(out, "%*s (async=%ud, started=%ud ms)",
+                                                                 (int) name.size(), name.data(),
+                                                                 (unsigned int) dyn_item->async_events,
+                                                                 (unsigned int) dyn_item->start_msec);
+               }
+               else {
+                       rspamd_printf_gstring(out, "%*s (async=%ud)",
+                                                                 (int) name.size(), name.data(),
+                                                                 (unsigned int) dyn_item->async_events);
+               }
+               nshown++;
+       }
+
+       if (nsuppressed > 0 && out != nullptr) {
+               rspamd_printf_gstring(out, " (+%ud more)", nsuppressed);
+       }
+
+       return out;
+}
+
 }// namespace rspamd::symcache
index c8a9f7440c97e9985f260a4934147779421d6ae8..a8f1a4d3c7aae0d58980e45ed22a512ccda69186 100644 (file)
@@ -236,6 +236,16 @@ public:
                        slow_status = slow_status::disabled;
                }
        }
+
+       /**
+        * Builds a human-readable description of symbols that have been started but
+        * have not yet finished (i.e. are waiting on async events: DNS, Redis, HTTP,
+        * etc.). Intended to be used from timeout handlers to surface which rules
+        * stalled the task.
+        * @return newly allocated GString (caller must g_string_free) or nullptr if
+        *         no inflight symbols
+        */
+       auto describe_inflight_symbols() const -> GString *;
 };
 
 
index 768d306c303c1bef743564e058c70962e8b65421..26fed927561c3ce981819b2f4dacc260d8d56388 100644 (file)
@@ -1936,6 +1936,29 @@ rspamd_task_stage_name(enum rspamd_task_stage stg)
        return ret;
 }
 
+static void
+rspamd_task_timeout_log_state(struct rspamd_task *task)
+{
+       GString *evt_summary = NULL, *evt_details = NULL, *inflight = NULL;
+
+       rspamd_session_describe_pending(task->s, &evt_summary, &evt_details);
+
+       if (evt_summary != NULL) {
+               msg_info_task("pending async events at timeout: %v", evt_summary);
+               g_string_free(evt_summary, TRUE);
+       }
+       if (evt_details != NULL) {
+               msg_info_task("pending async event sources: %v", evt_details);
+               g_string_free(evt_details, TRUE);
+       }
+
+       inflight = rspamd_symcache_describe_inflight_symbols(task);
+       if (inflight != NULL) {
+               msg_info_task("inflight symbols at timeout: %v", inflight);
+               g_string_free(inflight, TRUE);
+       }
+}
+
 void rspamd_task_timeout(EV_P_ ev_timer *w, int revents)
 {
        struct rspamd_task *task = (struct rspamd_task *) w->data;
@@ -1946,6 +1969,7 @@ void rspamd_task_timeout(EV_P_ ev_timer *w, int revents)
                                          "forced processing",
                                          ev_now(task->event_loop) - task->task_timestamp,
                                          w->repeat);
+               rspamd_task_timeout_log_state(task);
 
                if (task->cfg->soft_reject_on_timeout) {
                        struct rspamd_action *action, *soft_reject;
@@ -1975,6 +1999,7 @@ void rspamd_task_timeout(EV_P_ ev_timer *w, int revents)
                /* Postprocessing timeout */
                msg_info_task("post-processing of task time out: %.1f second spent; forced processing",
                                          ev_now(task->event_loop) - task->task_timestamp);
+               rspamd_task_timeout_log_state(task);
 
                if (task->cfg->soft_reject_on_timeout) {
                        struct rspamd_action *action, *soft_reject;