git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] task: route all task sessions through single constructor
author Vsevolod Stakhov <vsevolod@rspamd.com>
Sat, 18 Apr 2026 10:40:29 +0000 (11:40 +0100)
committer Vsevolod Stakhov <vsevolod@rspamd.com>
Sat, 18 Apr 2026 10:40:29 +0000 (11:40 +0100)
Only worker.c and 1 of 8 controller.c task sessions had the item-name
resolver wired, so timeout logs for scans from other entry points
(check, learn, scan-lua, worker_util startup, proxy, lua.task_from_mime)
lost symbol context entirely: the summary said only "rspamd lua http=1"
with no hint of which rule stalled.

Introduce rspamd_task_create_session() that wraps session_create and
set_item_name_resolver, and switch all 11 task-scoped session creation
sites to use it. The two controller sites with cbdata user_data (stats,
metrics) stay on rspamd_session_create — they do not execute symcache.

While here, collapse rspamd_session_describe_pending into a single
line that folds symbol/label info into each group key, e.g.
"total=4; rspamd dns[RBL_FOO]=3, rspamd lua http[X]=1". Previously the
symbol detail was a separate line that was silently omitted whenever
the resolver was not wired — exactly the breakage this fixes.

src/controller.c
src/libserver/async_session.c
src/libserver/async_session.h
src/libserver/task.c
src/libserver/task.h
src/libserver/worker_util.c
src/lua/lua_util.c
src/rspamd_proxy.c
src/worker.c

index 0dc3db5569a0b69ae57ed427b238149ae084bdca..db6b4604e00f5206529af3f22839e3083c23106c 100644 (file)
@@ -1602,13 +1602,10 @@ rspamd_controller_handle_lua_history(lua_State *L,
                                                                           session->pool, ctx->lang_det, ctx->event_loop, FALSE);
 
                                task->resolver = ctx->resolver;
-                               task->s = rspamd_session_create(session->pool,
-                                                                                               rspamd_controller_history_lua_fin_task,
-                                                                                               NULL,
-                                                                                               (event_finalizer_t) rspamd_task_free,
-                                                                                               task);
-                               rspamd_session_set_item_name_resolver(task->s,
-                                                                                                         rspamd_task_session_item_name_resolver);
+                               task->s = rspamd_task_create_session(task, session->pool,
+                                                                                                        rspamd_controller_history_lua_fin_task,
+                                                                                                        NULL,
+                                                                                                        (event_finalizer_t) rspamd_task_free);
                                task->fin_arg = conn_ent;
 
                                ptask = lua_newuserdata(L, sizeof(*ptask));
@@ -1953,11 +1950,10 @@ rspamd_controller_handle_lua(struct rspamd_http_connection_entry *conn_ent,
                                                   ctx->lang_det, ctx->event_loop, FALSE);
 
        task->resolver = ctx->resolver;
-       task->s = rspamd_session_create(session->pool,
-                                                                       rspamd_controller_lua_fin_task,
-                                                                       NULL,
-                                                                       (event_finalizer_t) rspamd_task_free,
-                                                                       task);
+       task->s = rspamd_task_create_session(task, session->pool,
+                                                                                rspamd_controller_lua_fin_task,
+                                                                                NULL,
+                                                                                (event_finalizer_t) rspamd_task_free);
        task->fin_arg = conn_ent;
        task->http_conn = rspamd_http_connection_ref(conn_ent->conn);
        task->sock = -1;
@@ -2152,11 +2148,10 @@ rspamd_controller_handle_learn_common(
        task->resolver = ctx->resolver;
        /* Manual learn: ensure errors are propagated (not auto-learn semantics) */
        task->flags &= ~RSPAMD_TASK_FLAG_LEARN_AUTO;
-       task->s = rspamd_session_create(session->pool,
-                                                                       rspamd_controller_learn_fin_task,
-                                                                       NULL,
-                                                                       (event_finalizer_t) rspamd_task_free,
-                                                                       task);
+       task->s = rspamd_task_create_session(task, session->pool,
+                                                                                rspamd_controller_learn_fin_task,
+                                                                                NULL,
+                                                                                (event_finalizer_t) rspamd_task_free);
        task->fin_arg = conn_ent;
        task->http_conn = rspamd_http_connection_ref(conn_ent->conn);
        task->sock = -1;
@@ -2268,11 +2263,10 @@ rspamd_controller_handle_learnclass(
        task->resolver = ctx->resolver;
        /* Manual learn: ensure errors are propagated (not auto-learn semantics) */
        task->flags &= ~RSPAMD_TASK_FLAG_LEARN_AUTO;
-       task->s = rspamd_session_create(session->pool,
-                                                                       rspamd_controller_learn_fin_task,
-                                                                       NULL,
-                                                                       (event_finalizer_t) rspamd_task_free,
-                                                                       task);
+       task->s = rspamd_task_create_session(task, session->pool,
+                                                                                rspamd_controller_learn_fin_task,
+                                                                                NULL,
+                                                                                (event_finalizer_t) rspamd_task_free);
        task->fin_arg = conn_ent;
        task->http_conn = rspamd_http_connection_ref(conn_ent->conn);
        task->sock = -1;
@@ -2339,11 +2333,10 @@ rspamd_controller_handle_scan(struct rspamd_http_connection_entry *conn_ent,
                                                   ctx->lang_det, ctx->event_loop, FALSE);
 
        task->resolver = ctx->resolver;
-       task->s = rspamd_session_create(session->pool,
-                                                                       rspamd_controller_check_fin_task,
-                                                                       NULL,
-                                                                       (event_finalizer_t) rspamd_task_free,
-                                                                       task);
+       task->s = rspamd_task_create_session(task, session->pool,
+                                                                                rspamd_controller_check_fin_task,
+                                                                                NULL,
+                                                                                (event_finalizer_t) rspamd_task_free);
        task->fin_arg = conn_ent;
        task->http_conn = rspamd_http_connection_ref(conn_ent->conn);
        task->sock = conn_ent->conn->fd;
@@ -3495,11 +3488,10 @@ rspamd_controller_handle_lua_plugin(struct rspamd_http_connection_entry *conn_en
                                                   ctx->lang_det, ctx->event_loop, FALSE);
 
        task->resolver = ctx->resolver;
-       task->s = rspamd_session_create(session->pool,
-                                                                       rspamd_controller_lua_fin_task,
-                                                                       NULL,
-                                                                       (event_finalizer_t) rspamd_task_free,
-                                                                       task);
+       task->s = rspamd_task_create_session(task, session->pool,
+                                                                                rspamd_controller_lua_fin_task,
+                                                                                NULL,
+                                                                                (event_finalizer_t) rspamd_task_free);
        task->fin_arg = conn_ent;
        task->http_conn = rspamd_http_connection_ref(conn_ent->conn);
        ;
index ccdd11bdafb287b3a4b92826545c1d600b3b5cdc..1835b6635be52f50d35bc10913bc454c514d7333 100644 (file)
@@ -369,182 +369,99 @@ rspamd_str_eq_nullable(const char *a, const char *b)
        return strcmp(a, b) == 0;
 }
 
-void rspamd_session_describe_pending(struct rspamd_async_session *session,
-                                                                        GString **summary_out,
-                                                                        GString **details_out)
+#define RSPAMD_DUMP_MAX_GROUPS 32
+
+GString *
+rspamd_session_describe_pending(struct rspamd_async_session *session)
 {
        struct rspamd_async_event *ev;
-       GString *summary, *details;
+       GString *out;
        unsigned int total = 0;
-       unsigned int n_subsystems = 0;
-       unsigned int overflow_subsystems = 0;
-       unsigned int total_detail_entries = 0;
-       unsigned int i, j;
+       unsigned int n_groups = 0;
+       unsigned int overflow_groups = 0;
+       unsigned int i;
 
-       struct dump_source {
+       struct dump_group {
+               const char *subsystem;
                const char *item_name;
                const char *label;
                unsigned int count;
-       };
-       struct dump_subsystem {
-               const char *name;
-               unsigned int count;
-               unsigned int distinct_sources;
-               unsigned int overflow_sources;
-               struct dump_source sources[RSPAMD_DUMP_MAX_SOURCES_PER_SUB];
-       } subsystems[RSPAMD_DUMP_MAX_SUBSYSTEMS];
-
-       if (summary_out) {
-               *summary_out = NULL;
-       }
-       if (details_out) {
-               *details_out = NULL;
-       }
+       } groups[RSPAMD_DUMP_MAX_GROUPS];
 
        if (session == NULL || kh_size(session->events) == 0) {
-               return;
+               return NULL;
        }
 
        kh_foreach_key(session->events, ev, {
                const char *sub = ev->subsystem ? ev->subsystem : "(null)";
                const char *item = ev->item_name;
                const char *lbl = ev->label;
-               struct dump_subsystem *s = NULL;
-               struct dump_source *src_e = NULL;
+               struct dump_group *g = NULL;
 
                total++;
 
-               for (i = 0; i < n_subsystems; i++) {
-                       if (strcmp(subsystems[i].name, sub) == 0) {
-                               s = &subsystems[i];
+               for (i = 0; i < n_groups; i++) {
+                       if (strcmp(groups[i].subsystem, sub) == 0 &&
+                               rspamd_str_eq_nullable(groups[i].item_name, item) &&
+                               rspamd_str_eq_nullable(groups[i].label, lbl)) {
+                               g = &groups[i];
                                break;
                        }
                }
 
-               if (s == NULL) {
-                       if (n_subsystems < RSPAMD_DUMP_MAX_SUBSYSTEMS) {
-                               s = &subsystems[n_subsystems++];
-                               s->name = sub;
-                               s->count = 0;
-                               s->distinct_sources = 0;
-                               s->overflow_sources = 0;
+               if (g == NULL) {
+                       if (n_groups < RSPAMD_DUMP_MAX_GROUPS) {
+                               g = &groups[n_groups++];
+                               g->subsystem = sub;
+                               g->item_name = item;
+                               g->label = lbl;
+                               g->count = 0;
                        }
                        else {
-                               overflow_subsystems++;
+                               overflow_groups++;
                        }
                }
 
-               if (s != NULL) {
-                       s->count++;
-
-                       /* Events without any annotation are counted in subsystem total
-                        * only — we do not emit a detail entry for them. */
-                       if (item != NULL || lbl != NULL) {
-                               for (j = 0; j < s->distinct_sources; j++) {
-                                       if (rspamd_str_eq_nullable(s->sources[j].item_name, item) &&
-                                               rspamd_str_eq_nullable(s->sources[j].label, lbl)) {
-                                               src_e = &s->sources[j];
-                                               break;
-                                       }
-                               }
-
-                               if (src_e == NULL) {
-                                       if (s->distinct_sources < RSPAMD_DUMP_MAX_SOURCES_PER_SUB) {
-                                               src_e = &s->sources[s->distinct_sources++];
-                                               src_e->item_name = item;
-                                               src_e->label = lbl;
-                                               src_e->count = 0;
-                                       }
-                                       else {
-                                               s->overflow_sources++;
-                                       }
-                               }
-
-                               if (src_e != NULL) {
-                                       src_e->count++;
-                               }
-                       }
+               if (g != NULL) {
+                       g->count++;
                }
        });
 
        if (total == 0) {
-               return;
-       }
-
-       summary = g_string_sized_new(128);
-       rspamd_printf_gstring(summary, "total=%ud; by subsystem: ", total);
-       for (i = 0; i < n_subsystems; i++) {
-               if (i > 0) {
-                       g_string_append(summary, ", ");
-               }
-               rspamd_printf_gstring(summary, "%s=%ud",
-                                                         subsystems[i].name, subsystems[i].count);
-       }
-       if (overflow_subsystems > 0) {
-               rspamd_printf_gstring(summary, ", (+%ud more subsystems)",
-                                                         overflow_subsystems);
+               return NULL;
        }
 
-       for (i = 0; i < n_subsystems; i++) {
-               total_detail_entries += subsystems[i].distinct_sources;
-       }
+       out = g_string_sized_new(256);
+       rspamd_printf_gstring(out, "total=%ud; ", total);
 
-       if (total_detail_entries == 0) {
-               details = NULL;
-       }
-       else {
-               bool first_sub = true;
-               details = g_string_sized_new(256);
-               for (i = 0; i < n_subsystems; i++) {
-                       struct dump_subsystem *s = &subsystems[i];
+       for (i = 0; i < n_groups; i++) {
+               const struct dump_group *g = &groups[i];
 
-                       if (s->distinct_sources == 0) {
-                               continue;
-                       }
+               if (i > 0) {
+                       g_string_append(out, ", ");
+               }
 
-                       if (!first_sub) {
-                               g_string_append(details, "; ");
-                       }
-                       first_sub = false;
-                       rspamd_printf_gstring(details, "[%s:", s->name);
-                       for (j = 0; j < s->distinct_sources; j++) {
-                               const char *it = s->sources[j].item_name;
-                               const char *lb = s->sources[j].label;
-
-                               g_string_append_c(details, ' ');
-                               if (it != NULL && lb != NULL) {
-                                       rspamd_printf_gstring(details, "%s(%s)", it, lb);
-                               }
-                               else if (it != NULL) {
-                                       rspamd_printf_gstring(details, "%s", it);
-                               }
-                               else {
-                                       rspamd_printf_gstring(details, "%s", lb);
-                               }
-                               rspamd_printf_gstring(details, " x%ud", s->sources[j].count);
-                       }
-                       if (s->overflow_sources > 0) {
-                               rspamd_printf_gstring(details, " (+%ud more)",
-                                                                         s->overflow_sources);
-                       }
-                       g_string_append_c(details, ']');
+               g_string_append(out, g->subsystem);
+               if (g->item_name != NULL && g->label != NULL) {
+                       rspamd_printf_gstring(out, "[%s/%s]", g->item_name, g->label);
+               }
+               else if (g->item_name != NULL) {
+                       rspamd_printf_gstring(out, "[%s]", g->item_name);
+               }
+               else if (g->label != NULL) {
+                       rspamd_printf_gstring(out, "[%s]", g->label);
                }
+               rspamd_printf_gstring(out, "=%ud", g->count);
        }
 
-       if (summary_out) {
-               *summary_out = summary;
-       }
-       else {
-               g_string_free(summary, TRUE);
-       }
-       if (details_out) {
-               *details_out = details;
-       }
-       else if (details != NULL) {
-               g_string_free(details, TRUE);
+       if (overflow_groups > 0) {
+               rspamd_printf_gstring(out, ", (+%ud more groups)", overflow_groups);
        }
+
+       return out;
 }
 
+#undef RSPAMD_DUMP_MAX_GROUPS
 #undef RSPAMD_DUMP_MAX_SUBSYSTEMS
 #undef RSPAMD_DUMP_MAX_SOURCES_PER_SUB
 
index b899194fe976daa116e44f75fd365e89d6a75834..6746851c104de9242c87a61afcaf41cac25f7d32 100644 (file)
@@ -125,25 +125,20 @@ gboolean rspamd_session_pending(struct rspamd_async_session *session);
 unsigned int rspamd_session_events_pending(struct rspamd_async_session *session);
 
 /**
- * Builds human-readable descriptions of currently-pending async events grouped
- * by subsystem. Produces two newly-allocated GStrings written to the out-params:
- *   - *summary_out : compact counts, e.g. "total=10; by subsystem: rspamd dns=7, fuzzy_check=3"
- *   - *details_out : within each subsystem, distinct (item, label) pairs with
- *                    counts, e.g. "[rspamd dns: RBL_FOO x5, SURBL_CHECK x2]; [rspamd lua tcp: RATELIMIT_CHECK(tcp write) x1]"
- * Events that have neither an owning item nor a label are counted in the summary
- * but omitted from the detail line. The caller owns both strings and MUST free
- * them with g_string_free(..., TRUE). If there are no pending events, both
- * out-params are set to NULL; if there are events but none of them carry
- * detail, details_out is set to NULL while summary_out is populated.
+ * Builds a single human-readable line describing all currently-pending async
+ * events, grouped by the (subsystem, item_name, label) triple. Each group is
+ * rendered as "<subsystem>[<item>/<label>]=N" when both item and label are
+ * known, degrading to "<subsystem>[<item>]=N", "<subsystem>[<label>]=N", or
+ * bare "<subsystem>=N" if fields are missing. Example output:
+ *   "total=5; rspamd dns[RBL_FOO]=3, rspamd dns[SURBL]=1, rspamd lua http[X]=1"
+ * Returns a newly-allocated GString that the caller MUST free with
+ * g_string_free(..., TRUE), or NULL if there are no pending events.
  * Intended to be called from timeout handlers so the caller can log with the
  * proper task module tag (msg_info_task).
  * @param session session to dump
- * @param summary_out receives the summary GString (may be NULL on return)
- * @param details_out receives the detail GString (may be NULL on return)
+ * @return newly-allocated GString or NULL
  */
-void rspamd_session_describe_pending(struct rspamd_async_session *session,
-                                                                        GString **summary_out,
-                                                                        GString **details_out);
+GString *rspamd_session_describe_pending(struct rspamd_async_session *session);
 
 
 /**
index 6c7417373472f9dae0f979952888cfaf5c8b77a8..2b5a443cf814998689520be50d29ee659e085a1c 100644 (file)
@@ -1936,7 +1936,7 @@ rspamd_task_stage_name(enum rspamd_task_stage stg)
        return ret;
 }
 
-const char *
+static const char *
 rspamd_task_session_item_name_resolver(gpointer ud)
 {
        struct rspamd_task *task = ud;
@@ -1954,20 +1954,32 @@ rspamd_task_session_item_name_resolver(gpointer ud)
        return rspamd_symcache_dyn_item_name(task, item);
 }
 
+struct rspamd_async_session *
+rspamd_task_create_session(struct rspamd_task *task,
+                                                  rspamd_mempool_t *pool,
+                                                  session_finalizer_t fin,
+                                                  event_finalizer_t restore,
+                                                  event_finalizer_t cleanup)
+{
+       struct rspamd_async_session *s;
+
+       g_assert(task != NULL);
+
+       s = rspamd_session_create(pool, fin, restore, cleanup, task);
+       rspamd_session_set_item_name_resolver(s, rspamd_task_session_item_name_resolver);
+
+       return s;
+}
+
 static void
 rspamd_task_timeout_log_state(struct rspamd_task *task)
 {
-       GString *evt_summary = NULL, *evt_details = NULL, *inflight = NULL;
+       GString *pending, *inflight;
 
-       rspamd_session_describe_pending(task->s, &evt_summary, &evt_details);
-
-       if (evt_summary != NULL) {
-               msg_info_task("pending async events at timeout: %v", evt_summary);
-               g_string_free(evt_summary, TRUE);
-       }
-       if (evt_details != NULL) {
-               msg_info_task("pending async event sources: %v", evt_details);
-               g_string_free(evt_details, TRUE);
+       pending = rspamd_session_describe_pending(task->s);
+       if (pending != NULL) {
+               msg_info_task("pending async events at timeout: %v", pending);
+               g_string_free(pending, TRUE);
        }
 
        inflight = rspamd_symcache_describe_inflight_symbols(task);
index 4cc8fa7699ba02663bf4fc98f76eb7b424bab0b1..fe8cbf901270d1ff51f68e36edd84e13c0d1c637 100644 (file)
@@ -415,11 +415,24 @@ const char *rspamd_task_stage_name(enum rspamd_task_stage stg);
  */
 void rspamd_task_timeout(EV_P_ ev_timer *w, int revents);
 
-/*
- * Resolver used by task's async session to snapshot the name of the currently
- * executing symcache symbol (or NULL if none). Argument is the task pointer.
+/**
+ * Creates an async session owned by @task and wires up the default task
+ * diagnostics resolver (used to snapshot the currently-executing symbol name
+ * on every rspamd_session_add_event). All task-scoped sessions should be
+ * created through this helper instead of calling rspamd_session_create
+ * directly, otherwise timeout logs will be missing symbol context.
+ * @param task owning task; must be non-NULL
+ * @param pool memory pool for the session
+ * @param fin session-fin callback
+ * @param restore session-restore callback (or NULL)
+ * @param cleanup session-cleanup callback (or NULL)
  */
-const char *rspamd_task_session_item_name_resolver(gpointer ud);
+struct rspamd_async_session *
+rspamd_task_create_session(struct rspamd_task *task,
+                                                  rspamd_mempool_t *pool,
+                                                  session_finalizer_t fin,
+                                                  event_finalizer_t restore,
+                                                  event_finalizer_t cleanup);
 
 /*
  * Called on unexpected IO error (e.g. ECONNRESET)
index 2d62dc2b797d2fc2bf44ffbba530e32eb98d7644..37e9cfaf960638540dce49148ca1fd726b19fa03 100644 (file)
@@ -153,11 +153,10 @@ rspamd_worker_call_finish_handlers(struct rspamd_worker *worker)
                task = rspamd_task_new(worker, cfg, NULL, NULL, ctx->event_loop, FALSE);
                task->resolver = ctx->resolver;
                task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
-               task->s = rspamd_session_create(task->task_pool,
-                                                                               rspamd_worker_finalize,
-                                                                               NULL,
-                                                                               (event_finalizer_t) rspamd_task_free,
-                                                                               task);
+               task->s = rspamd_task_create_session(task, task->task_pool,
+                                                                                        rspamd_worker_finalize,
+                                                                                        NULL,
+                                                                                        (event_finalizer_t) rspamd_task_free);
 
                DL_FOREACH(cfg->on_term_scripts, sc)
                {
index a5dcf031b2b6131fdc21a083cc7fb907f4348135..1f1e96940364feaa133bc14f6e2779f47095b5a0 100644 (file)
@@ -1076,8 +1076,8 @@ lua_util_process_message(lua_State *L)
                task->fin_callback = lua_util_task_fin;
                task->fin_arg = &res;
                task->resolver = rspamd_dns_resolver_init(NULL, base, cfg);
-               task->s = rspamd_session_create(task->task_pool, rspamd_task_fin,
-                                                                               NULL, (event_finalizer_t) rspamd_task_free, task);
+               task->s = rspamd_task_create_session(task, task->task_pool, rspamd_task_fin,
+                                                                                        NULL, (event_finalizer_t) rspamd_task_free);
 
                if (!rspamd_task_load_message(task, NULL, message, mlen)) {
                        lua_pushnil(L);
index 4a3bc4dd2ab368f83445371630880db2b6bc98a7..3ea84de691d1248a33266629cedf884ceb0343cb 100644 (file)
@@ -2800,8 +2800,8 @@ rspamd_proxy_self_scan(struct rspamd_proxy_session *session)
 
        task->fin_arg = session;
        task->resolver = session->ctx->resolver;
-       task->s = rspamd_session_create(task->task_pool, rspamd_proxy_task_fin,
-                                                                       NULL, (event_finalizer_t) rspamd_task_free, task);
+       task->s = rspamd_task_create_session(task, task->task_pool, rspamd_proxy_task_fin,
+                                                                                NULL, (event_finalizer_t) rspamd_task_free);
        data = rspamd_http_message_get_body(msg, &len);
 
        if (session->backend->settings_id) {
index e3cdd2f8943a3024d301436ee44f0df11fda3ecc..73caef5a9999550c9909c26a58a05949e02b8673 100644 (file)
@@ -165,10 +165,8 @@ rspamd_worker_body_handler(struct rspamd_http_connection *conn,
                                                                  session);
 
        /* Set up async session */
-       task->s = rspamd_session_create(task->task_pool, rspamd_task_fin,
-                                                                       NULL, (event_finalizer_t) rspamd_task_free, task);
-       rspamd_session_set_item_name_resolver(task->s,
-                                                                                 rspamd_task_session_item_name_resolver);
+       task->s = rspamd_task_create_session(task, task->task_pool, rspamd_task_fin,
+                                                                                NULL, (event_finalizer_t) rspamd_task_free);
 
        if (!rspamd_protocol_handle_request(task, msg)) {
                msg_err_task("cannot handle request: %e", task->err);