return current_gen.get();
}
+ /**
+ * Snapshot current_gen as base_gen — every future staging generation
+ * starts from a clone of base_gen. Called once after static config has
+ * been loaded and the first round of process_dependencies /
+ * build_inverted_index / mark_whitelist_dependencies has completed.
+ */
+ auto pin_base_generation() -> void
+ {
+ base_gen = current_gen;
+ }
+
+ auto get_base_generation() const -> std::shared_ptr<composites_generation>
+ {
+ return base_gen;
+ }
+
+ /**
+ * Build a fresh staging generation off the pinned base. Composites from
+ * base_gen are cloned (new shared_ptr, fresh id, flags reset) so the
+ * staging can run process_dependencies / build_inverted_index without
+ * mutating composites that in-flight tasks may still be observing.
+ *
+ * Callers (the dynamic-map fin callback) layer the map's composites on
+ * top, then call publish_generation().
+ */
+ auto build_staging() -> std::shared_ptr<composites_generation>;
+
+ /**
+ * Apply a single UCL composite definition to a staging generation.
+ * Parses the expression, creates a fresh composite struct, replaces any
+ * existing entry under this name, and updates cfg->symbols so scoring
+ * and FINE-flag propagation see the dynamic composite. Returns the new
+ * composite or nullptr on parse/validation failure.
+ */
+ auto add_composite_to_staging(composites_generation &staging,
+ std::string_view name,
+ const ucl_object_t *obj) -> rspamd_composite *;
+
+ /**
+ * Replace the composite under `name` in `staging` with a disabled stub
+ * (or insert one if the name was unknown). Returns true if a stub was
+ * (re)created.
+ */
+ auto disable_in_staging(composites_generation &staging,
+ const std::string &name) -> bool;
+
+ /**
+ * Publish a staging generation as current:
+ * - register new composite names with the symcache + cfg->symbols
+ * - update ever_seen_names
+ * - bump the resort generation on the symcache
+ * - run process_dependencies / build_inverted_index / mark_whitelist
+ * - atomically swap current_gen
+ *
+ * Single-threaded libev makes the swap a plain assignment; in-flight
+ * tasks keep their snapshot alive via shared_ptr.
+ */
+ auto publish_generation(std::shared_ptr<composites_generation> staging) -> void;
+
+ /**
+ * Capture current_gen as the static-config base. Subsequent
+ * build_staging() calls clone from this snapshot. Populates
+ * ever_seen_names from the static composites so they aren't
+ * re-registered with the symcache on first dynamic publish. Idempotent
+ * — calling more than once is a no-op.
+ */
+ auto seal_static_load() -> void;
+
+ /**
+ * Returns the set of composite names this manager has ever published.
+ * Map handlers consult this to materialise disabled stubs for names
+ * that previously existed and have now been removed.
+ */
+ auto ever_seen() const -> const ankerl::unordered_dense::set<std::string> &
+ {
+ return ever_seen_names;
+ }
+
+ auto allocate_generation_id() -> uint64_t
+ {
+ return ++next_gen_id;
+ }
+
private:
~composites_manager() = default;
static void composites_manager_dtor(void *ptr)
struct rspamd_config *cfg;
int next_composite_id = 0;
+ uint64_t next_gen_id = 0;
/* The live generation. Replaced on dynamic-map reload via publish_generation(). */
std::shared_ptr<composites_generation> current_gen;
+ /* Snapshot of the static-config generation, taken after config-load.
+ * Every staging generation is cloned from this. */
+ std::shared_ptr<composites_generation> base_gen;
+
+ /* Names this manager has ever published (static or dynamic). Monotonic.
+ * Used to (a) gate one-time symcache + cfg->symbols registration and
+ * (b) help map handlers materialise disabled stubs for vanished names. */
+ ankerl::unordered_dense::set<std::string> ever_seen_names;
+
+ /* The composite shared_ptr each name was first registered with in the
+ * symcache. The symcache stores raw cbdata; pinning the shared_ptr here
+ * guarantees it never dangles even when later generations replace the
+ * composite under the same name. Static composites are already pinned
+ * via base_gen → all_composites so this map only fills in for
+ * dynamic-only names. */
+ ankerl::unordered_dense::map<std::string, std::shared_ptr<rspamd_composite>>
+ symcache_pinned;
+
public:
/* Configuration flags */
bool use_inverted_index; /**< Use inverted index for fast composite lookup (default: true) */
return new_composite(composite_name, expr, composite_expression).get();
}
+/*
+ * Per-map state. Lives in cfg->cfg_pool and survives across reloads of the
+ * same map. `last_names` tracks which composite names this map last
+ * published so that on reload we can stub-out names that the map dropped.
+ */
struct map_cbdata {
composites_manager *cm;
struct rspamd_config *cfg;
std::string buf;
+ ankerl::unordered_dense::set<std::string> last_names;
explicit map_cbdata(struct rspamd_config *cfg)
: cfg(cfg)
struct map_cb_data *data,
gboolean _final)
{
-
if (data->cur_data == nullptr) {
data->cur_data = data->prev_data;
reinterpret_cast<map_cbdata *>(data->cur_data)->buf.clear();
}
auto *cbd = reinterpret_cast<map_cbdata *>(data->cur_data);
-
cbd->buf.append(chunk, len);
return nullptr;
}
if (cbd) {
cbd->buf.clear();
}
+ return;
}
- else if (cbd != nullptr) {
- if (target) {
- *target = data->cur_data;
- }
- rspamd::string_foreach_line(cbd->buf, [&](std::string_view line) {
- auto [name_and_score, expr] = rspamd::string_split_on(line, ' ');
- auto [name, score] = rspamd::string_split_on(name_and_score, ':');
-
- if (!score.empty()) {
- /* I wish it was supported properly */
- //auto conv_res = std::from_chars(value->data(), value->size(), num);
- char numbuf[128], *endptr = nullptr;
- size_t n = std::min(score.size(), sizeof(numbuf) - 1);
- memcpy(numbuf, score.data(), n);
- numbuf[n] = '\0';
- auto num = g_ascii_strtod(numbuf, &endptr);
-
- if (fabs(num) >= G_MAXFLOAT || std::isnan(num)) {
- msg_err("invalid score for %*s", (int) name_and_score.size(), name_and_score.data());
- return;
- }
+ if (cbd == nullptr) {
+ msg_err("no data read for composites map");
+ return;
+ }
- auto ret = cbd->cm->add_composite(name, expr, true, num);
+ if (target) {
+ *target = data->cur_data;
+ }
- if (ret == nullptr) {
- msg_err("cannot add composite %*s", (int) name_and_score.size(), name_and_score.data());
- return;
- }
- }
- else {
- msg_err("missing score for %*s", (int) name_and_score.size(), name_and_score.data());
- return;
- }
- });
+ auto *cfg = cbd->cfg;
+ auto *cm = cbd->cm;
+
+ /* Parse the buffered bytes as UCL. */
+ auto *parser = ucl_parser_new(UCL_PARSER_NO_FILEVARS);
+ if (!ucl_parser_add_chunk(parser,
+ reinterpret_cast<const unsigned char *>(cbd->buf.data()),
+ cbd->buf.size())) {
+ msg_err_config("cannot parse composites map as UCL: %s",
+ ucl_parser_get_error(parser));
+ ucl_parser_free(parser);
+ cbd->buf.clear();
+ return;
}
- else {
- msg_err("no data read for composites map");
+
+ ucl_object_t *top = ucl_parser_get_object(parser);
+ ucl_parser_free(parser);
+
+ if (top == nullptr) {
+ msg_err_config("composites map UCL is empty");
+ cbd->buf.clear();
+ return;
+ }
+
+ if (ucl_object_type(top) != UCL_OBJECT) {
+ msg_err_config("composites map must be a UCL object, got %s",
+ ucl_object_type_to_string(ucl_object_type(top)));
+ ucl_object_unref(top);
+ cbd->buf.clear();
+ return;
+ }
+
+ /* Build a staging generation cloned from the base. */
+ auto staging = cm->build_staging();
+ ankerl::unordered_dense::set<std::string> seen_in_map;
+ unsigned int added = 0, updated = 0, failed = 0;
+
+ const ucl_object_t *cur;
+ auto *it = ucl_object_iterate_new(top);
+ while ((cur = ucl_object_iterate_safe(it, true)) != nullptr) {
+ const char *key = ucl_object_key(cur);
+ if (key == nullptr) {
+ continue;
+ }
+ std::string name{key};
+
+ bool replacing = staging->composites.contains(name);
+ auto *comp = cm->add_composite_to_staging(*staging, name, cur);
+ if (comp == nullptr) {
+ failed++;
+ continue;
+ }
+
+ seen_in_map.insert(name);
+ if (replacing) {
+ updated++;
+ }
+ else {
+ added++;
+ }
+ }
+ ucl_object_iterate_free(it);
+ ucl_object_unref(top);
+
+ /* Names this map previously owned but no longer mentions become
+ * disabled stubs in the staging. */
+ unsigned int stubbed = 0;
+ for (const auto &name: cbd->last_names) {
+ if (seen_in_map.contains(name)) {
+ continue;
+ }
+ if (cm->disable_in_staging(*staging, name)) {
+ stubbed++;
+ }
}
+
+ cm->publish_generation(staging);
+ cbd->last_names = std::move(seen_in_map);
+ cbd->buf.clear();
+
+ msg_info_config("dynamic composites map reloaded (gen %L): "
+ "%ud added, %ud updated, %ud stubbed, %ud failed",
+ (int64_t) cm->current()->generation_id,
+ added, updated, stubbed, failed);
}
static void
marked_count);
}
+auto composites_manager::build_staging() -> std::shared_ptr<composites_generation>
+{
+ auto staging = std::make_shared<composites_generation>();
+ staging->generation_id = allocate_generation_id();
+
+ if (!base_gen) {
+ /* Should not happen — pin_base_generation must be called once
+ * after static load. Fall back to current_gen so the caller still
+ * gets a workable staging. */
+ msg_warn_config("composites: build_staging() called before base "
+ "generation was pinned, cloning current_gen instead");
+ }
+
+ const auto &source = base_gen ? *base_gen : *current_gen;
+
+ for (const auto &orig: source.all_composites) {
+ /*
+ * Deep-copy the composite struct (shared expression pointer is
+ * fine, it lives in cfg_pool). Re-derive per-generation flags
+ * via the analysis pipeline.
+ */
+ auto cloned = std::make_shared<rspamd_composite>(*orig);
+ cloned->id = next_id();
+ cloned->second_pass = false;
+ cloned->has_positive_atoms = false;
+ staging->all_composites.push_back(cloned);
+ staging->composites[cloned->sym] = cloned;
+ }
+
+ msg_debug_config("composites: built staging gen %L with %d cloned composites",
+ (int64_t) staging->generation_id,
+ (int) staging->all_composites.size());
+
+ return staging;
+}
+
+auto composites_manager::add_composite_to_staging(composites_generation &staging,
+ std::string_view name,
+ const ucl_object_t *obj) -> rspamd_composite *
+{
+ const auto *val = ucl_object_lookup(obj, "enabled");
+ if (val != nullptr && !ucl_object_toboolean(val)) {
+ /* Operator wants the name present but inactive — disabled stub */
+ disable_in_staging(staging, std::string(name));
+ return staging.find(name) ? const_cast<rspamd_composite *>(staging.find(name)) : nullptr;
+ }
+
+ const char *composite_expression = nullptr;
+ val = ucl_object_lookup(obj, "expression");
+
+ if (val == nullptr || !ucl_object_tostring_safe(val, &composite_expression)) {
+ msg_err_config("dynamic composite %*s has no expression",
+ (int) name.size(), name.data());
+ return nullptr;
+ }
+
+ /* Copy the expression into cfg_pool — parser keeps pointers into it. */
+ auto expr_len = strlen(composite_expression);
+ char *expr_copy = rspamd_mempool_alloc_buffer(cfg->cfg_pool, expr_len + 1);
+ memcpy(expr_copy, composite_expression, expr_len);
+ expr_copy[expr_len] = '\0';
+
+ GError *err = nullptr;
+ rspamd_expression *expr = nullptr;
+
+ if (!rspamd_parse_expression(expr_copy, expr_len, &composite_expr_subr,
+ nullptr, cfg->cfg_pool, &err, &expr)) {
+ msg_err_config("cannot parse expression for dynamic composite %*s: %e",
+ (int) name.size(), name.data(), err);
+ if (err) {
+ g_error_free(err);
+ }
+ return nullptr;
+ }
+
+ auto composite = std::make_shared<rspamd_composite>();
+ composite->id = next_id();
+ composite->expr = expr;
+ composite->str_expr = composite_expression;
+ composite->sym = std::string(name);
+ composite->second_pass = false;
+ composite->has_positive_atoms = false;
+ composite->disabled = false;
+ composite->policy = rspamd_composite_policy::RSPAMD_COMPOSITE_POLICY_REMOVE_ALL;
+
+ val = ucl_object_lookup(obj, "policy");
+ if (val) {
+ auto p = composite_policy_from_str(ucl_object_tostring(val));
+ if (p == rspamd_composite_policy::RSPAMD_COMPOSITE_POLICY_UNKNOWN) {
+ msg_err_config("dynamic composite %*s has unknown policy '%s'",
+ (int) name.size(), name.data(), ucl_object_tostring(val));
+ return nullptr;
+ }
+ composite->policy = p;
+ }
+
+ /* Replace any existing entry under this name (came from base_gen
+ * clone or from an earlier entry in this same map). */
+ auto sym_key = composite->sym;
+ auto it = staging.composites.find(sym_key);
+ if (it != staging.composites.end()) {
+ /* Find and replace in all_composites */
+ for (auto &slot: staging.all_composites) {
+ if (slot.get() == it->second.get()) {
+ slot = composite;
+ break;
+ }
+ }
+ it->second = composite;
+ }
+ else {
+ staging.all_composites.push_back(composite);
+ staging.composites[sym_key] = composite;
+ }
+
+ /* Reflect the composite in cfg->symbols so scoring and FINE-flag
+ * propagation work for both static and dynamic composites. Safe to
+ * mutate the GHashTable here because we're on the libev thread with
+ * no scan in progress. */
+ auto score = std::isnan(cfg->unknown_weight) ? 0.0 : cfg->unknown_weight;
+ val = ucl_object_lookup(obj, "score");
+ if (val != nullptr) {
+ ucl_object_todouble_safe(val, &score);
+ }
+
+ const char *group = "composite";
+ val = ucl_object_lookup(obj, "group");
+ if (val != nullptr) {
+ group = ucl_object_tostring(val);
+ }
+
+ const char *description = composite_expression;
+ val = ucl_object_lookup(obj, "description");
+ if (val != nullptr) {
+ description = ucl_object_tostring(val);
+ }
+
+ rspamd_config_add_symbol(cfg, composite->sym.c_str(), score,
+ description, group,
+ 0, ucl_object_get_priority(obj),
+ 1);
+
+ const auto *groups = ucl_object_lookup(obj, "groups");
+ if (groups && ucl_object_type(groups) == UCL_ARRAY) {
+ const ucl_object_t *cur_gr;
+ auto *gr_it = ucl_object_iterate_new(groups);
+
+ while ((cur_gr = ucl_object_iterate_safe(gr_it, true)) != nullptr) {
+ rspamd_config_add_symbol_group(cfg, composite->sym.c_str(),
+ ucl_object_tostring(cur_gr));
+ }
+
+ ucl_object_iterate_free(gr_it);
+ }
+
+ return composite.get();
+}
+
+auto composites_manager::disable_in_staging(composites_generation &staging,
+ const std::string &name) -> bool
+{
+ auto it = staging.composites.find(name);
+ if (it == staging.composites.end()) {
+ /* Name never existed — create an inert stub so find() works */
+ auto stub = std::make_shared<rspamd_composite>();
+ stub->id = next_id();
+ stub->expr = nullptr;
+ stub->sym = name;
+ stub->second_pass = false;
+ stub->has_positive_atoms = false;
+ stub->disabled = true;
+ stub->policy = rspamd_composite_policy::RSPAMD_COMPOSITE_POLICY_LEAVE;
+ staging.all_composites.push_back(stub);
+ staging.composites[name] = stub;
+ return true;
+ }
+
+ auto stub = std::make_shared<rspamd_composite>(*it->second);
+ stub->id = next_id();
+ stub->expr = nullptr;
+ stub->second_pass = false;
+ stub->has_positive_atoms = false;
+ stub->disabled = true;
+ for (auto &slot: staging.all_composites) {
+ if (slot.get() == it->second.get()) {
+ slot = stub;
+ break;
+ }
+ }
+ it->second = stub;
+ return true;
+}
+
+auto composites_manager::publish_generation(std::shared_ptr<composites_generation> staging) -> void
+{
+ if (!staging) {
+ return;
+ }
+
+ /* Register newly-introduced composite names with the symcache. cfg->symbols
+ * was already updated by add_composite_to_staging(). ever_seen_names gates
+ * the one-time symcache add. */
+ bool symcache_changed = false;
+ for (const auto &[name, comp]: staging->composites) {
+ if (comp->disabled) {
+ continue;
+ }
+ if (ever_seen_names.contains(name)) {
+ continue;
+ }
+ if (cfg->cache) {
+ rspamd_symcache_add_symbol(cfg->cache, name.c_str(), 0,
+ nullptr, comp.get(),
+ SYMBOL_TYPE_COMPOSITE, -1);
+ symcache_changed = true;
+ }
+ /* Pin the shared_ptr so the symcache's ud never dangles even if
+ * the composite is replaced in a later generation. */
+ symcache_pinned[name] = comp;
+ ever_seen_names.insert(name);
+ }
+
+ if (symcache_changed && cfg->cache) {
+ rspamd_symcache_promote_resort(cfg->cache);
+ }
+
+ /* Run the analysis pipeline on the staging gen. */
+ process_dependencies(*staging);
+ build_inverted_index(*staging);
+ mark_whitelist_dependencies(*staging);
+
+ /* Atomic swap (single-threaded libev: assignment is the swap). */
+ current_gen = std::move(staging);
+}
+
+auto composites_manager::seal_static_load() -> void
+{
+ if (base_gen) {
+ return; /* Already sealed */
+ }
+ base_gen = current_gen;
+ for (const auto &[name, comp]: current_gen->composites) {
+ ever_seen_names.insert(name);
+ /* Static composites are pinned via base_gen → all_composites, no
+ * extra pinning required for the symcache ud. */
+ }
+ msg_debug_config("composites: sealed static load (gen %L, %d composites)",
+ (int64_t) current_gen->generation_id,
+ (int) current_gen->all_composites.size());
+}
+
}// namespace rspamd::composites
void rspamd_composites_process_deps(void *cm_ptr, struct rspamd_config *cfg)
{
auto *cm = COMPOSITE_MANAGER_FROM_PTR(cm_ptr);
cm->mark_whitelist_dependencies();
+ /* Last step of static load: snapshot base generation and ever-seen
+ * names so dynamic map publishes can clone from a stable base. */
+ cm->seal_static_load();
+}
+
+bool rspamd_composites_add_dynamic_map(void *cm_ptr, const ucl_object_t *obj,
+ struct rspamd_config *cfg)
+{
+ auto *cm = COMPOSITE_MANAGER_FROM_PTR(cm_ptr);
+ (void) cm;
+ return rspamd_composites_add_map_handlers(obj, cfg);
+}
+
+uint64_t rspamd_composites_current_generation(void *cm_ptr)
+{
+ auto *cm = COMPOSITE_MANAGER_FROM_PTR(cm_ptr);
+ auto *gen = cm->current();
+ return gen ? gen->generation_id : 0;
}
\ No newline at end of file