std::vector<symbol_remove_data>>
symbols_to_remove;
std::vector<bool> checked;
+ /*
+ * Pinned snapshot of the composites generation that was current when
+ * this task started evaluating composites. All evaluation reads happen
+ * through this pointer, so a concurrent dynamic-map reload cannot
+ * change the world under our feet mid-task.
+ *
+ * Composite ids are globally monotonic across reloads, so they are not
+ * bounded by gen->all_composites.size(). `checked` is therefore sized
+ * from the largest id present in this snapshot (two slots per id), which
+ * gives every composite in the snapshot a slot. Composites not in this
+ * generation are never evaluated, so their ids never index `checked`.
+ */
+ std::shared_ptr<composites_generation> gen;
bool is_second_pass; /**< true if we're in COMPOSITES_POST stage */
uint64_t matched_count{}; /**< number of matched composites */
+ /**
+ * Pins the manager's current composites generation for this task and
+ * sizes the per-composite `checked` flags for that generation.
+ *
+ * @param task task being scanned; its cfg supplies the composites manager
+ * @param mres scan result the composites are evaluated against
+ */
explicit composites_data(struct rspamd_task *task, struct rspamd_scan_result *mres)
- : task(task), composite(nullptr), metric_res(mres), matched_count(0)
+ : task(task), composite(nullptr), metric_res(mres),
+ gen(COMPOSITE_MANAGER_FROM_PTR(task->cfg->composites_manager)->snapshot_generation()),
+ matched_count(0)
{
- checked.resize(rspamd_composites_manager_nelts(task->cfg->composites_manager) * 2,
- false);
+ /*
+ * `checked` holds two flags per composite id (slots id * 2 and
+ * id * 2 + 1), so it needs (largest id in this generation + 1) * 2
+ * entries. Ids are 0-based and monotonic across reloads, which is
+ * why the largest id is scanned for instead of being derived from
+ * the number of composites. An empty generation still gets a 2-slot
+ * dummy vector so `checked` is never zero-sized; this does not, by
+ * itself, make an out-of-generation id safe to index with.
+ */
+ int max_id = -1;
+ for (const auto &c: gen->all_composites) {
+ if (c->id > max_id) {
+ max_id = c->id;
+ }
+ }
+ /* max_id stays -1 for an empty generation: clamp to one id's worth of slots */
+ const int nids = max_id < 0 ? 1 : max_id + 1;
+ /* Multiply in size_t so a large id cannot overflow int arithmetic */
+ checked.resize(static_cast<std::size_t>(nids) * 2, false);
/* Determine if we're in second pass by checking if POST_FILTERS stage has been processed */
is_second_pass = (task->processed_stages & RSPAMD_TASK_STAGE_POST_FILTERS) != 0;
}
}
};
-enum class rspamd_composite_atom_type {
- ATOM_UNKNOWN,
- ATOM_COMPOSITE,
- ATOM_PLAIN
-};
-
struct rspamd_composite_atom {
std::string symbol;
std::string_view norm_symbol;
- rspamd_composite_atom_type comp_type = rspamd_composite_atom_type::ATOM_UNKNOWN;
- const struct rspamd_composite *ncomp; /* underlying composite */
std::vector<rspamd_composite_option_match> opts;
};
msg_debug_composites("not found symbol %s in composite %s", sym.data(),
cd->composite->sym.c_str());
- if (G_UNLIKELY(atom->comp_type == rspamd_composite_atom_type::ATOM_UNKNOWN)) {
- const struct rspamd_composite *ncomp;
-
- if ((ncomp = COMPOSITE_MANAGER_FROM_PTR(task->cfg->composites_manager)->find(sym)) != NULL) {
- atom->comp_type = rspamd_composite_atom_type::ATOM_COMPOSITE;
- atom->ncomp = ncomp;
- }
- else {
- atom->comp_type = rspamd_composite_atom_type::ATOM_PLAIN;
- }
- }
+ /*
+ * Resolve composite references against the task's pinned snapshot
+ * every time. We do not cache the resolution on the atom because
+ * the atom struct is shared across tasks that may be using
+ * different generations — caching would dangle on reload.
+ * Hashtable lookups are cheap and only happen for symbols that
+ * aren't already in the scan result.
+ */
+ const struct rspamd_composite *ncomp = cd->gen->find(sym);
- if (atom->comp_type == rspamd_composite_atom_type::ATOM_COMPOSITE) {
+ if (ncomp != nullptr && !ncomp->disabled) {
msg_debug_composites("symbol %s for composite %s is another composite",
sym.data(), cd->composite->sym.c_str());
- if (!cd->checked[atom->ncomp->id * 2]) {
+ if (!cd->checked[ncomp->id * 2]) {
msg_debug_composites("composite dependency %s for %s is not checked",
sym.data(), cd->composite->sym.c_str());
/* Set checked for this symbol to avoid cyclic references */
cd->checked[cd->composite->id * 2] = true;
auto *saved = cd->composite; /* Save the current composite */
- composites_foreach_callback((gpointer) atom->ncomp->sym.c_str(),
- (gpointer) atom->ncomp, (gpointer) cd);
+ composites_foreach_callback((gpointer) ncomp->sym.c_str(),
+ (gpointer) ncomp, (gpointer) cd);
/* Restore state */
cd->composite = saved;
cd->checked[cd->composite->id * 2] = false;
/*
* XXX: in case of cyclic references this would return 0
*/
- if (cd->checked[atom->ncomp->id * 2 + 1]) {
+ if (cd->checked[ncomp->id * 2 + 1]) {
ms = rspamd_task_find_symbol_result(cd->task, sym.data(),
cd->metric_res);
}
{
auto &cd = comp_data_vec.emplace_back(task, mres);
+ /*
+ * All evaluation reads go through cd.gen — the snapshot we pinned
+ * when this composites_data was constructed. cm->use_inverted_index
+ * is read off the manager (it is config-time only and never
+ * changes after init).
+ */
+ auto *gen = cd.gen.get();
+
if (is_second_pass) {
- /* Second pass: process only second-pass composites directly from manager */
- msg_debug_composites("processing second-pass composites");
- for (auto *comp: cm->second_pass_composites) {
+ /* Second pass: process only second-pass composites from the snapshot */
+ msg_debug_composites("processing second-pass composites (gen %L)",
+ (int64_t) gen->generation_id);
+ for (auto *comp: gen->second_pass_composites) {
composites_foreach_callback((gpointer) comp->sym.c_str(),
(gpointer) comp,
&cd);
}
- composites_checked += cm->second_pass_composites.size();
+ composites_checked += gen->second_pass_composites.size();
}
else if (use_fast_path) {
/* First pass with inverted index: fast lookup */
/* Callback data for collecting potentially active composites */
struct collect_active_cbdata {
- composites_manager *cm;
+ composites_generation *gen;
ankerl::unordered_dense::set<rspamd_composite *> *active;
- } collect_data{cm, &potentially_active};
+ } collect_data{gen, &potentially_active};
/* Collect composites that have at least one positive atom present */
rspamd_task_symbol_result_foreach(task, mres, [](gpointer key, gpointer value, gpointer ud) {
auto *cbd = reinterpret_cast<collect_active_cbdata *>(ud);
std::string_view sym_name{reinterpret_cast<const char *>(key)};
- auto it = cbd->cm->symbol_to_composites.find(sym_name);
- if (it != cbd->cm->symbol_to_composites.end()) {
+ auto it = cbd->gen->symbol_to_composites.find(sym_name);
+ if (it != cbd->gen->symbol_to_composites.end()) {
for (auto *comp: it->second) {
/* Only add first-pass composites */
if (!comp->second_pass) {
} }, &collect_data);
/* Always add NOT-only composites (they have no positive atoms) */
- for (auto *comp: cm->not_only_composites) {
+ for (auto *comp: gen->not_only_composites) {
if (!comp->second_pass) {
potentially_active.insert(comp);
}
}
- msg_debug_composites("processing %d potentially active composites (from %d first-pass)",
+ msg_debug_composites("processing %d potentially active composites (from %d first-pass, gen %L)",
(int) potentially_active.size(),
- (int) cm->first_pass_composites.size());
+ (int) gen->first_pass_composites.size(),
+ (int64_t) gen->generation_id);
/* Process only potentially active composites */
for (auto *comp: potentially_active) {
}
else {
/* Slow path: check all first-pass composites */
- msg_debug_composites("processing all %d first-pass composites (slow path)",
- (int) cm->first_pass_composites.size());
- for (auto *comp: cm->first_pass_composites) {
+ msg_debug_composites("processing all %d first-pass composites (slow path, gen %L)",
+ (int) gen->first_pass_composites.size(),
+ (int64_t) gen->generation_id);
+ for (auto *comp: gen->first_pass_composites) {
composites_foreach_callback((gpointer) comp->sym.c_str(),
(gpointer) comp,
&cd);
}
- composites_checked += cm->first_pass_composites.size();
+ composites_checked += gen->first_pass_composites.size();
}
}
}
}
-void rspamd_composites_resolve_atom_types(composites_manager *cm)
-{
- auto resolve_callback = [](GNode *, rspamd_expression_atom_t *atom, gpointer ud) {
- auto *manager = reinterpret_cast<composites_manager *>(ud);
- auto *comp_atom = reinterpret_cast<rspamd_composite_atom *>(atom->data);
-
- if (comp_atom == nullptr) {
- return;
- }
-
- if (comp_atom->comp_type != rspamd_composite_atom_type::ATOM_UNKNOWN) {
- /* Already resolved */
- return;
- }
-
- const auto *ncomp = manager->find(comp_atom->symbol);
- if (ncomp != nullptr) {
- comp_atom->comp_type = rspamd_composite_atom_type::ATOM_COMPOSITE;
- comp_atom->ncomp = ncomp;
- }
- else {
- comp_atom->comp_type = rspamd_composite_atom_type::ATOM_PLAIN;
- comp_atom->ncomp = nullptr;
- }
- };
-
- /* Process all first-pass composites */
- for (auto *comp: cm->first_pass_composites) {
- rspamd_expression_atom_foreach_ex(comp->expr, resolve_callback, cm);
- }
-
- /* Process all second-pass composites */
- for (auto *comp: cm->second_pass_composites) {
- rspamd_expression_atom_foreach_ex(comp->expr, resolve_callback, cm);
- }
-}
-
}// namespace rspamd::composites
#define RSPAMD_COMPOSITES_INTERNAL_HXX
#pragma once
+#include <memory>
#include <string>
+#include <vector>
+#include "contrib/ankerl/unordered_dense.h"
#include "libutil/expression.h"
#include "libutil/util.h"
#include "libutil/cxx/hash_util.hxx"
rspamd_composite_policy policy;
bool second_pass; /**< true if this composite needs second pass evaluation */
bool has_positive_atoms; /**< true if composite has at least one non-negated atom */
+ bool disabled; /**< true if composite is a placeholder stub (evaluates to false) */
+};
+
+/**
+ * A composites generation: an immutable-once-published snapshot of all
+ * composites and their precomputed evaluation indices.
+ *
+ * The manager holds one current generation. On dynamic map reloads a new
+ * generation is built off-line and atomically swapped in; in-flight tasks
+ * keep using their snapshot (held via shared_ptr in composites_data).
+ */
+struct composites_generation {
+ /* Name -> composite lookup table for this generation */
+ ankerl::unordered_dense::map<std::string,
+ std::shared_ptr<rspamd_composite>,
+ rspamd::smart_str_hash, rspamd::smart_str_equal>
+ composites;
+ /* Ownership of every composite belongs here (including duplicates) */
+ std::vector<std::shared_ptr<rspamd_composite>> all_composites;
+
+ /* Two-phase evaluation buckets */
+ std::vector<rspamd_composite *> first_pass_composites;
+ std::vector<rspamd_composite *> second_pass_composites;
+
+ /* Inverted index: symbol -> composites that contain this symbol as positive atom */
+ ankerl::unordered_dense::map<std::string, std::vector<rspamd_composite *>,
+ rspamd::smart_str_hash, rspamd::smart_str_equal>
+ symbol_to_composites;
+ /* Composites that have only negated atoms or group matchers (must always be checked) */
+ std::vector<rspamd_composite *> not_only_composites;
+
+ uint64_t generation_id = 0;
+
+ /**
+ * Look up a composite by name in this generation.
+ *
+ * The string_view is handed to the map directly rather than being copied
+ * into a temporary std::string: evaluation calls this for every atom whose
+ * symbol is absent from the scan result, so the lookup should not allocate.
+ *
+ * @param name composite symbol name
+ * @return the composite registered under that name, or nullptr if none
+ */
+ auto find(std::string_view name) const -> const rspamd_composite *
+ {
+ const auto found = composites.find(name);
+ return found != composites.end() ? found->second.get() : nullptr;
+ }
};
#define COMPOSITE_MANAGER_FROM_PTR(ptr) (reinterpret_cast<rspamd::composites::composites_manager *>(ptr))
class composites_manager {
public:
composites_manager(struct rspamd_config *_cfg)
- : cfg(_cfg), use_inverted_index(true)
+ : cfg(_cfg),
+ current_gen(std::make_shared<composites_generation>()),
+ use_inverted_index(true)
{
rspamd_mempool_add_destructor(_cfg->cfg_pool, composites_manager_dtor, this);
}
auto size(void) const -> std::size_t
{
- return all_composites.size();
+ return current_gen->all_composites.size();
}
auto find(std::string_view name) const -> const rspamd_composite *
{
- auto found = composites.find(std::string(name));
-
- if (found != composites.end()) {
- return found->second.get();
- }
+ return current_gen->find(name);
+ }
- return nullptr;
+ /**
+ * Snapshot the current generation. Callers (tasks) keep this shared_ptr
+ * alive for the duration of evaluation so a concurrent reload cannot
+ * pull the rug out from under them.
+ */
+ auto snapshot_generation() const -> std::shared_ptr<composites_generation>
+ {
+ return current_gen;
}
auto add_composite(std::string_view, const ucl_object_t *, bool silent_duplicate) -> rspamd_composite *;
auto add_composite(std::string_view name, std::string_view expression, bool silent_duplicate, double score = NAN) -> rspamd_composite *;
+ /* Hand out the next composite id; the counter only ever grows, so ids stay unique across generations */
+ auto next_id() -> int
+ {
+ const int allocated = next_composite_id;
+ ++next_composite_id;
+ return allocated;
+ }
+
+ /** Returns the rspamd_config pointer this manager was constructed with. */
+ auto get_cfg() const -> struct rspamd_config *
+ {
+ return cfg;
+ }
+
+ /**
+ * Raw pointer to the generation the manager currently holds. Unlike
+ * snapshot_generation(), this does not take a shared_ptr reference, so
+ * the returned pointer does not keep the generation alive by itself.
+ */
+ auto current() const -> composites_generation *
+ {
+ return current_gen.get();
+ }
+
private:
~composites_manager() = default;
static void composites_manager_dtor(void *ptr)
auto new_composite(std::string_view composite_name, rspamd_expression *expr,
std::string_view composite_expression) -> auto
{
- auto &composite = all_composites.emplace_back(std::make_shared<rspamd_composite>());
+ auto &gen = *current_gen;
+ auto &composite = gen.all_composites.emplace_back(std::make_shared<rspamd_composite>());
composite->expr = expr;
- composite->id = all_composites.size() - 1;
+ composite->id = next_id();
composite->str_expr = composite_expression;
composite->sym = composite_name;
composite->second_pass = false; /* Initially all composites are first pass */
+ composite->disabled = false;
- composites[composite->sym] = composite;
+ gen.composites[composite->sym] = composite;
return composite;
}
- ankerl::unordered_dense::map<std::string,
- std::shared_ptr<rspamd_composite>, rspamd::smart_str_hash, rspamd::smart_str_equal>
- composites;
- /* Store all composites here, even if we have duplicates */
- std::vector<std::shared_ptr<rspamd_composite>> all_composites;
-
struct rspamd_config *cfg;
+ int next_composite_id = 0;
-public:
- /* Two-phase evaluation: composites are split into first and second pass */
- std::vector<rspamd_composite *> first_pass_composites; /* Evaluated during COMPOSITES stage */
- std::vector<rspamd_composite *> second_pass_composites; /* Evaluated during COMPOSITES_POST stage */
-
- /* Inverted index: symbol -> composites that contain this symbol as positive atom */
- ankerl::unordered_dense::map<std::string, std::vector<rspamd_composite *>,
- rspamd::smart_str_hash, rspamd::smart_str_equal>
- symbol_to_composites;
- /* Composites that have only negated atoms (must always be checked) */
- std::vector<rspamd_composite *> not_only_composites;
+ /* The live generation. Replaced on dynamic-map reload via publish_generation(). */
+ std::shared_ptr<composites_generation> current_gen;
+public:
/* Configuration flags */
bool use_inverted_index; /**< Use inverted index for fast composite lookup (default: true) */
void mark_whitelist_dependencies();
};
-/**
- * Precompute atom types (ATOM_COMPOSITE vs ATOM_PLAIN) for all composites.
- * This eliminates lazy lookups during expression evaluation.
- * Should be called after all composites are registered.
- */
-void rspamd_composites_resolve_atom_types(composites_manager *cm);
-
}// namespace rspamd::composites
#endif//RSPAMD_COMPOSITES_INTERNAL_HXX
return nullptr;
}
- if (composites.contains(composite_name)) {
+ if (current_gen->composites.contains(composite_name)) {
if (silent_duplicate) {
msg_debug_config("composite %s is redefined", composite_name.data());
return nullptr;
GError *err = nullptr;
rspamd_expression *expr = nullptr;
- if (composites.contains(composite_name)) {
+ if (current_gen->composites.contains(composite_name)) {
/* Duplicate composite - refuse to add */
if (silent_duplicate) {
msg_debug_config("composite %s is redefined", composite_name.data());
{
ankerl::unordered_dense::set<rspamd_composite *> second_pass_set;
bool changed;
+ auto &gen = *current_gen;
- msg_debug_config("analyzing composite dependencies for two-phase evaluation");
+ msg_debug_config("analyzing composite dependencies for two-phase evaluation (gen %L)",
+ (int64_t) gen.generation_id);
- /* Initially, all composites start in first pass */
- for (const auto &comp: all_composites) {
- first_pass_composites.push_back(comp.get());
+ /* Reset pass buckets in case process_dependencies() is called repeatedly */
+ gen.first_pass_composites.clear();
+ gen.second_pass_composites.clear();
+
+ /* Skip disabled stubs entirely — they will not be evaluated */
+ for (const auto &comp: gen.all_composites) {
+ if (!comp->disabled) {
+ gen.first_pass_composites.push_back(comp.get());
+ }
+ else {
+ comp->second_pass = false;
+ }
}
/* First pass: mark composites that directly depend on postfilters/stats */
- for (auto *comp: first_pass_composites) {
+ for (auto *comp: gen.first_pass_composites) {
composite_dep_cbdata cbd{cfg, false, this};
rspamd_expression_atom_foreach(comp->expr,
/* Second pass: handle transitive dependencies */
do {
changed = false;
- for (auto *comp: first_pass_composites) {
+ for (auto *comp: gen.first_pass_composites) {
if (second_pass_set.contains(comp)) {
continue;
}
} while (changed);
/* Move second-pass composites from first_pass to second_pass vector and mark them */
- auto it = first_pass_composites.begin();
- while (it != first_pass_composites.end()) {
+ auto it = gen.first_pass_composites.begin();
+ while (it != gen.first_pass_composites.end()) {
if (second_pass_set.contains(*it)) {
(*it)->second_pass = true;
- second_pass_composites.push_back(*it);
- it = first_pass_composites.erase(it);
+ gen.second_pass_composites.push_back(*it);
+ it = gen.first_pass_composites.erase(it);
}
else {
+ (*it)->second_pass = false;
++it;
}
}
msg_debug_config("composite dependency analysis complete: %d first-pass, %d second-pass composites",
- (int) first_pass_composites.size(), (int) second_pass_composites.size());
+ (int) gen.first_pass_composites.size(), (int) gen.second_pass_composites.size());
}
/*
/* Mark that we have at least one positive atom */
cbd->has_positive = true;
- /* Add to inverted index */
- cbd->cm->symbol_to_composites[symbol_name].push_back(cbd->comp);
+ /* Add to inverted index (always the manager's *current* generation, which
+ * is what build_inverted_index operates on) */
+ cbd->cm->current()->symbol_to_composites[symbol_name].push_back(cbd->comp);
}
void composites_manager::build_inverted_index()
{
- msg_debug_config("building inverted index for %d composites", (int) all_composites.size());
+ auto &gen = *current_gen;
+
+ msg_debug_config("building inverted index for %d composites (gen %L)",
+ (int) gen.all_composites.size(), (int64_t) gen.generation_id);
+
+ gen.symbol_to_composites.clear();
+ gen.not_only_composites.clear();
+
+ for (auto &comp: gen.all_composites) {
+ if (comp->disabled) {
+ /* Stub: contributes neither to the index nor to "always check" */
+ comp->has_positive_atoms = false;
+ continue;
+ }
- for (auto &comp: all_composites) {
inverted_index_cbdata cbd{this, comp.get(), false, false};
rspamd_expression_atom_foreach_ex(comp->expr, inverted_index_atom_callback, &cbd);
* - It has only negated atoms (no positive symbols to match)
* - It uses group matchers (we don't know which symbols will match)
*/
- not_only_composites.push_back(comp.get());
+ gen.not_only_composites.push_back(comp.get());
if (cbd.has_group_atom) {
msg_debug_config("composite '%s' uses group matcher, will always be checked",
comp->sym.c_str());
* entries. Then remove the composite-name keys.
*/
ankerl::unordered_dense::set<std::string> composite_keys;
- for (const auto &[sym, _]: symbol_to_composites) {
+ for (const auto &[sym, _]: gen.symbol_to_composites) {
if (find(sym) != nullptr) {
composite_keys.insert(sym);
}
(int) composite_keys.size());
for (const auto &comp_key: composite_keys) {
- auto it = symbol_to_composites.find(comp_key);
- if (it == symbol_to_composites.end()) {
+ auto it = gen.symbol_to_composites.find(comp_key);
+ if (it == gen.symbol_to_composites.end()) {
continue;
}
/* Propagate dependents to each leaf atom's index entry */
for (const auto &leaf: leaf_atoms) {
- auto &entry = symbol_to_composites[leaf];
+ auto &entry = gen.symbol_to_composites[leaf];
for (auto *dep: dependents) {
if (std::find(entry.begin(), entry.end(), dep) == entry.end()) {
entry.push_back(dep);
*/
if (leaf_atoms.empty()) {
for (auto *dep: dependents) {
- if (std::find(not_only_composites.begin(),
- not_only_composites.end(), dep) == not_only_composites.end()) {
- not_only_composites.push_back(dep);
+ if (std::find(gen.not_only_composites.begin(),
+ gen.not_only_composites.end(), dep) == gen.not_only_composites.end()) {
+ gen.not_only_composites.push_back(dep);
msg_debug_config("composite '%s' depends on composite '%s' "
"with no leaf atoms, will always be checked",
dep->sym.c_str(), comp_key.c_str());
}
/* Remove the composite-name key from the index */
- symbol_to_composites.erase(comp_key);
+ gen.symbol_to_composites.erase(comp_key);
msg_debug_config("resolved composite reference '%s': "
"propagated %d dependents to %d leaf atoms",
}
msg_debug_config("inverted index built: %d unique symbols, %d not-only composites",
- (int) symbol_to_composites.size(), (int) not_only_composites.size());
+ (int) gen.symbol_to_composites.size(), (int) gen.not_only_composites.size());
}
/* Callback data for collecting atoms from whitelist composites */
void composites_manager::mark_whitelist_dependencies()
{
ankerl::unordered_dense::set<std::string> fine_symbols;
+ auto &gen = *current_gen;
- msg_debug_config("analyzing whitelist composites for FINE symbol marking");
+ msg_debug_config("analyzing whitelist composites for FINE symbol marking (gen %L)",
+ (int64_t) gen.generation_id);
/* Step 1: Find composites with negative score and collect their atoms */
- for (const auto &comp: all_composites) {
+ for (const auto &comp: gen.all_composites) {
+ if (comp->disabled) {
+ continue;
+ }
auto *sym_def = static_cast<struct rspamd_symbol *>(
g_hash_table_lookup(cfg->symbols, comp->sym.c_str()));
bool changed;
do {
changed = false;
- for (const auto &comp: all_composites) {
+ for (const auto &comp: gen.all_composites) {
+ if (comp->disabled) {
+ continue;
+ }
if (fine_symbols.contains(comp->sym)) {
size_t before = fine_symbols.size();
whitelist_atom_cbdata cbd{&fine_symbols};
{
auto *cm = COMPOSITE_MANAGER_FROM_PTR(cm_ptr);
cm->process_dependencies();
- rspamd_composites_resolve_atom_types(cm);
cm->build_inverted_index();
}