int ret;
if (ev->fin != NULL) {
- if (forced_cleanup) {
- msg_info_session("forced removed event on destroy: %p, subsystem: %s, scheduled from: %s",
- ev->user_data,
- ev->subsystem,
- ev->event_source);
- }
- else {
- msg_debug_session("removed event on destroy: %p, subsystem: %s",
- ev->user_data,
- ev->subsystem);
- }
+ msg_debug_session("%sremoved event on destroy: %p, subsystem: %s, scheduled from: %s",
+ forced_cleanup ? "forced " : "",
+ ev->user_data,
+ ev->subsystem,
+ ev->event_source);
ev->fin(ev->user_data);
}
else {
- if (forced_cleanup) {
- msg_info_session("NOT forced removed event on destroy - uncancellable: "
- "%p, subsystem: %s, scheduled from: %s",
- ev->user_data,
- ev->subsystem,
- ev->event_source);
- }
- else {
- msg_debug_session("NOT removed event on destroy - uncancellable: %p, subsystem: %s",
- ev->user_data,
- ev->subsystem);
- }
+ msg_debug_session("NOT %sremoved event on destroy - uncancellable: %p, subsystem: %s, scheduled from: %s",
+ forced_cleanup ? "forced " : "",
+ ev->user_data,
+ ev->subsystem,
+ ev->event_source);
/* Assume an event is uncancellable, move it to a new hash table */
kh_put(rspamd_events_hash, uncancellable_events, ev, &ret);
}
kh_destroy(rspamd_events_hash, session->events);
session->events = uncancellable_events;
- if (forced_cleanup) {
- msg_info_session("pending %d uncancellable events", kh_size(uncancellable_events));
+ if (forced_cleanup && kh_size(uncancellable_events) > 0) {
+ msg_info_session("pending %d uncancellable events after forced cleanup",
+ kh_size(uncancellable_events));
}
else {
msg_debug_session("pending %d uncancellable events", kh_size(uncancellable_events));
return npending;
}
+#define RSPAMD_DUMP_MAX_SUBSYSTEMS 16
+#define RSPAMD_DUMP_MAX_SOURCES_PER_SUB 4
+
+/*
+ * Aggregates the session's still-pending async events into two freshly
+ * allocated strings: a per-subsystem count summary and a per-call-site
+ * breakdown.  Both out-params are optional (either may be NULL); when
+ * nothing is pending both are set to NULL.  The caller owns the returned
+ * GStrings and must release them with g_string_free(..., TRUE).
+ */
+void rspamd_session_describe_pending(struct rspamd_async_session *session,
+									 GString **summary_out,
+									 GString **details_out)
+{
+	struct rspamd_async_event *ev;
+	GString *summary, *details;
+	unsigned int total = 0;
+	unsigned int n_subsystems = 0;
+	unsigned int overflow_subsystems = 0;
+	unsigned int i, j;
+
+	/*
+	 * Fixed-size stack tables keep the aggregation allocation-free until we
+	 * know there is something to report.  Only pointers into the events' own
+	 * subsystem/source strings are stored (assumed to outlive this call, as
+	 * they normally come from compile-time literals - TODO confirm).
+	 */
+	struct dump_source {
+		const char *source;
+		unsigned int count;
+	};
+	struct dump_subsystem {
+		const char *name;
+		unsigned int count;
+		unsigned int distinct_sources;
+		unsigned int overflow_sources;
+		struct dump_source sources[RSPAMD_DUMP_MAX_SOURCES_PER_SUB];
+	} subsystems[RSPAMD_DUMP_MAX_SUBSYSTEMS];
+
+	/* Out-params default to NULL so early returns leave a defined state */
+	if (summary_out) {
+		*summary_out = NULL;
+	}
+	if (details_out) {
+		*details_out = NULL;
+	}
+
+	/* NOTE(review): session->events is assumed non-NULL whenever session is;
+	 * kh_size dereferences it directly - confirm against session lifecycle */
+	if (session == NULL || kh_size(session->events) == 0) {
+		return;
+	}
+
+	kh_foreach_key(session->events, ev, {
+		const char *sub = ev->subsystem ? ev->subsystem : "(null)";
+		const char *src = ev->event_source ? ev->event_source : "(null)";
+		struct dump_subsystem *s = NULL;
+		struct dump_source *src_e = NULL;
+
+		total++;
+
+		/* Linear search is fine: the table is capped at 16 entries */
+		for (i = 0; i < n_subsystems; i++) {
+			if (strcmp(subsystems[i].name, sub) == 0) {
+				s = &subsystems[i];
+				break;
+			}
+		}
+
+		if (s == NULL) {
+			if (n_subsystems < RSPAMD_DUMP_MAX_SUBSYSTEMS) {
+				s = &subsystems[n_subsystems++];
+				s->name = sub;
+				s->count = 0;
+				s->distinct_sources = 0;
+				s->overflow_sources = 0;
+			}
+			else {
+				/* Table full: event is counted in total but loses identity */
+				overflow_subsystems++;
+			}
+		}
+
+		if (s != NULL) {
+			s->count++;
+
+			/* Same capped linear search for distinct call-sites */
+			for (j = 0; j < s->distinct_sources; j++) {
+				if (strcmp(s->sources[j].source, src) == 0) {
+					src_e = &s->sources[j];
+					break;
+				}
+			}
+
+			if (src_e == NULL) {
+				if (s->distinct_sources < RSPAMD_DUMP_MAX_SOURCES_PER_SUB) {
+					src_e = &s->sources[s->distinct_sources++];
+					src_e->source = src;
+					src_e->count = 0;
+				}
+				else {
+					s->overflow_sources++;
+				}
+			}
+
+			if (src_e != NULL) {
+				src_e->count++;
+			}
+		}
+	});
+
+	/* Defensive: cannot normally trigger since kh_size() was non-zero above */
+	if (total == 0) {
+		return;
+	}
+
+	/* Summary: "total=N; by subsystem: a=X, b=Y"
+	 * (%ud is rspamd's printf specifier for unsigned int) */
+	summary = g_string_sized_new(128);
+	rspamd_printf_gstring(summary, "total=%ud; by subsystem: ", total);
+	for (i = 0; i < n_subsystems; i++) {
+		if (i > 0) {
+			g_string_append(summary, ", ");
+		}
+		rspamd_printf_gstring(summary, "%s=%ud",
+							  subsystems[i].name, subsystems[i].count);
+	}
+	if (overflow_subsystems > 0) {
+		rspamd_printf_gstring(summary, ", (+%ud more subsystems)",
+							  overflow_subsystems);
+	}
+
+	/* Details: "[subsystem: site xN ...]" joined with "; " */
+	details = g_string_sized_new(256);
+	for (i = 0; i < n_subsystems; i++) {
+		if (i > 0) {
+			g_string_append(details, "; ");
+		}
+		rspamd_printf_gstring(details, "[%s:", subsystems[i].name);
+		for (j = 0; j < subsystems[i].distinct_sources; j++) {
+			rspamd_printf_gstring(details, " %s x%ud",
+								  subsystems[i].sources[j].source,
+								  subsystems[i].sources[j].count);
+		}
+		if (subsystems[i].overflow_sources > 0) {
+			rspamd_printf_gstring(details, " (+%ud more sources)",
+								  subsystems[i].overflow_sources);
+		}
+		g_string_append_c(details, ']');
+	}
+
+	/* Hand ownership to the caller; free anything the caller did not ask for */
+	if (summary_out) {
+		*summary_out = summary;
+	}
+	else {
+		g_string_free(summary, TRUE);
+	}
+	if (details_out) {
+		*details_out = details;
+	}
+	else {
+		g_string_free(details, TRUE);
+	}
+}
+
+#undef RSPAMD_DUMP_MAX_SUBSYSTEMS
+#undef RSPAMD_DUMP_MAX_SOURCES_PER_SUB
+
rspamd_mempool_t *
rspamd_session_mempool(struct rspamd_async_session *session)
{
*/
unsigned int rspamd_session_events_pending(struct rspamd_async_session *session);
+/**
+ * Builds human-readable descriptions of currently-pending async events grouped
+ * by subsystem. Produces two newly-allocated GStrings written to the out-params:
+ * - *summary_out : compact counts, e.g. "total=10; by subsystem: rspamd dns=7, fuzzy_check=3"
+ * - *details_out : distinct call-sites per subsystem, e.g. "[rspamd dns: /path/a.c:45 x5, /path/b.c:12 x2]; [fuzzy_check: /path/c.c:88 x3]"
+ * The caller owns both strings and MUST free them with g_string_free(..., TRUE).
+ * If there are no pending events, both out-params are set to NULL.
+ * Intended to be called from timeout handlers so the caller can log with the
+ * proper task module tag (msg_info_task).
+ * @param session session to dump
+ * @param summary_out receives the summary GString (may be NULL on return)
+ * @param details_out receives the detail GString (may be NULL on return)
+ */
+void rspamd_session_describe_pending(struct rspamd_async_session *session,
+ GString **summary_out,
+ GString **details_out);
+
/**
* Returns TRUE if an async session is currently destroying
*/
void rspamd_symcache_enable_profile(struct rspamd_task *task);
+/**
+ * Builds a human-readable description of symbols that have been started but
+ * have not yet finished (i.e. are blocked on async events). Intended for use
+ * from timeout handlers so operators can see which rules stalled the task.
+ * Returns NULL if there are no inflight symbols. Caller owns the returned
+ * GString and MUST free it with g_string_free(..., TRUE).
+ * @param task
+ * @return
+ */
+GString *rspamd_symcache_describe_inflight_symbols(struct rspamd_task *task);
+
struct rspamd_symcache_timeout_item {
double timeout;
const struct rspamd_symcache_item *item;
return real_item->st;
}
+/*
+ * C API bridge: forwards to symcache_runtime::describe_inflight_symbols().
+ * Returns nullptr when there is no task or no symcache runtime attached yet.
+ */
+GString *
+rspamd_symcache_describe_inflight_symbols(struct rspamd_task *task)
+{
+	if (task == nullptr) {
+		return nullptr;
+	}
+	if (task->symcache_runtime == nullptr) {
+		return nullptr;
+	}
+
+	return C_API_SYMCACHE_RUNTIME(task->symcache_runtime)->describe_inflight_symbols();
+}
+
void rspamd_symcache_get_symbol_details(struct rspamd_symcache *cache,
const char *symbol,
ucl_object_t *this_sym_ucl)
return nullptr;
}
+/*
+ * Renders the names of symbols that have been started but not yet finished
+ * (status == started, i.e. waiting on async events) into a freshly allocated
+ * GString, or returns nullptr when nothing is inflight.  Output is capped at
+ * MAX_ITEMS_SHOWN entries with a "(+N more)" tail; caller frees the result
+ * with g_string_free(..., TRUE).
+ */
+auto symcache_runtime::describe_inflight_symbols() const -> GString *
+{
+	GString *out = nullptr;
+	unsigned int nshown = 0;
+	unsigned int nsuppressed = 0;
+	constexpr unsigned int MAX_ITEMS_SHOWN = 32;
+
+	/* Fast path: counter maintained elsewhere tells us nothing is pending */
+	if (items_inflight == 0) {
+		return nullptr;
+	}
+
+	/* dynamic_items is indexed in parallel with order->d - assumed invariant
+	 * of the runtime; confirm if order can change mid-task */
+	for (auto [i, item]: rspamd::enumerate(order->d)) {
+		auto *dyn_item = &dynamic_items[i];
+
+		if (dyn_item->status != cache_item_status::started) {
+			continue;
+		}
+
+		/* Keep counting suppressed items so the tail note is accurate */
+		if (nshown >= MAX_ITEMS_SHOWN) {
+			nsuppressed++;
+			continue;
+		}
+
+		/* Lazy allocation: only pay for the string when something matches */
+		if (out == nullptr) {
+			out = g_string_sized_new(128);
+		}
+		else {
+			g_string_append(out, ", ");
+		}
+
+		/* %*s is rspamd's length-prefixed string specifier: safe for the
+		 * non NUL-terminated view returned by get_name() */
+		const auto &name = item->get_name();
+		if (dyn_item->start_msec > 0) {
+			rspamd_printf_gstring(out, "%*s (async=%ud, started=%ud ms)",
+								  (int) name.size(), name.data(),
+								  (unsigned int) dyn_item->async_events,
+								  (unsigned int) dyn_item->start_msec);
+		}
+		else {
+			rspamd_printf_gstring(out, "%*s (async=%ud)",
+								  (int) name.size(), name.data(),
+								  (unsigned int) dyn_item->async_events);
+		}
+		nshown++;
+	}
+
+	if (nsuppressed > 0 && out != nullptr) {
+		rspamd_printf_gstring(out, " (+%ud more)", nsuppressed);
+	}
+
+	return out;
+}
+
}// namespace rspamd::symcache
slow_status = slow_status::disabled;
}
}
+
+ /**
+ * Builds a human-readable description of symbols that have been started but
+ * have not yet finished (i.e. are waiting on async events: DNS, Redis, HTTP,
+ * etc.). Intended to be used from timeout handlers to surface which rules
+ * stalled the task.
+ * @return newly allocated GString (caller must g_string_free) or nullptr if
+ * no inflight symbols
+ */
+ auto describe_inflight_symbols() const -> GString *;
};
return ret;
}
+/*
+ * Logs diagnostic state when a task hits its processing timeout: the
+ * pending async events (count summary plus per-call-site detail) and the
+ * symbols still inflight.  Each description GString is logged via the %v
+ * rspamd printf specifier and freed immediately.
+ */
+static void
+rspamd_task_timeout_log_state(struct rspamd_task *task)
+{
+	GString *summary = NULL, *details = NULL;
+
+	rspamd_session_describe_pending(task->s, &summary, &details);
+
+	if (summary) {
+		msg_info_task("pending async events at timeout: %v", summary);
+		g_string_free(summary, TRUE);
+	}
+
+	if (details) {
+		msg_info_task("pending async event sources: %v", details);
+		g_string_free(details, TRUE);
+	}
+
+	GString *inflight = rspamd_symcache_describe_inflight_symbols(task);
+
+	if (inflight) {
+		msg_info_task("inflight symbols at timeout: %v", inflight);
+		g_string_free(inflight, TRUE);
+	}
+}
+
void rspamd_task_timeout(EV_P_ ev_timer *w, int revents)
{
struct rspamd_task *task = (struct rspamd_task *) w->data;
"forced processing",
ev_now(task->event_loop) - task->task_timestamp,
w->repeat);
+ rspamd_task_timeout_log_state(task);
if (task->cfg->soft_reject_on_timeout) {
struct rspamd_action *action, *soft_reject;
/* Postprocessing timeout */
msg_info_task("post-processing of task time out: %.1f second spent; forced processing",
ev_now(task->event_loop) - task->task_timestamp);
+ rspamd_task_timeout_log_state(task);
if (task->cfg->soft_reject_on_timeout) {
struct rspamd_action *action, *soft_reject;