From: Vsevolod Stakhov Date: Thu, 27 Nov 2025 10:39:54 +0000 (+0000) Subject: [Feature] Add control protocol command for composites statistics X-Git-Tag: 3.14.1~5^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1b2fd7b0fc644e1adffd00d0ff7139b7ddc549a4;p=thirdparty%2Frspamd.git [Feature] Add control protocol command for composites statistics - Add RSPAMD_CONTROL_COMPOSITES_STATS command to control protocol - Add /compositesstats endpoint to control socket - Add 'rspamadm control compositesstats' command - Aggregate statistics from all workers with per-worker breakdown - Remove composites stats from controller /stat (use control socket instead) - Statistics always collected, timing sampled 1/256 (configurable) --- diff --git a/src/controller.c b/src/controller.c index 8f7a298a77..d3672339c1 100644 --- a/src/controller.c +++ b/src/controller.c @@ -35,7 +35,6 @@ #include "libmime/lang_detection.h" #include "mempool_vars_internal.h" #include "lua/lua_classnames.h" -#include "libserver/composites/composites.h" #include /* 60 seconds for worker's IO */ @@ -2917,45 +2916,10 @@ rspamd_controller_handle_stat_common( ucl_object_insert_key(top, ucl_object_fromint(mem_st.fragmented_size), "fragmented", 0, false); - /* Composites statistics */ - if (ctx->cfg->composites_manager) { - struct rspamd_composites_stats_export comp_stats; - ucl_object_t *comp_obj, *time_obj; - - rspamd_composites_get_stats(ctx->cfg->composites_manager, &comp_stats); - - comp_obj = ucl_object_typed_new(UCL_OBJECT); - ucl_object_insert_key(comp_obj, ucl_object_fromint(comp_stats.checked_slow), - "checked_slow", 0, false); - ucl_object_insert_key(comp_obj, ucl_object_fromint(comp_stats.checked_fast), - "checked_fast", 0, false); - ucl_object_insert_key(comp_obj, ucl_object_fromint(comp_stats.matched), - "matched", 0, false); - - time_obj = ucl_object_typed_new(UCL_OBJECT); - ucl_object_insert_key(time_obj, ucl_object_fromdouble(comp_stats.time_slow_mean), - "mean", 0, false); - ucl_object_insert_key(time_obj, ucl_object_fromdouble(comp_stats.time_slow_stddev), - "stddev", 0, false); - ucl_object_insert_key(time_obj, ucl_object_fromint(comp_stats.time_slow_count), - "count", 0, false); - ucl_object_insert_key(comp_obj, time_obj, "time_slow_ms", 0, false); - - time_obj = ucl_object_typed_new(UCL_OBJECT); - ucl_object_insert_key(time_obj, ucl_object_fromdouble(comp_stats.time_fast_mean), - "mean", 0, false); - ucl_object_insert_key(time_obj, ucl_object_fromdouble(comp_stats.time_fast_stddev), - "stddev", 0, false); - ucl_object_insert_key(time_obj, ucl_object_fromint(comp_stats.time_fast_count), - "count", 0, false); - ucl_object_insert_key(comp_obj, time_obj, "time_fast_ms", 0, false); - - ucl_object_insert_key(comp_obj, - ucl_object_frombool(rspamd_composites_get_inverted_index(ctx->cfg->composites_manager)), - "inverted_index_enabled", 0, false); - - ucl_object_insert_key(top, comp_obj, "composites", 0, false); - } + /* + * Composites statistics are available via control socket /compositesstats + * which aggregates data from all workers + */ if (do_reset) { session->ctx->srv->stat->messages_scanned = 0; diff --git a/src/libserver/composites/composites.cxx b/src/libserver/composites/composites.cxx index 608b16a3a0..1670a9162a 100644 --- a/src/libserver/composites/composites.cxx +++ b/src/libserver/composites/composites.cxx @@ -87,10 +87,11 @@ struct composites_data { std::vector> symbols_to_remove; std::vector checked; - bool is_second_pass; /**< true if we're in COMPOSITES_POST stage */ + bool is_second_pass; /**< true if we're in COMPOSITES_POST stage */ + uint64_t matched_count{}; /**< number of matched composites */ explicit composites_data(struct rspamd_task *task, struct rspamd_scan_result *mres) - : task(task), composite(nullptr), metric_res(mres) + : task(task), composite(nullptr), metric_res(mres), matched_count(0) { checked.resize(rspamd_composites_manager_nelts(task->cfg->composites_manager) * 2, false); @@ -867,6 +868,7 @@ composites_foreach_callback(gpointer key, gpointer value, void *data) /* Result bit */ if (fabs(rc) > epsilon) { cd->checked[comp->id * 2 + 1] = true; + cd->matched_count++; rspamd_task_insert_result_full(cd->task, str_key, 1.0, NULL, RSPAMD_SYMBOL_INSERT_SINGLE, cd->metric_res); } @@ -981,7 +983,7 @@ composites_metric_callback(struct rspamd_task *task) bool is_second_pass = (task->processed_stages & RSPAMD_TASK_STAGE_POST_FILTERS) != 0; bool use_fast_path = cm->use_inverted_index && !is_second_pass; - /* Probabilistic sampling for timing measurements (unless always_sample is set in config) */ + /* Probabilistic sampling for timing measurements (unless always_sample is set) */ bool do_sample = task->cfg->composites_stats_always || (rspamd_random_uint64_fast() & COMPOSITES_SAMPLING_MASK) == 0; ev_tstamp start_time = 0.0; @@ -1067,6 +1069,12 @@ composites_metric_callback(struct rspamd_task *task) } /* Update statistics */ + uint64_t total_matched = 0; + for (const auto &cd: comp_data_vec) { + total_matched += cd.matched_count; + } + cm->stats.matched += total_matched; + if (use_fast_path) { cm->stats.checked_fast += composites_checked; } @@ -1074,7 +1082,7 @@ composites_metric_callback(struct rspamd_task *task) cm->stats.checked_slow += composites_checked; } - /* Record timing with EMA */ + /* Record timing with EMA (probabilistic sampling unless always_sample is set) */ if (do_sample && task->event_loop) { ev_now_update_if_cheap(task->event_loop); ev_tstamp elapsed_ms = (ev_now(task->event_loop) - start_time) * 1000.0; diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index 8b43ddec70..a9e2f7366b 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -22,6 +22,7 @@ #include "libutil/libev_helper.h" #include "unix-std.h" #include "utlist.h" +#include "composites/composites.h" #ifdef HAVE_SYS_RESOURCE_H #include @@ -82,6 +83,7 @@ static const struct rspamd_control_cmd_match { {.name = {.begin = "/recompile", .len = sizeof("/recompile") - 1}, .type = RSPAMD_CONTROL_RECOMPILE}, {.name = {.begin = "/fuzzystat", .len = sizeof("/fuzzystat") - 1}, .type = RSPAMD_CONTROL_FUZZY_STAT}, {.name = {.begin = "/fuzzysync", .len = sizeof("/fuzzysync") - 1}, .type = RSPAMD_CONTROL_FUZZY_SYNC}, + {.name = {.begin = "/compositesstats", .len = sizeof("/compositesstats") - 1}, .type = RSPAMD_CONTROL_COMPOSITES_STATS}, }; static void rspamd_control_ignore_io_handler(int fd, short what, void *ud); @@ -170,6 +172,8 @@ rspamd_control_write_reply(struct rspamd_control_session *session) double total_utime = 0, total_systime = 0; struct ucl_parser *parser; unsigned int total_conns = 0; + /* Composites stats aggregation */ + uint64_t total_checked_slow = 0, total_checked_fast = 0, total_matched = 0; rep = ucl_object_typed_new(UCL_OBJECT); workers = ucl_object_typed_new(UCL_OBJECT); @@ -259,6 +263,26 @@ rspamd_control_write_reply(struct rspamd_control_session *session) case RSPAMD_CONTROL_FUZZY_SYNC: ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.fuzzy_sync.status), "status", 0, false); break; + case RSPAMD_CONTROL_COMPOSITES_STATS: + ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.composites_stats.checked_slow), + "checked_slow", 0, false); + ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.composites_stats.checked_fast), + "checked_fast", 0, false); + ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.composites_stats.matched), + "matched", 0, false); + ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.composites_stats.time_slow_mean), + "time_slow_mean", 0, false); + ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.composites_stats.time_slow_stddev), + "time_slow_stddev", 0, false); + ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.composites_stats.time_fast_mean), + "time_fast_mean", 0, false); + ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.composites_stats.time_fast_stddev), + "time_fast_stddev", 0, false); + + total_checked_slow += elt->reply.reply.composites_stats.checked_slow; + total_checked_fast += elt->reply.reply.composites_stats.checked_fast; + total_matched += elt->reply.reply.composites_stats.matched; + break; default: break; } @@ -282,6 +306,15 @@ rspamd_control_write_reply(struct rspamd_control_session *session) ucl_object_insert_key(rep, cur, "total", 0, false); } + else if (session->cmd.type == RSPAMD_CONTROL_COMPOSITES_STATS) { + /* Total composites stats */ + cur = ucl_object_typed_new(UCL_OBJECT); + ucl_object_insert_key(cur, ucl_object_fromint(total_checked_slow), "checked_slow", 0, false); + ucl_object_insert_key(cur, ucl_object_fromint(total_checked_fast), "checked_fast", 0, false); + ucl_object_insert_key(cur, ucl_object_fromint(total_matched), "matched", 0, false); + + ucl_object_insert_key(rep, cur, "total", 0, false); + } rspamd_control_send_ucl(session, rep); ucl_object_unref(rep); @@ -731,6 +764,19 @@ rspamd_control_default_cmd_handler(int fd, case RSPAMD_CONTROL_WORKERS_SPAWNED: rep.reply.workers_spawned.status = 0; break; + case RSPAMD_CONTROL_COMPOSITES_STATS: + if (cd->worker->srv->cfg && cd->worker->srv->cfg->composites_manager) { + struct rspamd_composites_stats_export comp_stats; + rspamd_composites_get_stats(cd->worker->srv->cfg->composites_manager, &comp_stats); + rep.reply.composites_stats.checked_slow = comp_stats.checked_slow; + rep.reply.composites_stats.checked_fast = comp_stats.checked_fast; + rep.reply.composites_stats.matched = comp_stats.matched; + rep.reply.composites_stats.time_slow_mean = comp_stats.time_slow_mean; + rep.reply.composites_stats.time_slow_stddev = comp_stats.time_slow_stddev; + rep.reply.composites_stats.time_fast_mean = comp_stats.time_fast_mean; + rep.reply.composites_stats.time_fast_stddev = comp_stats.time_fast_stddev; + } + break; case RSPAMD_CONTROL_RERESOLVE: if (cd->worker->srv->cfg) { REF_RETAIN(cd->worker->srv->cfg); @@ -1432,6 +1478,9 @@ rspamd_control_command_from_string(const char *str) else if (g_ascii_strcasecmp(str, "workers_spawned") == 0) { ret = RSPAMD_CONTROL_WORKERS_SPAWNED; } + else if (g_ascii_strcasecmp(str, "composites_stats") == 0) { + ret = RSPAMD_CONTROL_COMPOSITES_STATS; + } return ret; } @@ -1475,6 +1524,9 @@ rspamd_control_command_to_string(enum rspamd_control_type cmd) case RSPAMD_CONTROL_WORKERS_SPAWNED: reply = "workers_spawned"; break; + case RSPAMD_CONTROL_COMPOSITES_STATS: + reply = "composites_stats"; + break; default: break; } diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h index 81603cab2b..247a6b235d 100644 --- a/src/libserver/rspamd_control.h +++ b/src/libserver/rspamd_control.h @@ -38,6 +38,7 @@ enum rspamd_control_type { RSPAMD_CONTROL_CHILD_CHANGE, RSPAMD_CONTROL_FUZZY_BLOCKED, RSPAMD_CONTROL_WORKERS_SPAWNED, + RSPAMD_CONTROL_COMPOSITES_STATS, RSPAMD_CONTROL_MAX }; @@ -112,6 +113,9 @@ struct rspamd_control_command { struct { unsigned int workers_count; } workers_spawned; + struct { + unsigned int unused; + } composites_stats; } cmd; }; @@ -156,6 +160,15 @@ struct rspamd_control_reply { struct { unsigned int status; } workers_spawned; + struct { + uint64_t checked_slow; /**< composites checked via slow path */ + uint64_t checked_fast; /**< composites checked via inverted index */ + uint64_t matched; /**< composites that matched */ + double time_slow_mean; /**< EMA mean time in slow path (ms) */ + double time_slow_stddev; /**< EMA stddev time in slow path (ms) */ + double time_fast_mean; /**< EMA mean time in fast path (ms) */ + double time_fast_stddev; /**< EMA stddev time in fast path (ms) */ + } composites_stats; } reply; }; diff --git a/src/rspamadm/control.c b/src/rspamadm/control.c index cd550c04ee..43e781aa7b 100644 --- a/src/rspamadm/control.c +++ b/src/rspamadm/control.c @@ -84,7 +84,8 @@ rspamadm_control_help(gboolean full_help, const struct rspamadm_command *cmd) "reresolve - resolve upstreams addresses\n" "recompile - recompile hyperscan regexes\n" "fuzzystat - show fuzzy statistics\n" - "fuzzysync - immediately sync fuzzy database to storage\n"; + "fuzzysync - immediately sync fuzzy database to storage\n" + "compositesstats - show composites processing statistics\n"; } else { help_str = "Manage rspamd main control interface"; @@ -215,6 +216,10 @@ rspamadm_control(int argc, char **argv, const struct rspamadm_command *_cmd) g_ascii_strcasecmp(cmd, "fuzzy_sync") == 0) { path = "/fuzzysync"; } + else if (g_ascii_strcasecmp(cmd, "compositesstats") == 0 || + g_ascii_strcasecmp(cmd, "composites_stats") == 0) { + path = "/compositesstats"; + } else { rspamd_fprintf(stderr, "unknown command: %s\n", cmd); exit(EXIT_FAILURE);