session->pool, ctx->lang_det, ctx->event_loop, FALSE);
task->resolver = ctx->resolver;
- task->s = rspamd_session_create(session->pool,
- rspamd_controller_history_lua_fin_task,
- NULL,
- (event_finalizer_t) rspamd_task_free,
- task);
- rspamd_session_set_item_name_resolver(task->s,
- rspamd_task_session_item_name_resolver);
+ task->s = rspamd_task_create_session(task, session->pool,
+ rspamd_controller_history_lua_fin_task,
+ NULL,
+ (event_finalizer_t) rspamd_task_free);
task->fin_arg = conn_ent;
ptask = lua_newuserdata(L, sizeof(*ptask));
ctx->lang_det, ctx->event_loop, FALSE);
task->resolver = ctx->resolver;
- task->s = rspamd_session_create(session->pool,
- rspamd_controller_lua_fin_task,
- NULL,
- (event_finalizer_t) rspamd_task_free,
- task);
+ task->s = rspamd_task_create_session(task, session->pool,
+ rspamd_controller_lua_fin_task,
+ NULL,
+ (event_finalizer_t) rspamd_task_free);
task->fin_arg = conn_ent;
task->http_conn = rspamd_http_connection_ref(conn_ent->conn);
task->sock = -1;
task->resolver = ctx->resolver;
/* Manual learn: ensure errors are propagated (not auto-learn semantics) */
task->flags &= ~RSPAMD_TASK_FLAG_LEARN_AUTO;
- task->s = rspamd_session_create(session->pool,
- rspamd_controller_learn_fin_task,
- NULL,
- (event_finalizer_t) rspamd_task_free,
- task);
+ task->s = rspamd_task_create_session(task, session->pool,
+ rspamd_controller_learn_fin_task,
+ NULL,
+ (event_finalizer_t) rspamd_task_free);
task->fin_arg = conn_ent;
task->http_conn = rspamd_http_connection_ref(conn_ent->conn);
task->sock = -1;
task->resolver = ctx->resolver;
/* Manual learn: ensure errors are propagated (not auto-learn semantics) */
task->flags &= ~RSPAMD_TASK_FLAG_LEARN_AUTO;
- task->s = rspamd_session_create(session->pool,
- rspamd_controller_learn_fin_task,
- NULL,
- (event_finalizer_t) rspamd_task_free,
- task);
+ task->s = rspamd_task_create_session(task, session->pool,
+ rspamd_controller_learn_fin_task,
+ NULL,
+ (event_finalizer_t) rspamd_task_free);
task->fin_arg = conn_ent;
task->http_conn = rspamd_http_connection_ref(conn_ent->conn);
task->sock = -1;
ctx->lang_det, ctx->event_loop, FALSE);
task->resolver = ctx->resolver;
- task->s = rspamd_session_create(session->pool,
- rspamd_controller_check_fin_task,
- NULL,
- (event_finalizer_t) rspamd_task_free,
- task);
+ task->s = rspamd_task_create_session(task, session->pool,
+ rspamd_controller_check_fin_task,
+ NULL,
+ (event_finalizer_t) rspamd_task_free);
task->fin_arg = conn_ent;
task->http_conn = rspamd_http_connection_ref(conn_ent->conn);
task->sock = conn_ent->conn->fd;
ctx->lang_det, ctx->event_loop, FALSE);
task->resolver = ctx->resolver;
- task->s = rspamd_session_create(session->pool,
- rspamd_controller_lua_fin_task,
- NULL,
- (event_finalizer_t) rspamd_task_free,
- task);
+ task->s = rspamd_task_create_session(task, session->pool,
+ rspamd_controller_lua_fin_task,
+ NULL,
+ (event_finalizer_t) rspamd_task_free);
task->fin_arg = conn_ent;
task->http_conn = rspamd_http_connection_ref(conn_ent->conn);
;
return strcmp(a, b) == 0;
}
-void rspamd_session_describe_pending(struct rspamd_async_session *session,
- GString **summary_out,
- GString **details_out)
+#define RSPAMD_DUMP_MAX_GROUPS 32
+
+GString *
+rspamd_session_describe_pending(struct rspamd_async_session *session)
{
struct rspamd_async_event *ev;
- GString *summary, *details;
+ GString *out;
unsigned int total = 0;
- unsigned int n_subsystems = 0;
- unsigned int overflow_subsystems = 0;
- unsigned int total_detail_entries = 0;
- unsigned int i, j;
+ unsigned int n_groups = 0;
+ unsigned int overflow_groups = 0;
+ unsigned int i;
- struct dump_source {
+ struct dump_group {
+ const char *subsystem;
const char *item_name;
const char *label;
unsigned int count;
- };
- struct dump_subsystem {
- const char *name;
- unsigned int count;
- unsigned int distinct_sources;
- unsigned int overflow_sources;
- struct dump_source sources[RSPAMD_DUMP_MAX_SOURCES_PER_SUB];
- } subsystems[RSPAMD_DUMP_MAX_SUBSYSTEMS];
-
- if (summary_out) {
- *summary_out = NULL;
- }
- if (details_out) {
- *details_out = NULL;
- }
+ } groups[RSPAMD_DUMP_MAX_GROUPS];
if (session == NULL || kh_size(session->events) == 0) {
- return;
+ return NULL;
}
kh_foreach_key(session->events, ev, {
const char *sub = ev->subsystem ? ev->subsystem : "(null)";
const char *item = ev->item_name;
const char *lbl = ev->label;
- struct dump_subsystem *s = NULL;
- struct dump_source *src_e = NULL;
+ struct dump_group *g = NULL;
total++;
- for (i = 0; i < n_subsystems; i++) {
- if (strcmp(subsystems[i].name, sub) == 0) {
- s = &subsystems[i];
+ for (i = 0; i < n_groups; i++) {
+ if (strcmp(groups[i].subsystem, sub) == 0 &&
+ rspamd_str_eq_nullable(groups[i].item_name, item) &&
+ rspamd_str_eq_nullable(groups[i].label, lbl)) {
+ g = &groups[i];
break;
}
}
- if (s == NULL) {
- if (n_subsystems < RSPAMD_DUMP_MAX_SUBSYSTEMS) {
- s = &subsystems[n_subsystems++];
- s->name = sub;
- s->count = 0;
- s->distinct_sources = 0;
- s->overflow_sources = 0;
+ if (g == NULL) {
+ if (n_groups < RSPAMD_DUMP_MAX_GROUPS) {
+ g = &groups[n_groups++];
+ g->subsystem = sub;
+ g->item_name = item;
+ g->label = lbl;
+ g->count = 0;
}
else {
- overflow_subsystems++;
+ overflow_groups++;
}
}
- if (s != NULL) {
- s->count++;
-
- /* Events without any annotation are counted in subsystem total
- * only — we do not emit a detail entry for them. */
- if (item != NULL || lbl != NULL) {
- for (j = 0; j < s->distinct_sources; j++) {
- if (rspamd_str_eq_nullable(s->sources[j].item_name, item) &&
- rspamd_str_eq_nullable(s->sources[j].label, lbl)) {
- src_e = &s->sources[j];
- break;
- }
- }
-
- if (src_e == NULL) {
- if (s->distinct_sources < RSPAMD_DUMP_MAX_SOURCES_PER_SUB) {
- src_e = &s->sources[s->distinct_sources++];
- src_e->item_name = item;
- src_e->label = lbl;
- src_e->count = 0;
- }
- else {
- s->overflow_sources++;
- }
- }
-
- if (src_e != NULL) {
- src_e->count++;
- }
- }
+ if (g != NULL) {
+ g->count++;
}
});
if (total == 0) {
- return;
- }
-
- summary = g_string_sized_new(128);
- rspamd_printf_gstring(summary, "total=%ud; by subsystem: ", total);
- for (i = 0; i < n_subsystems; i++) {
- if (i > 0) {
- g_string_append(summary, ", ");
- }
- rspamd_printf_gstring(summary, "%s=%ud",
- subsystems[i].name, subsystems[i].count);
- }
- if (overflow_subsystems > 0) {
- rspamd_printf_gstring(summary, ", (+%ud more subsystems)",
- overflow_subsystems);
+ return NULL;
}
- for (i = 0; i < n_subsystems; i++) {
- total_detail_entries += subsystems[i].distinct_sources;
- }
+ out = g_string_sized_new(256);
+ rspamd_printf_gstring(out, "total=%ud; ", total);
- if (total_detail_entries == 0) {
- details = NULL;
- }
- else {
- bool first_sub = true;
- details = g_string_sized_new(256);
- for (i = 0; i < n_subsystems; i++) {
- struct dump_subsystem *s = &subsystems[i];
+ for (i = 0; i < n_groups; i++) {
+ const struct dump_group *g = &groups[i];
- if (s->distinct_sources == 0) {
- continue;
- }
+ if (i > 0) {
+ g_string_append(out, ", ");
+ }
- if (!first_sub) {
- g_string_append(details, "; ");
- }
- first_sub = false;
- rspamd_printf_gstring(details, "[%s:", s->name);
- for (j = 0; j < s->distinct_sources; j++) {
- const char *it = s->sources[j].item_name;
- const char *lb = s->sources[j].label;
-
- g_string_append_c(details, ' ');
- if (it != NULL && lb != NULL) {
- rspamd_printf_gstring(details, "%s(%s)", it, lb);
- }
- else if (it != NULL) {
- rspamd_printf_gstring(details, "%s", it);
- }
- else {
- rspamd_printf_gstring(details, "%s", lb);
- }
- rspamd_printf_gstring(details, " x%ud", s->sources[j].count);
- }
- if (s->overflow_sources > 0) {
- rspamd_printf_gstring(details, " (+%ud more)",
- s->overflow_sources);
- }
- g_string_append_c(details, ']');
+ g_string_append(out, g->subsystem);
+ if (g->item_name != NULL && g->label != NULL) {
+ rspamd_printf_gstring(out, "[%s/%s]", g->item_name, g->label);
+ }
+ else if (g->item_name != NULL) {
+ rspamd_printf_gstring(out, "[%s]", g->item_name);
+ }
+ else if (g->label != NULL) {
+ rspamd_printf_gstring(out, "[%s]", g->label);
}
+ rspamd_printf_gstring(out, "=%ud", g->count);
}
- if (summary_out) {
- *summary_out = summary;
- }
- else {
- g_string_free(summary, TRUE);
- }
- if (details_out) {
- *details_out = details;
- }
- else if (details != NULL) {
- g_string_free(details, TRUE);
+ if (overflow_groups > 0) {
+ rspamd_printf_gstring(out, ", (+%ud more groups)", overflow_groups);
}
+
+ return out;
}
+#undef RSPAMD_DUMP_MAX_GROUPS
#undef RSPAMD_DUMP_MAX_SUBSYSTEMS
#undef RSPAMD_DUMP_MAX_SOURCES_PER_SUB
unsigned int rspamd_session_events_pending(struct rspamd_async_session *session);
/**
- * Builds human-readable descriptions of currently-pending async events grouped
- * by subsystem. Produces two newly-allocated GStrings written to the out-params:
- * - *summary_out : compact counts, e.g. "total=10; by subsystem: rspamd dns=7, fuzzy_check=3"
- * - *details_out : within each subsystem, distinct (item, label) pairs with
- * counts, e.g. "[rspamd dns: RBL_FOO x5, SURBL_CHECK x2]; [rspamd lua tcp: RATELIMIT_CHECK(tcp write) x1]"
- * Events that have neither an owning item nor a label are counted in the summary
- * but omitted from the detail line. The caller owns both strings and MUST free
- * them with g_string_free(..., TRUE). If there are no pending events, both
- * out-params are set to NULL; if there are events but none of them carry
- * detail, details_out is set to NULL while summary_out is populated.
+ * Builds a single human-readable line describing all currently-pending async
+ * events, grouped by the (subsystem, item_name, label) triple. Each group is
+ * rendered as "<subsystem>[<item>/<label>]=N" when both item and label are
+ * known, degrading to "<subsystem>[<item>]=N", "<subsystem>[<label>]=N", or
+ * bare "<subsystem>=N" if fields are missing. Example output:
+ * "total=5; rspamd dns[RBL_FOO]=3, rspamd dns[SURBL]=1, rspamd lua http[X]=1"
+ * Returns a newly-allocated GString that the caller MUST free with
+ * g_string_free(..., TRUE), or NULL if there are no pending events.
* Intended to be called from timeout handlers so the caller can log with the
* proper task module tag (msg_info_task).
* @param session session to dump
- * @param summary_out receives the summary GString (may be NULL on return)
- * @param details_out receives the detail GString (may be NULL on return)
+ * @return newly-allocated GString or NULL
*/
-void rspamd_session_describe_pending(struct rspamd_async_session *session,
- GString **summary_out,
- GString **details_out);
+GString *rspamd_session_describe_pending(struct rspamd_async_session *session);
/**
return ret;
}
-const char *
+static const char *
rspamd_task_session_item_name_resolver(gpointer ud)
{
struct rspamd_task *task = ud;
return rspamd_symcache_dyn_item_name(task, item);
}
+struct rspamd_async_session *
+rspamd_task_create_session(struct rspamd_task *task,
+ rspamd_mempool_t *pool,
+ session_finalizer_t fin,
+ event_finalizer_t restore,
+ event_finalizer_t cleanup)
+{
+ struct rspamd_async_session *s;
+
+ g_assert(task != NULL);
+
+ s = rspamd_session_create(pool, fin, restore, cleanup, task);
+ rspamd_session_set_item_name_resolver(s, rspamd_task_session_item_name_resolver);
+
+ return s;
+}
+
static void
rspamd_task_timeout_log_state(struct rspamd_task *task)
{
- GString *evt_summary = NULL, *evt_details = NULL, *inflight = NULL;
+ GString *pending, *inflight;
- rspamd_session_describe_pending(task->s, &evt_summary, &evt_details);
-
- if (evt_summary != NULL) {
- msg_info_task("pending async events at timeout: %v", evt_summary);
- g_string_free(evt_summary, TRUE);
- }
- if (evt_details != NULL) {
- msg_info_task("pending async event sources: %v", evt_details);
- g_string_free(evt_details, TRUE);
+ pending = rspamd_session_describe_pending(task->s);
+ if (pending != NULL) {
+ msg_info_task("pending async events at timeout: %v", pending);
+ g_string_free(pending, TRUE);
}
inflight = rspamd_symcache_describe_inflight_symbols(task);
*/
void rspamd_task_timeout(EV_P_ ev_timer *w, int revents);
-/*
- * Resolver used by task's async session to snapshot the name of the currently
- * executing symcache symbol (or NULL if none). Argument is the task pointer.
+/**
+ * Creates an async session owned by @task and wires up the default task
+ * diagnostics resolver (used to snapshot the currently-executing symbol name
+ * on every rspamd_session_add_event). All task-scoped sessions should be
+ * created through this helper instead of calling rspamd_session_create
+ * directly, otherwise timeout logs will be missing symbol context.
+ * @param task owning task; must be non-NULL
+ * @param pool memory pool for the session
+ * @param fin session-fin callback
+ * @param restore session-restore callback (or NULL)
+ * @param cleanup session-cleanup callback (or NULL)
*/
-const char *rspamd_task_session_item_name_resolver(gpointer ud);
+struct rspamd_async_session *
+rspamd_task_create_session(struct rspamd_task *task,
+ rspamd_mempool_t *pool,
+ session_finalizer_t fin,
+ event_finalizer_t restore,
+ event_finalizer_t cleanup);
/*
* Called on unexpected IO error (e.g. ECONNRESET)
task = rspamd_task_new(worker, cfg, NULL, NULL, ctx->event_loop, FALSE);
task->resolver = ctx->resolver;
task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
- task->s = rspamd_session_create(task->task_pool,
- rspamd_worker_finalize,
- NULL,
- (event_finalizer_t) rspamd_task_free,
- task);
+ task->s = rspamd_task_create_session(task, task->task_pool,
+ rspamd_worker_finalize,
+ NULL,
+ (event_finalizer_t) rspamd_task_free);
DL_FOREACH(cfg->on_term_scripts, sc)
{
task->fin_callback = lua_util_task_fin;
task->fin_arg = &res;
task->resolver = rspamd_dns_resolver_init(NULL, base, cfg);
- task->s = rspamd_session_create(task->task_pool, rspamd_task_fin,
- NULL, (event_finalizer_t) rspamd_task_free, task);
+ task->s = rspamd_task_create_session(task, task->task_pool, rspamd_task_fin,
+ NULL, (event_finalizer_t) rspamd_task_free);
if (!rspamd_task_load_message(task, NULL, message, mlen)) {
lua_pushnil(L);
task->fin_arg = session;
task->resolver = session->ctx->resolver;
- task->s = rspamd_session_create(task->task_pool, rspamd_proxy_task_fin,
- NULL, (event_finalizer_t) rspamd_task_free, task);
+ task->s = rspamd_task_create_session(task, task->task_pool, rspamd_proxy_task_fin,
+ NULL, (event_finalizer_t) rspamd_task_free);
data = rspamd_http_message_get_body(msg, &len);
if (session->backend->settings_id) {
session);
/* Set up async session */
- task->s = rspamd_session_create(task->task_pool, rspamd_task_fin,
- NULL, (event_finalizer_t) rspamd_task_free, task);
- rspamd_session_set_item_name_resolver(task->s,
- rspamd_task_session_item_name_resolver);
+ task->s = rspamd_task_create_session(task, task->task_pool, rspamd_task_fin,
+ NULL, (event_finalizer_t) rspamd_task_free);
if (!rspamd_protocol_handle_request(task, msg)) {
msg_err_task("cannot handle request: %e", task->err);