]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Add control protocol command for composites statistics 5764/head
authorVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 27 Nov 2025 10:39:54 +0000 (10:39 +0000)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 27 Nov 2025 10:39:54 +0000 (10:39 +0000)
- Add RSPAMD_CONTROL_COMPOSITES_STATS command to control protocol
- Add /compositesstats endpoint to control socket
- Add 'rspamadm control compositesstats' command
- Aggregate statistics from all workers with per-worker breakdown
- Remove composites stats from controller /stat (use control socket instead)
- Statistics always collected, timing sampled 1/256 (configurable)

src/controller.c
src/libserver/composites/composites.cxx
src/libserver/rspamd_control.c
src/libserver/rspamd_control.h
src/rspamadm/control.c

index 8f7a298a77c70ec3069680c48b2c98cf99fe14e3..d3672339c1d0fd01f3ca23eb5bbae1e7486df6f0 100644 (file)
@@ -35,7 +35,6 @@
 #include "libmime/lang_detection.h"
 #include "mempool_vars_internal.h"
 #include "lua/lua_classnames.h"
-#include "libserver/composites/composites.h"
 #include <math.h>
 
 /* 60 seconds for worker's IO */
@@ -2917,45 +2916,10 @@ rspamd_controller_handle_stat_common(
        ucl_object_insert_key(top,
                                                  ucl_object_fromint(mem_st.fragmented_size), "fragmented", 0, false);
 
-       /* Composites statistics */
-       if (ctx->cfg->composites_manager) {
-               struct rspamd_composites_stats_export comp_stats;
-               ucl_object_t *comp_obj, *time_obj;
-
-               rspamd_composites_get_stats(ctx->cfg->composites_manager, &comp_stats);
-
-               comp_obj = ucl_object_typed_new(UCL_OBJECT);
-               ucl_object_insert_key(comp_obj, ucl_object_fromint(comp_stats.checked_slow),
-                                                         "checked_slow", 0, false);
-               ucl_object_insert_key(comp_obj, ucl_object_fromint(comp_stats.checked_fast),
-                                                         "checked_fast", 0, false);
-               ucl_object_insert_key(comp_obj, ucl_object_fromint(comp_stats.matched),
-                                                         "matched", 0, false);
-
-               time_obj = ucl_object_typed_new(UCL_OBJECT);
-               ucl_object_insert_key(time_obj, ucl_object_fromdouble(comp_stats.time_slow_mean),
-                                                         "mean", 0, false);
-               ucl_object_insert_key(time_obj, ucl_object_fromdouble(comp_stats.time_slow_stddev),
-                                                         "stddev", 0, false);
-               ucl_object_insert_key(time_obj, ucl_object_fromint(comp_stats.time_slow_count),
-                                                         "count", 0, false);
-               ucl_object_insert_key(comp_obj, time_obj, "time_slow_ms", 0, false);
-
-               time_obj = ucl_object_typed_new(UCL_OBJECT);
-               ucl_object_insert_key(time_obj, ucl_object_fromdouble(comp_stats.time_fast_mean),
-                                                         "mean", 0, false);
-               ucl_object_insert_key(time_obj, ucl_object_fromdouble(comp_stats.time_fast_stddev),
-                                                         "stddev", 0, false);
-               ucl_object_insert_key(time_obj, ucl_object_fromint(comp_stats.time_fast_count),
-                                                         "count", 0, false);
-               ucl_object_insert_key(comp_obj, time_obj, "time_fast_ms", 0, false);
-
-               ucl_object_insert_key(comp_obj,
-                                                         ucl_object_frombool(rspamd_composites_get_inverted_index(ctx->cfg->composites_manager)),
-                                                         "inverted_index_enabled", 0, false);
-
-               ucl_object_insert_key(top, comp_obj, "composites", 0, false);
-       }
+       /*
+        * Composites statistics are available via control socket /compositesstats
+        * which aggregates data from all workers
+        */
 
        if (do_reset) {
                session->ctx->srv->stat->messages_scanned = 0;
index 608b16a3a027e89d89b0a98f882b37f25592bb58..1670a9162a86f8af9f4205b77289d5daef710c61 100644 (file)
@@ -87,10 +87,11 @@ struct composites_data {
                                                                 std::vector<symbol_remove_data>>
                symbols_to_remove;
        std::vector<bool> checked;
-       bool is_second_pass; /**< true if we're in COMPOSITES_POST stage */
+       bool is_second_pass;      /**< true if we're in COMPOSITES_POST stage */
+       uint64_t matched_count{}; /**< number of matched composites */
 
        explicit composites_data(struct rspamd_task *task, struct rspamd_scan_result *mres)
-               : task(task), composite(nullptr), metric_res(mres)
+               : task(task), composite(nullptr), metric_res(mres), matched_count(0)
        {
                checked.resize(rspamd_composites_manager_nelts(task->cfg->composites_manager) * 2,
                                           false);
@@ -867,6 +868,7 @@ composites_foreach_callback(gpointer key, gpointer value, void *data)
                        /* Result bit */
                        if (fabs(rc) > epsilon) {
                                cd->checked[comp->id * 2 + 1] = true;
+                               cd->matched_count++;
                                rspamd_task_insert_result_full(cd->task, str_key, 1.0, NULL,
                                                                                           RSPAMD_SYMBOL_INSERT_SINGLE, cd->metric_res);
                        }
@@ -981,7 +983,7 @@ composites_metric_callback(struct rspamd_task *task)
        bool is_second_pass = (task->processed_stages & RSPAMD_TASK_STAGE_POST_FILTERS) != 0;
        bool use_fast_path = cm->use_inverted_index && !is_second_pass;
 
-       /* Probabilistic sampling for timing measurements (unless always_sample is set in config) */
+       /* Probabilistic sampling for timing measurements (unless always_sample is set) */
        bool do_sample = task->cfg->composites_stats_always ||
                                         (rspamd_random_uint64_fast() & COMPOSITES_SAMPLING_MASK) == 0;
        ev_tstamp start_time = 0.0;
@@ -1067,6 +1069,12 @@ composites_metric_callback(struct rspamd_task *task)
        }
 
        /* Update statistics */
+       uint64_t total_matched = 0;
+       for (const auto &cd: comp_data_vec) {
+               total_matched += cd.matched_count;
+       }
+       cm->stats.matched += total_matched;
+
        if (use_fast_path) {
                cm->stats.checked_fast += composites_checked;
        }
@@ -1074,7 +1082,7 @@ composites_metric_callback(struct rspamd_task *task)
                cm->stats.checked_slow += composites_checked;
        }
 
-       /* Record timing with EMA */
+       /* Record timing with EMA (probabilistic sampling unless always_sample is set) */
        if (do_sample && task->event_loop) {
                ev_now_update_if_cheap(task->event_loop);
                ev_tstamp elapsed_ms = (ev_now(task->event_loop) - start_time) * 1000.0;
index 8b43ddec701233b5f45e1585a935d785c0fd57a3..a9e2f7366b21b37f50fa23b4fd35f04e07615b8a 100644 (file)
@@ -22,6 +22,7 @@
 #include "libutil/libev_helper.h"
 #include "unix-std.h"
 #include "utlist.h"
+#include "composites/composites.h"
 
 #ifdef HAVE_SYS_RESOURCE_H
 #include <sys/resource.h>
@@ -82,6 +83,7 @@ static const struct rspamd_control_cmd_match {
        {.name = {.begin = "/recompile", .len = sizeof("/recompile") - 1}, .type = RSPAMD_CONTROL_RECOMPILE},
        {.name = {.begin = "/fuzzystat", .len = sizeof("/fuzzystat") - 1}, .type = RSPAMD_CONTROL_FUZZY_STAT},
        {.name = {.begin = "/fuzzysync", .len = sizeof("/fuzzysync") - 1}, .type = RSPAMD_CONTROL_FUZZY_SYNC},
+       {.name = {.begin = "/compositesstats", .len = sizeof("/compositesstats") - 1}, .type = RSPAMD_CONTROL_COMPOSITES_STATS},
 };
 
 static void rspamd_control_ignore_io_handler(int fd, short what, void *ud);
@@ -170,6 +172,8 @@ rspamd_control_write_reply(struct rspamd_control_session *session)
        double total_utime = 0, total_systime = 0;
        struct ucl_parser *parser;
        unsigned int total_conns = 0;
+       /* Composites stats aggregation */
+       uint64_t total_checked_slow = 0, total_checked_fast = 0, total_matched = 0;
 
        rep = ucl_object_typed_new(UCL_OBJECT);
        workers = ucl_object_typed_new(UCL_OBJECT);
@@ -259,6 +263,26 @@ rspamd_control_write_reply(struct rspamd_control_session *session)
                case RSPAMD_CONTROL_FUZZY_SYNC:
                        ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.fuzzy_sync.status), "status", 0, false);
                        break;
+               case RSPAMD_CONTROL_COMPOSITES_STATS:
+                       ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.composites_stats.checked_slow),
+                                                                 "checked_slow", 0, false);
+                       ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.composites_stats.checked_fast),
+                                                                 "checked_fast", 0, false);
+                       ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.composites_stats.matched),
+                                                                 "matched", 0, false);
+                       ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.composites_stats.time_slow_mean),
+                                                                 "time_slow_mean", 0, false);
+                       ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.composites_stats.time_slow_stddev),
+                                                                 "time_slow_stddev", 0, false);
+                       ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.composites_stats.time_fast_mean),
+                                                                 "time_fast_mean", 0, false);
+                       ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.composites_stats.time_fast_stddev),
+                                                                 "time_fast_stddev", 0, false);
+
+                       total_checked_slow += elt->reply.reply.composites_stats.checked_slow;
+                       total_checked_fast += elt->reply.reply.composites_stats.checked_fast;
+                       total_matched += elt->reply.reply.composites_stats.matched;
+                       break;
                default:
                        break;
                }
@@ -282,6 +306,15 @@ rspamd_control_write_reply(struct rspamd_control_session *session)
 
                ucl_object_insert_key(rep, cur, "total", 0, false);
        }
+       else if (session->cmd.type == RSPAMD_CONTROL_COMPOSITES_STATS) {
+               /* Total composites stats */
+               cur = ucl_object_typed_new(UCL_OBJECT);
+               ucl_object_insert_key(cur, ucl_object_fromint(total_checked_slow), "checked_slow", 0, false);
+               ucl_object_insert_key(cur, ucl_object_fromint(total_checked_fast), "checked_fast", 0, false);
+               ucl_object_insert_key(cur, ucl_object_fromint(total_matched), "matched", 0, false);
+
+               ucl_object_insert_key(rep, cur, "total", 0, false);
+       }
 
        rspamd_control_send_ucl(session, rep);
        ucl_object_unref(rep);
@@ -731,6 +764,19 @@ rspamd_control_default_cmd_handler(int fd,
        case RSPAMD_CONTROL_WORKERS_SPAWNED:
                rep.reply.workers_spawned.status = 0;
                break;
+       case RSPAMD_CONTROL_COMPOSITES_STATS:
+               if (cd->worker->srv->cfg && cd->worker->srv->cfg->composites_manager) {
+                       struct rspamd_composites_stats_export comp_stats;
+                       rspamd_composites_get_stats(cd->worker->srv->cfg->composites_manager, &comp_stats);
+                       rep.reply.composites_stats.checked_slow = comp_stats.checked_slow;
+                       rep.reply.composites_stats.checked_fast = comp_stats.checked_fast;
+                       rep.reply.composites_stats.matched = comp_stats.matched;
+                       rep.reply.composites_stats.time_slow_mean = comp_stats.time_slow_mean;
+                       rep.reply.composites_stats.time_slow_stddev = comp_stats.time_slow_stddev;
+                       rep.reply.composites_stats.time_fast_mean = comp_stats.time_fast_mean;
+                       rep.reply.composites_stats.time_fast_stddev = comp_stats.time_fast_stddev;
+               }
+               break;
        case RSPAMD_CONTROL_RERESOLVE:
                if (cd->worker->srv->cfg) {
                        REF_RETAIN(cd->worker->srv->cfg);
@@ -1432,6 +1478,9 @@ rspamd_control_command_from_string(const char *str)
        else if (g_ascii_strcasecmp(str, "workers_spawned") == 0) {
                ret = RSPAMD_CONTROL_WORKERS_SPAWNED;
        }
+       else if (g_ascii_strcasecmp(str, "composites_stats") == 0) {
+               ret = RSPAMD_CONTROL_COMPOSITES_STATS;
+       }
 
        return ret;
 }
@@ -1475,6 +1524,9 @@ rspamd_control_command_to_string(enum rspamd_control_type cmd)
        case RSPAMD_CONTROL_WORKERS_SPAWNED:
                reply = "workers_spawned";
                break;
+       case RSPAMD_CONTROL_COMPOSITES_STATS:
+               reply = "composites_stats";
+               break;
        default:
                break;
        }
index 81603cab2b7d85fb5a08c63c6093937338759dea..247a6b235d7a6c64f8fed58eaf35232b29886a2e 100644 (file)
@@ -38,6 +38,7 @@ enum rspamd_control_type {
        RSPAMD_CONTROL_CHILD_CHANGE,
        RSPAMD_CONTROL_FUZZY_BLOCKED,
        RSPAMD_CONTROL_WORKERS_SPAWNED,
+       RSPAMD_CONTROL_COMPOSITES_STATS,
        RSPAMD_CONTROL_MAX
 };
 
@@ -112,6 +113,9 @@ struct rspamd_control_command {
                struct {
                        unsigned int workers_count;
                } workers_spawned;
+               struct {
+                       unsigned int unused;
+               } composites_stats;
        } cmd;
 };
 
@@ -156,6 +160,15 @@ struct rspamd_control_reply {
                struct {
                        unsigned int status;
                } workers_spawned;
+               struct {
+                       uint64_t checked_slow;   /**< composites checked via slow path */
+                       uint64_t checked_fast;   /**< composites checked via inverted index */
+                       uint64_t matched;        /**< composites that matched */
+                       double time_slow_mean;   /**< EMA mean time in slow path (ms) */
+                       double time_slow_stddev; /**< EMA stddev time in slow path (ms) */
+                       double time_fast_mean;   /**< EMA mean time in fast path (ms) */
+                       double time_fast_stddev; /**< EMA stddev time in fast path (ms) */
+               } composites_stats;
        } reply;
 };
 
index cd550c04eec98c5eecb1efeba86d219ce95f5b8e..43e781aa7b871c22e38026b1b54e146bccfd9865 100644 (file)
@@ -84,7 +84,8 @@ rspamadm_control_help(gboolean full_help, const struct rspamadm_command *cmd)
                                   "reresolve - resolve upstreams addresses\n"
                                   "recompile - recompile hyperscan regexes\n"
                                   "fuzzystat - show fuzzy statistics\n"
-                                  "fuzzysync - immediately sync fuzzy database to storage\n";
+                                  "fuzzysync - immediately sync fuzzy database to storage\n"
+                                  "compositesstats - show composites processing statistics\n";
        }
        else {
                help_str = "Manage rspamd main control interface";
@@ -215,6 +216,10 @@ rspamadm_control(int argc, char **argv, const struct rspamadm_command *_cmd)
                         g_ascii_strcasecmp(cmd, "fuzzy_sync") == 0) {
                path = "/fuzzysync";
        }
+       else if (g_ascii_strcasecmp(cmd, "compositesstats") == 0 ||
+                        g_ascii_strcasecmp(cmd, "composites_stats") == 0) {
+               path = "/compositesstats";
+       }
        else {
                rspamd_fprintf(stderr, "unknown command: %s\n", cmd);
                exit(EXIT_FAILURE);