From ac8d5173cce870872aec7c54a42a2f379d3f6719 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 14 Oct 2025 14:59:01 +0100 Subject: [PATCH] [Fix] Implement two-phase composite evaluation for postfilter dependencies Fixes #5674 where composite rules combining postfilter/statistics symbols with regular filter symbols failed to trigger. Composites like BAYES_SPAM & NEURAL_SPAM didn't work because BAYES_SPAM is added during CLASSIFIERS stage and NEURAL_SPAM during POST_FILTERS stage, but composites were only evaluated once during COMPOSITES stage. Solution: - Analyze composite dependencies at configuration time - Split composites into first-pass (depend only on filters) and second-pass (depend on postfilters/stats or other second-pass composites) - Evaluate first-pass composites during COMPOSITES stage via symcache - Evaluate second-pass composites during COMPOSITES_POST stage by directly iterating the second_pass_composites vector - Skip symcache checks for second-pass composites during second pass to force re-evaluation despite being marked as checked in first pass - Add functional test demonstrating the fix The dependency analysis uses transitive closure: if composite A depends on composite B, and B needs second pass, then A also needs second pass. --- src/libserver/cfg_utils.cxx | 5 + src/libserver/composites/composites.cxx | 46 +++++- src/libserver/composites/composites.h | 8 + .../composites/composites_internal.hxx | 13 +- .../composites/composites_manager.cxx | 145 ++++++++++++++++++ src/libserver/task.c | 9 +- .../109_composites_postfilter.robot | 16 ++ test/functional/configs/merged-local.conf | 7 + test/functional/configs/merged.conf | 3 + test/functional/lua/composites_postfilter.lua | 34 ++++ 10 files changed, 270 insertions(+), 16 deletions(-) create mode 100644 test/functional/cases/001_merged/109_composites_postfilter.robot create mode 100644 test/functional/lua/composites_postfilter.lua diff --git a/src/libserver/cfg_utils.cxx b/src/libserver/cfg_utils.cxx index 87011432a2..3ae7dbdfc5 100644 --- a/src/libserver/cfg_utils.cxx +++ b/src/libserver/cfg_utils.cxx @@ -1004,6 +1004,11 @@ rspamd_config_post_load(struct rspamd_config *cfg, if (hs_ret == RSPAMD_HYPERSCAN_LOAD_ERROR) { msg_debug_config("cannot load hyperscan database, disable it"); } + + /* Process composite dependencies after symcache is initialized */ + if (cfg->composites_manager && rspamd_composites_manager_nelts(cfg->composites_manager) > 0) { + rspamd_composites_process_deps(cfg->composites_manager, cfg); + } } if (opts & RSPAMD_CONFIG_INIT_LIBS) { diff --git a/src/libserver/composites/composites.cxx b/src/libserver/composites/composites.cxx index 0e7adfb8ae..c9e11649aa 100644 --- a/src/libserver/composites/composites.cxx +++ b/src/libserver/composites/composites.cxx @@ -1,5 +1,5 @@ /* - * Copyright 2024 Vsevolod Stakhov + * Copyright 2025 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,12 +85,15 @@ struct composites_data { std::vector> symbols_to_remove; std::vector checked; + bool is_second_pass; /**< true if we're in COMPOSITES_POST stage */ explicit composites_data(struct rspamd_task *task, struct rspamd_scan_result *mres) : task(task), composite(nullptr), metric_res(mres) { checked.resize(rspamd_composites_manager_nelts(task->cfg->composites_manager) * 2, false); + /* Determine if we're in second pass by checking if POST_FILTERS stage has been processed */ + is_second_pass = (task->processed_stages & RSPAMD_TASK_STAGE_POST_FILTERS) != 0; } }; @@ -808,11 +811,24 @@ composites_foreach_callback(gpointer key, gpointer value, void *data) cd->composite = comp; task = cd->task; + /* Skip composites that don't belong to current pass */ + if (cd->is_second_pass != comp->second_pass) { + msg_debug_composites("skip composite %s (pass mismatch: current=%s, composite=%s)", + str_key, + cd->is_second_pass ? "second" : "first", + comp->second_pass ? "second" : "first"); + return; + } + msg_debug_composites("process composite %s", str_key); if (!cd->checked[cd->composite->id * 2]) { - if (rspamd_symcache_is_checked(cd->task, cd->task->cfg->cache, - str_key)) { + /* For second-pass composites during second pass, skip symcache check since they were + * already marked as checked during first pass but not actually evaluated */ + bool skip_symcache_check = (cd->is_second_pass && comp->second_pass); + + if (!skip_symcache_check && + rspamd_symcache_is_checked(cd->task, cd->task->cfg->cache, str_key)) { msg_debug_composites("composite %s is checked in symcache but not " "in composites bitfield", cd->composite->sym.c_str()); @@ -956,6 +972,8 @@ composites_metric_callback(struct rspamd_task *task) { std::vector comp_data_vec; struct rspamd_scan_result *mres; + auto *cm = COMPOSITE_MANAGER_FROM_PTR(task->cfg->composites_manager); + bool is_second_pass = (task->processed_stages & RSPAMD_TASK_STAGE_POST_FILTERS) != 0; comp_data_vec.reserve(1); @@ -963,11 +981,23 @@ composites_metric_callback(struct rspamd_task *task) { auto &cd = comp_data_vec.emplace_back(task, mres); - /* Process metric result */ - rspamd_symcache_composites_foreach(task, - task->cfg->cache, - composites_foreach_callback, - &cd); + if (is_second_pass) { + /* Second pass: process only second-pass composites directly from manager */ + msg_debug_composites("processing second-pass composites"); + for (auto *comp: cm->second_pass_composites) { + composites_foreach_callback((gpointer) comp->sym.c_str(), + (gpointer) comp, + &cd); + } + } + else { + /* First pass: use symcache iteration (will skip second-pass composites in callback) */ + msg_debug_composites("processing first-pass composites via symcache"); + rspamd_symcache_composites_foreach(task, + task->cfg->cache, + composites_foreach_callback, + &cd); + } } for (const auto &cd: comp_data_vec) { diff --git a/src/libserver/composites/composites.h b/src/libserver/composites/composites.h index 5d58029093..cff7d67de1 100644 --- a/src/libserver/composites/composites.h +++ b/src/libserver/composites/composites.h @@ -57,6 +57,14 @@ void *rspamd_composites_manager_add_from_ucl_silent(void *, const char *, const void *rspamd_composites_manager_add_from_string(void *, const char *, const char *); void *rspamd_composites_manager_add_from_string_silent(void *, const char *, const char *); +/** + * Process composite dependencies to split into first/second pass + * Should be called after symcache is finalized + * @param cm_ptr composites manager pointer + * @param cfg config structure + */ +void rspamd_composites_process_deps(void *cm_ptr, struct rspamd_config *cfg); + #ifdef __cplusplus } #endif diff --git a/src/libserver/composites/composites_internal.hxx b/src/libserver/composites/composites_internal.hxx index 2ae4219eba..09ff48e8d0 100644 --- a/src/libserver/composites/composites_internal.hxx +++ b/src/libserver/composites/composites_internal.hxx @@ -1,5 +1,5 @@ /* - * Copyright 2024 Vsevolod Stakhov + * Copyright 2025 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,7 @@ struct rspamd_composite { struct rspamd_expression *expr; int id; rspamd_composite_policy policy; + bool second_pass; /**< true if this composite needs second pass evaluation */ }; #define COMPOSITE_MANAGER_FROM_PTR(ptr) (reinterpret_cast(ptr)) @@ -93,6 +94,7 @@ private: composite->id = all_composites.size() - 1; composite->str_expr = composite_expression; composite->sym = composite_name; + composite->second_pass = false; /* Initially all composites are first pass */ composites[composite->sym] = composite; @@ -104,7 +106,16 @@ private: composites; /* Store all composites here, even if we have duplicates */ std::vector> all_composites; + struct rspamd_config *cfg; + +public: + /* Two-phase evaluation: composites are split into first and second pass */ + std::vector first_pass_composites; /* Evaluated during COMPOSITES stage */ + std::vector second_pass_composites; /* Evaluated during COMPOSITES_POST stage */ + + /* Analyze composite dependencies and split into first/second pass vectors */ + void process_dependencies(); }; }// namespace rspamd::composites diff --git a/src/libserver/composites/composites_manager.cxx b/src/libserver/composites/composites_manager.cxx index c82448631f..cc340be4c2 100644 --- a/src/libserver/composites/composites_manager.cxx +++ b/src/libserver/composites/composites_manager.cxx @@ -331,4 +331,149 @@ bool rspamd_composites_add_map_handlers(const ucl_object_t *obj, struct rspamd_c } return true; +} + +namespace rspamd::composites { + +/* Helper to check if a symbol requires second pass evaluation */ +static bool +symbol_needs_second_pass(struct rspamd_config *cfg, const char *symbol_name) +{ + if (!cfg->cache) { + return false; + } + + auto flags = rspamd_symcache_get_symbol_flags(cfg->cache, symbol_name); + + /* Postfilters and classifiers/statistics symbols require second pass */ + return (flags & (SYMBOL_TYPE_POSTFILTER | SYMBOL_TYPE_CLASSIFIER | SYMBOL_TYPE_NOSTAT)) != 0; +} + +/* Callback data for walking expression atoms to find symbol dependencies */ +struct composite_dep_cbdata { + struct rspamd_config *cfg; + bool needs_second_pass; + composites_manager *cm; +}; + +static void +composite_dep_callback(const rspamd_ftok_t *atom, gpointer ud) +{ + auto *cbd = reinterpret_cast(ud); + auto *cfg = cbd->cfg; + + if (cbd->needs_second_pass) { + /* Already marked, no need to continue */ + return; + } + + /* Convert atom to string */ + std::string_view atom_str(atom->begin, atom->len); + + /* Skip operators and special characters */ + if (atom->len == 0 || atom->begin[0] == '&' || atom->begin[0] == '|' || + atom->begin[0] == '!' || atom->begin[0] == '(' || atom->begin[0] == ')') { + return; + } + + /* Check if this is a reference to another composite */ + if (auto *dep_comp = cbd->cm->find(atom_str); dep_comp != nullptr) { + /* Dependency on another composite - will be handled in transitive pass */ + return; + } + + /* Check if the symbol itself needs second pass */ + if (symbol_needs_second_pass(cfg, atom->begin)) { + msg_debug_config("composite depends on second-pass symbol: %*s", + (int) atom->len, atom->begin); + cbd->needs_second_pass = true; + } +} + +void composites_manager::process_dependencies() +{ + ankerl::unordered_dense::set second_pass_set; + bool changed; + + msg_debug_config("analyzing composite dependencies for two-phase evaluation"); + + /* Initially, all composites start in first pass */ + for (const auto &comp: all_composites) { + first_pass_composites.push_back(comp.get()); + } + + /* First pass: mark composites that directly depend on postfilters/stats */ + for (auto *comp: first_pass_composites) { + composite_dep_cbdata cbd{cfg, false, this}; + + rspamd_expression_atom_foreach(comp->expr, + composite_dep_callback, + &cbd); + + if (cbd.needs_second_pass) { + second_pass_set.insert(comp); + msg_debug_config("composite '%s' marked for second pass (direct dependency)", + comp->sym.c_str()); + } + } + + /* Second pass: handle transitive dependencies */ + do { + changed = false; + for (auto *comp: first_pass_composites) { + if (second_pass_set.contains(comp)) { + continue; + } + + bool has_second_pass_dep = false; + + /* Helper struct for lambda capture */ + struct trans_check_data { + composites_manager *cm; + ankerl::unordered_dense::set *second_pass_set; + bool *has_dep; + } trans_data{this, &second_pass_set, &has_second_pass_dep}; + + rspamd_expression_atom_foreach(comp->expr, [](const rspamd_ftok_t *atom, gpointer ud) { + auto *data = reinterpret_cast(ud); + std::string_view atom_str(atom->begin, atom->len); + if (auto *dep_comp = data->cm->find(atom_str); dep_comp != nullptr) { + /* Cast away const since we know this points to a modifiable composite */ + if (data->second_pass_set->contains(const_cast(dep_comp))) { + *data->has_dep = true; + } + } }, &trans_data); + + if (has_second_pass_dep) { + second_pass_set.insert(comp); + changed = true; + msg_debug_config("composite '%s' marked for second pass (transitive dependency)", + comp->sym.c_str()); + } + } + } while (changed); + + /* Move second-pass composites from first_pass to second_pass vector and mark them */ + auto it = first_pass_composites.begin(); + while (it != first_pass_composites.end()) { + if (second_pass_set.contains(*it)) { + (*it)->second_pass = true; + second_pass_composites.push_back(*it); + it = first_pass_composites.erase(it); + } + else { + ++it; + } + } + + msg_debug_config("composite dependency analysis complete: %d first-pass, %d second-pass composites", + (int) first_pass_composites.size(), (int) second_pass_composites.size()); +} + +}// namespace rspamd::composites + +void rspamd_composites_process_deps(void *cm_ptr, struct rspamd_config *cfg) +{ + auto *cm = COMPOSITE_MANAGER_FROM_PTR(cm_ptr); + cm->process_dependencies(); } \ No newline at end of file diff --git a/src/libserver/task.c b/src/libserver/task.c index 0d58ad3c7b..b38c07e264 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -803,13 +803,8 @@ rspamd_task_process(struct rspamd_task *task, unsigned int stages) } break; case RSPAMD_TASK_STAGE_COMPOSITES_POST: - /* Second run of composites processing before idempotent filters (if needed) */ - if (task->result->nresults_postfilters != task->result->nresults) { - rspamd_composites_process_task(task); - } - else { - msg_debug_task("skip second run of composites as the result has not been changed"); - } + /* Second run of composites processing for composites that depend on postfilters/stats */ + rspamd_composites_process_task(task); break; case RSPAMD_TASK_STAGE_IDEMPOTENT: diff --git a/test/functional/cases/001_merged/109_composites_postfilter.robot b/test/functional/cases/001_merged/109_composites_postfilter.robot new file mode 100644 index 0000000000..8205948c52 --- /dev/null +++ b/test/functional/cases/001_merged/109_composites_postfilter.robot @@ -0,0 +1,16 @@ +*** Settings *** +Library ${RSPAMD_TESTDIR}/lib/rspamd.py +Resource ${RSPAMD_TESTDIR}/lib/rspamd.robot +Variables ${RSPAMD_TESTDIR}/lib/vars.py + +*** Variables *** +${MESSAGE} ${RSPAMD_TESTDIR}/messages/spam_message.eml +${RSPAMD_LUA_SCRIPT} ${RSPAMD_TESTDIR}/lua/composites_postfilter.lua + +*** Test Cases *** +Composite With Postfilter And Filter + [Documentation] Test that composite with postfilter + filter symbols works correctly (issue #5674) + Scan File ${MESSAGE} + Expect Symbol With Score TEST_POSTFILTER_COMPOSITE 10.0 + Do Not Expect Symbol TEST_FILTER_SYM + Do Not Expect Symbol TEST_POSTFILTER_SYM diff --git a/test/functional/configs/merged-local.conf b/test/functional/configs/merged-local.conf index bac414aa17..b34fb6f12e 100644 --- a/test/functional/configs/merged-local.conf +++ b/test/functional/configs/merged-local.conf @@ -1063,6 +1063,13 @@ group "test" { } } +composites { + TEST_POSTFILTER_COMPOSITE { + expression = "TEST_FILTER_SYM & TEST_POSTFILTER_SYM"; + score = 10.0; + } +} + worker "controller" { bind_socket = "{= env.LOCAL_ADDR =}:{= env.PORT_CONTROLLER =}"; keypair { diff --git a/test/functional/configs/merged.conf b/test/functional/configs/merged.conf index 06a34308da..a2c2963f14 100644 --- a/test/functional/configs/merged.conf +++ b/test/functional/configs/merged.conf @@ -37,6 +37,9 @@ lua = "{= env.TESTDIR =}/lua/magic.lua" # 380_external_relay lua = "{= env.TESTDIR =}/lua/external_relay.lua" +# 109_composites_postfilter +lua = "{= env.TESTDIR =}/lua/composites_postfilter.lua" + .include(priority=1,duplicate=merge) "{= env.TESTDIR =}/configs/merged-local.conf" .include(priority=2,duplicate=replace) "{= env.TESTDIR =}/configs/merged-override.conf" diff --git a/test/functional/lua/composites_postfilter.lua b/test/functional/lua/composites_postfilter.lua new file mode 100644 index 0000000000..ae7e36b2a6 --- /dev/null +++ b/test/functional/lua/composites_postfilter.lua @@ -0,0 +1,34 @@ +-- Test for composite with postfilter + filter symbols +-- This test demonstrates bug #5674 + +-- Normal filter symbol (executed during FILTERS stage) +rspamd_config:register_symbol({ + type = 'normal', + name = 'TEST_FILTER_SYM', + callback = function(task) + task:insert_result('TEST_FILTER_SYM', 1.0) + end +}) +rspamd_config:set_metric_symbol({ + name = 'TEST_FILTER_SYM', + score = 1.0 +}) + +-- Postfilter symbol (executed during POST_FILTERS stage) +rspamd_config:register_symbol({ + type = 'postfilter', + name = 'TEST_POSTFILTER_SYM', + callback = function(task) + task:insert_result('TEST_POSTFILTER_SYM', 1.0) + end +}) +rspamd_config:set_metric_symbol({ + name = 'TEST_POSTFILTER_SYM', + score = 1.0 +}) + +-- Composite is defined in merged-local.conf +-- TEST_POSTFILTER_COMPOSITE = TEST_FILTER_SYM & TEST_POSTFILTER_SYM +-- This should match when both symbols are present +-- BUG: Currently fails because composite is evaluated before postfilter runs +-- and is not re-evaluated in the second pass -- 2.47.3