#include "libmime/lang_detection.h"
#include "mempool_vars_internal.h"
#include "lua/lua_classnames.h"
-#include "libserver/composites/composites.h"
#include <math.h>
/* 60 seconds for worker's IO */
ucl_object_insert_key(top,
ucl_object_fromint(mem_st.fragmented_size), "fragmented", 0, false);
- /* Composites statistics */
- if (ctx->cfg->composites_manager) {
- struct rspamd_composites_stats_export comp_stats;
- ucl_object_t *comp_obj, *time_obj;
-
- rspamd_composites_get_stats(ctx->cfg->composites_manager, &comp_stats);
-
- comp_obj = ucl_object_typed_new(UCL_OBJECT);
- ucl_object_insert_key(comp_obj, ucl_object_fromint(comp_stats.checked_slow),
- "checked_slow", 0, false);
- ucl_object_insert_key(comp_obj, ucl_object_fromint(comp_stats.checked_fast),
- "checked_fast", 0, false);
- ucl_object_insert_key(comp_obj, ucl_object_fromint(comp_stats.matched),
- "matched", 0, false);
-
- time_obj = ucl_object_typed_new(UCL_OBJECT);
- ucl_object_insert_key(time_obj, ucl_object_fromdouble(comp_stats.time_slow_mean),
- "mean", 0, false);
- ucl_object_insert_key(time_obj, ucl_object_fromdouble(comp_stats.time_slow_stddev),
- "stddev", 0, false);
- ucl_object_insert_key(time_obj, ucl_object_fromint(comp_stats.time_slow_count),
- "count", 0, false);
- ucl_object_insert_key(comp_obj, time_obj, "time_slow_ms", 0, false);
-
- time_obj = ucl_object_typed_new(UCL_OBJECT);
- ucl_object_insert_key(time_obj, ucl_object_fromdouble(comp_stats.time_fast_mean),
- "mean", 0, false);
- ucl_object_insert_key(time_obj, ucl_object_fromdouble(comp_stats.time_fast_stddev),
- "stddev", 0, false);
- ucl_object_insert_key(time_obj, ucl_object_fromint(comp_stats.time_fast_count),
- "count", 0, false);
- ucl_object_insert_key(comp_obj, time_obj, "time_fast_ms", 0, false);
-
- ucl_object_insert_key(comp_obj,
- ucl_object_frombool(rspamd_composites_get_inverted_index(ctx->cfg->composites_manager)),
- "inverted_index_enabled", 0, false);
-
- ucl_object_insert_key(top, comp_obj, "composites", 0, false);
- }
+ /*
+ * Composites statistics are not reported here; use the control socket command
+ * /compositesstats, which lists per-worker values and sums the counters across workers
+ */
if (do_reset) {
session->ctx->srv->stat->messages_scanned = 0;
std::vector<symbol_remove_data>>
symbols_to_remove;
std::vector<bool> checked;
- bool is_second_pass; /**< true if we're in COMPOSITES_POST stage */
+ bool is_second_pass; /**< true if we're in COMPOSITES_POST stage */
+ uint64_t matched_count{}; /**< number of matched composites */
explicit composites_data(struct rspamd_task *task, struct rspamd_scan_result *mres)
- : task(task), composite(nullptr), metric_res(mres)
+ : task(task), composite(nullptr), metric_res(mres), matched_count(0)
{
checked.resize(rspamd_composites_manager_nelts(task->cfg->composites_manager) * 2,
false);
/* Result bit */
if (fabs(rc) > epsilon) {
cd->checked[comp->id * 2 + 1] = true;
+ cd->matched_count++;
rspamd_task_insert_result_full(cd->task, str_key, 1.0, NULL,
RSPAMD_SYMBOL_INSERT_SINGLE, cd->metric_res);
}
bool is_second_pass = (task->processed_stages & RSPAMD_TASK_STAGE_POST_FILTERS) != 0;
bool use_fast_path = cm->use_inverted_index && !is_second_pass;
- /* Probabilistic sampling for timing measurements (unless always_sample is set in config) */
+ /* Probabilistic sampling for timing measurements (unless always_sample is set) */
bool do_sample = task->cfg->composites_stats_always ||
(rspamd_random_uint64_fast() & COMPOSITES_SAMPLING_MASK) == 0;
ev_tstamp start_time = 0.0;
}
/* Update statistics */
+ uint64_t total_matched = 0;
+ for (const auto &cd: comp_data_vec) {
+ total_matched += cd.matched_count;
+ }
+ cm->stats.matched += total_matched;
+
if (use_fast_path) {
cm->stats.checked_fast += composites_checked;
}
cm->stats.checked_slow += composites_checked;
}
- /* Record timing with EMA */
+ /* Record timing with EMA (probabilistic sampling unless always_sample is set) */
if (do_sample && task->event_loop) {
ev_now_update_if_cheap(task->event_loop);
ev_tstamp elapsed_ms = (ev_now(task->event_loop) - start_time) * 1000.0;
#include "libutil/libev_helper.h"
#include "unix-std.h"
#include "utlist.h"
+#include "composites/composites.h"
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h>
{.name = {.begin = "/recompile", .len = sizeof("/recompile") - 1}, .type = RSPAMD_CONTROL_RECOMPILE},
{.name = {.begin = "/fuzzystat", .len = sizeof("/fuzzystat") - 1}, .type = RSPAMD_CONTROL_FUZZY_STAT},
{.name = {.begin = "/fuzzysync", .len = sizeof("/fuzzysync") - 1}, .type = RSPAMD_CONTROL_FUZZY_SYNC},
+ {.name = {.begin = "/compositesstats", .len = sizeof("/compositesstats") - 1}, .type = RSPAMD_CONTROL_COMPOSITES_STATS},
};
static void rspamd_control_ignore_io_handler(int fd, short what, void *ud);
double total_utime = 0, total_systime = 0;
struct ucl_parser *parser;
unsigned int total_conns = 0;
+ /* Composites stats aggregation */
+ uint64_t total_checked_slow = 0, total_checked_fast = 0, total_matched = 0;
rep = ucl_object_typed_new(UCL_OBJECT);
workers = ucl_object_typed_new(UCL_OBJECT);
case RSPAMD_CONTROL_FUZZY_SYNC:
ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.fuzzy_sync.status), "status", 0, false);
break;
+ case RSPAMD_CONTROL_COMPOSITES_STATS:
+ ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.composites_stats.checked_slow),
+ "checked_slow", 0, false);
+ ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.composites_stats.checked_fast),
+ "checked_fast", 0, false);
+ ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.composites_stats.matched),
+ "matched", 0, false);
+ ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.composites_stats.time_slow_mean),
+ "time_slow_mean", 0, false);
+ ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.composites_stats.time_slow_stddev),
+ "time_slow_stddev", 0, false);
+ ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.composites_stats.time_fast_mean),
+ "time_fast_mean", 0, false);
+ ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.composites_stats.time_fast_stddev),
+ "time_fast_stddev", 0, false);
+
+ total_checked_slow += elt->reply.reply.composites_stats.checked_slow;
+ total_checked_fast += elt->reply.reply.composites_stats.checked_fast;
+ total_matched += elt->reply.reply.composites_stats.matched;
+ break;
default:
break;
}
ucl_object_insert_key(rep, cur, "total", 0, false);
}
+ else if (session->cmd.type == RSPAMD_CONTROL_COMPOSITES_STATS) {
+ /* Total composites stats */
+ cur = ucl_object_typed_new(UCL_OBJECT);
+ ucl_object_insert_key(cur, ucl_object_fromint(total_checked_slow), "checked_slow", 0, false);
+ ucl_object_insert_key(cur, ucl_object_fromint(total_checked_fast), "checked_fast", 0, false);
+ ucl_object_insert_key(cur, ucl_object_fromint(total_matched), "matched", 0, false);
+
+ ucl_object_insert_key(rep, cur, "total", 0, false);
+ }
rspamd_control_send_ucl(session, rep);
ucl_object_unref(rep);
case RSPAMD_CONTROL_WORKERS_SPAWNED:
rep.reply.workers_spawned.status = 0;
break;
+ case RSPAMD_CONTROL_COMPOSITES_STATS:
+ if (cd->worker->srv->cfg && cd->worker->srv->cfg->composites_manager) {
+ struct rspamd_composites_stats_export comp_stats;
+ rspamd_composites_get_stats(cd->worker->srv->cfg->composites_manager, &comp_stats);
+ rep.reply.composites_stats.checked_slow = comp_stats.checked_slow;
+ rep.reply.composites_stats.checked_fast = comp_stats.checked_fast;
+ rep.reply.composites_stats.matched = comp_stats.matched;
+ rep.reply.composites_stats.time_slow_mean = comp_stats.time_slow_mean;
+ rep.reply.composites_stats.time_slow_stddev = comp_stats.time_slow_stddev;
+ rep.reply.composites_stats.time_fast_mean = comp_stats.time_fast_mean;
+ rep.reply.composites_stats.time_fast_stddev = comp_stats.time_fast_stddev;
+ }
+ break;
case RSPAMD_CONTROL_RERESOLVE:
if (cd->worker->srv->cfg) {
REF_RETAIN(cd->worker->srv->cfg);
else if (g_ascii_strcasecmp(str, "workers_spawned") == 0) {
ret = RSPAMD_CONTROL_WORKERS_SPAWNED;
}
+ else if (g_ascii_strcasecmp(str, "composites_stats") == 0) {
+ ret = RSPAMD_CONTROL_COMPOSITES_STATS;
+ }
return ret;
}
case RSPAMD_CONTROL_WORKERS_SPAWNED:
reply = "workers_spawned";
break;
+ case RSPAMD_CONTROL_COMPOSITES_STATS:
+ reply = "composites_stats";
+ break;
default:
break;
}
RSPAMD_CONTROL_CHILD_CHANGE,
RSPAMD_CONTROL_FUZZY_BLOCKED,
RSPAMD_CONTROL_WORKERS_SPAWNED,
+ RSPAMD_CONTROL_COMPOSITES_STATS,
RSPAMD_CONTROL_MAX
};
struct {
unsigned int workers_count;
} workers_spawned;
+ struct {
+ unsigned int unused; /**< placeholder field; NOTE(review): the request appears to carry no parameters - confirm no handler reads it */
+ } composites_stats;
} cmd;
};
struct {
unsigned int status;
} workers_spawned;
+ struct {
+ uint64_t checked_slow; /**< composites checked via slow path */
+ uint64_t checked_fast; /**< composites checked via fast path (inverted index enabled, first pass) */
+ uint64_t matched; /**< number of composites that matched */
+ double time_slow_mean; /**< EMA mean of sampled processing time in slow path (ms) */
+ double time_slow_stddev; /**< EMA stddev of sampled processing time in slow path (ms) */
+ double time_fast_mean; /**< EMA mean of sampled processing time in fast path (ms) */
+ double time_fast_stddev; /**< EMA stddev of sampled processing time in fast path (ms) */
+ } composites_stats; /**< one worker's reply to RSPAMD_CONTROL_COMPOSITES_STATS, copied from rspamd_composites_get_stats() output */
} reply;
};
"reresolve - resolve upstreams addresses\n"
"recompile - recompile hyperscan regexes\n"
"fuzzystat - show fuzzy statistics\n"
- "fuzzysync - immediately sync fuzzy database to storage\n";
+ "fuzzysync - immediately sync fuzzy database to storage\n"
+ "compositesstats - show composites processing statistics\n";
}
else {
help_str = "Manage rspamd main control interface";
g_ascii_strcasecmp(cmd, "fuzzy_sync") == 0) {
path = "/fuzzysync";
}
+ else if (g_ascii_strcasecmp(cmd, "compositesstats") == 0 ||
+ g_ascii_strcasecmp(cmd, "composites_stats") == 0) {
+ path = "/compositesstats";
+ }
else {
rspamd_fprintf(stderr, "unknown command: %s\n", cmd);
exit(EXIT_FAILURE);