From 6e6f13c428f95057dbf11cf64f8e0daab057f331 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 9 Jun 2026 12:43:40 +0100 Subject: [PATCH] [Fix] neural: don't strand trained ANNs behind tombstones A trained ANN could become unreachable to workers even though training succeeded: NEURAL_SPAM/NEURAL_HAM stopped firing while the controller logged "ann ... is changed, our version = N, remote version = M" forever. Root cause is a version regression, not a missing zset registration. The new version was seeded from the in-memory set.ann, and fill_set_ann resets set.ann.version to 0 whenever a worker never loaded an ANN (restart, or the selected profile's blob was missing). A worker that trained from the _4 profile then saved version 1. process_existing_ann selects the highest version among compatible profiles, so the live version-1 blob was shadowed by the stale version-4 zset entry whose key was empty. The profile zset has no TTL, so the dead high-version tombstone was immortal and the condition self-perpetuated (the _4 blob was never rewritten). Three fixes: 1. Version monotonicity (lualib/plugins/neural.lua): seed the new version from the profile actually trained from (the trained-from key encodes it as the trailing _<version>), max'd with training_profile/set.ann, so the new entry always outranks the profile it supersedes. 2. Liveness-aware selection (src/plugins/lua/neural.lua, neural_maybe_invalidate.lua): when the selected profile's blob is missing, fall back to the next compatible profile with a live blob instead of going dark, and emit a throttled warning (was a silent debug line). The invalidate script also GCs profile entries that have no blob and no training data and are older than a grace window. 3. Lifetime coupling (neural_save_unlock.lua, src/plugins/lua/neural.lua): give the profile zset a TTL refreshed each check_anns cycle, and refresh the blob TTL on every reload, so an actively used ANN never expires out from under its entry. 
Adds 330_neural/005_stale_version.robot, which injects a higher-version tombstone and asserts inference recovers. --- lualib/plugins/neural.lua | 16 ++- .../redis_scripts/neural_maybe_invalidate.lua | 49 ++++++-- lualib/redis_scripts/neural_save_unlock.lua | 3 + src/plugins/lua/neural.lua | 106 +++++++++++++++--- .../cases/330_neural/005_stale_version.robot | 82 ++++++++++++++ 5 files changed, 232 insertions(+), 24 deletions(-) create mode 100644 test/functional/cases/330_neural/005_stale_version.robot diff --git a/lualib/plugins/neural.lua b/lualib/plugins/neural.lua index a116adc14a..83074d0ce0 100644 --- a/lualib/plugins/neural.lua +++ b/lualib/plugins/neural.lua @@ -1295,7 +1295,21 @@ local function spawn_train(params) -- Deserialise ANN from the child process local loaded_ann = rspamd_kann.load(parsed.ann_data) - local version = (params.set.ann.version or 0) + 1 + -- Seed the new version from the profile we actually trained from, not + -- from the in-memory set.ann: fill_set_ann resets set.ann.version to 0 + -- whenever this worker never loaded an ANN (restart, or the selected + -- profile's blob was missing), which would make the freshly trained ANN + -- regress below the stale zset entries. process_existing_ann selects by + -- highest version, so a regressed entry is never picked and the blob is + -- stranded. The trained-from key encodes its version as the trailing + -- _<version>; basing the new version on it guarantees the new entry outranks + -- the profile it supersedes. 
+ local trained_from_version = tonumber(tostring(params.ann_key):match('_(%d+)$')) + local base_version = math.max( + trained_from_version or 0, + (params.set.training_profile and params.set.training_profile.version) or 0, + (params.set.ann and params.set.ann.version) or 0) + local version = base_version + 1 params.set.ann.version = version params.set.ann.ann = loaded_ann params.set.ann.symbols = params.set.symbols diff --git a/lualib/redis_scripts/neural_maybe_invalidate.lua b/lualib/redis_scripts/neural_maybe_invalidate.lua index 517fa019d4..03c308ec1a 100644 --- a/lualib/redis_scripts/neural_maybe_invalidate.lua +++ b/lualib/redis_scripts/neural_maybe_invalidate.lua @@ -1,8 +1,16 @@ --- Lua script to invalidate ANNs by rank --- Uses the following keys --- key1 - prefix for keys +-- Lua script to invalidate ANNs +-- Uses the following keys and argv +-- key1 - prefix for keys (profile zset) -- key2 - number of elements to leave +-- argv1 - tombstone cutoff timestamp (optional): profile entries older than this +-- whose ANN blob no longer exists and that hold no training data are +-- removed. This clears stale entries left behind when a blob expires or +-- a profile was registered but never trained, which would otherwise keep +-- shadowing freshly trained (lower-versioned) ANNs in version selection. +local removed = {} + +-- 1) Rank-based pruning: keep the `lim` newest entries by score (timestamp) local card = redis.call('ZCARD', KEYS[1]) local lim = tonumber(KEYS[2]) if card > lim then @@ -16,10 +24,37 @@ if card > lim then redis.call('DEL', tb.redis_key .. '_spam_set') redis.call('DEL', tb.redis_key .. '_ham_set') end + removed[#removed + 1] = k end end redis.call('ZREMRANGEBYRANK', KEYS[1], 0, card - lim - 1) - return to_delete -else - return {} -end \ No newline at end of file +end + +-- 2) Tombstone GC: drop entries with no trained blob and no training data that +-- are older than the cutoff. 
Entries still accumulating vectors, or younger +-- than the cutoff (freshly registered profiles awaiting their first train), +-- are spared regardless of blob presence. +local cutoff = tonumber(ARGV[1]) +if cutoff then + local survivors = redis.call('ZRANGE', KEYS[1], 0, -1, 'WITHSCORES') + local i = 1 + while i <= #survivors do + local member = survivors[i] + local score = tonumber(survivors[i + 1]) + i = i + 2 + if score and score < cutoff then + local ok, tb = pcall(cjson.decode, member) + if ok and type(tb) == 'table' and type(tb.redis_key) == 'string' then + local has_blob = redis.call('HEXISTS', tb.redis_key, 'ann') == 1 + local has_spam = redis.call('EXISTS', tb.redis_key .. '_spam_set') == 1 + local has_ham = redis.call('EXISTS', tb.redis_key .. '_ham_set') == 1 + if not has_blob and not has_spam and not has_ham then + redis.call('ZREM', KEYS[1], member) + removed[#removed + 1] = member + end + end + end + end +end + +return removed diff --git a/lualib/redis_scripts/neural_save_unlock.lua b/lualib/redis_scripts/neural_save_unlock.lua index c6d6dc3572..622ba50148 100644 --- a/lualib/redis_scripts/neural_save_unlock.lua +++ b/lualib/redis_scripts/neural_save_unlock.lua @@ -29,6 +29,9 @@ redis.call('HDEL', KEYS[3], 'lock') redis.call('HSET', KEYS[3], 'obsolete', '1') redis.call('EXPIRE', KEYS[3], 600) redis.call('EXPIRE', KEYS[1], tonumber(ARGV[3])) +-- Bound the profile zset lifetime to the ANN lifetime so a fully idle rule's +-- profile registry eventually disappears instead of accumulating tombstones +redis.call('EXPIRE', KEYS[2], tonumber(ARGV[3])) -- expire in 10m, to not face race condition with other rspamd replicas refill deleted keys redis.call('EXPIRE', KEYS[3] .. '_spam_set', 600) redis.call('EXPIRE', KEYS[3] .. 
'_ham_set', 600) diff --git a/src/plugins/lua/neural.lua b/src/plugins/lua/neural.lua index b8c787b5cc..4f7af25366 100644 --- a/src/plugins/lua/neural.lua +++ b/src/plugins/lua/neural.lua @@ -621,13 +621,33 @@ local function do_train_ann(worker, ev_base, rule, set, ann_key) }) end +-- Warn (throttled) when a selected profile points at a key with no trained +-- blob. Without throttling this fires every watch_interval and floods the log. +local function maybe_warn_stale_profile(rule, set, ann_key) + local now = rspamd_util.get_time() + if not set.last_stale_warn or (now - set.last_stale_warn) > 600 then + set.last_stale_warn = now + rspamd_logger.warnx(rspamd_config, + 'ANN profile for %s:%s selects Redis key %s which has no trained blob ' .. + '(stale profile entry); falling back to an older live profile', + rule.prefix, set.name, ann_key) + else + lua_util.debugm(N, rspamd_config, + 'stale ANN profile key %s for %s:%s (warning throttled)', + ann_key, rule.prefix, set.name) + end +end + -- This function loads new ann from Redis -- This is based on `profile` attribute. -- ANN is loaded from `profile.redis_key` -- Rank of `profile` key is also increased, unfortunately, it means that we need to -- serialize profile one more time and set its rank to the current time -- set.ann fields are set according to Redis data received -local function load_new_ann(rule, ev_base, set, profile, min_diff) +-- `candidates` (optional) is the best-first list of remaining compatible +-- profiles ({elt, dist}); if `profile`'s blob is missing we fall back to the +-- next live candidate so a stale highest-version entry cannot keep inference dark. 
+local function load_new_ann(rule, ev_base, set, profile, min_diff, candidates) local ann_key = profile.redis_key local function data_cb(err, data) @@ -674,6 +694,13 @@ local function load_new_ann(rule, ev_base, set, profile, min_diff) 'ZADD', -- command { set.prefix, tostring(rspamd_util.get_time()), profile_serialized } ) + -- Keep an actively-reloaded blob alive: without this the blob + -- expires ann_expire after training even while still in use, and + -- its surviving zset entry becomes a tombstone. The zset TTL + -- itself is refreshed in check_anns every cycle. + lua_redis.redis_make_request_taskless(ev_base, rspamd_config, + rule.redis, nil, true, rank_cb, 'EXPIRE', + { ann_key, tostring(rule.ann_expire) }) rspamd_logger.infox(rspamd_config, 'loaded ANN for %s:%s from %s; %s bytes compressed; version=%s', rule.prefix, set.name, ann_key, #data[1], profile.version) @@ -684,8 +711,21 @@ local function load_new_ann(rule, ev_base, set, profile, min_diff) end end else - lua_util.debugm(N, rspamd_config, 'missing ANN for %s:%s in Redis key %s', - rule.prefix, set.name, ann_key) + maybe_warn_stale_profile(rule, set, ann_key) + -- Fall back to the next compatible profile that actually has a blob. + -- Only move to a candidate that would not downgrade what we already + -- have loaded (strictly newer than set.ann, or anything when dark). 
+ local cur_ver = (set.ann and set.ann.version) or -1 + for i, cand in ipairs(candidates or {}) do + if (cand.elt.version or 0) > cur_ver then + local rest = {} + for j = i + 1, #candidates do + rest[#rest + 1] = candidates[j] + end + load_new_ann(rule, ev_base, set, cand.elt, cand.dist, rest) + break + end + end end if set.ann and set.ann.ann and type(data[2]) == 'userdata' and data[2].cookie == text_cookie then @@ -948,25 +988,38 @@ local function process_existing_ann(_, ev_base, rule, set, profiles) local has_providers = rule.providers and #rule.providers > 0 local current_providers_digest = has_providers and neural_common.providers_config_digest(rule.providers, rule) or nil - local min_diff = math.huge - local sel_elt lua_util.debugm(N, rspamd_config, 'process_existing_ann: have %s profiles for %s:%s (providers_digest=%s)', type(profiles) == 'table' and #profiles or -1, rule.prefix, set.name, current_providers_digest or 'none') + -- Build the best-first list of compatible candidates (smaller distance first, + -- tie-break on higher version). The head is the profile we want to load; the + -- tail is handed to load_new_ann as a fallback for when the head's blob turns + -- out to be missing (a stale highest-version entry pointing at an expired or + -- never-written key). 
+ local candidates = {} for _, elt in fun.iter(profiles) do local compatible, dist = neural_common.is_profile_compatible( rule, set, elt, current_providers_digest) if compatible then - -- Prefer smaller distance; tie-break on higher version - if dist < min_diff - or (dist == min_diff and sel_elt and (elt.version or 0) > (sel_elt.version or 0)) then - min_diff = dist - sel_elt = elt - end + candidates[#candidates + 1] = { elt = elt, dist = dist } end end + table.sort(candidates, function(a, b) + if a.dist ~= b.dist then + return a.dist < b.dist + end + return (a.elt.version or 0) > (b.elt.version or 0) + end) + + local sel = candidates[1] + local sel_elt = sel and sel.elt + local min_diff = sel and sel.dist or math.huge + local fallback_tail = {} + for i = 2, #candidates do + fallback_tail[#fallback_tail + 1] = candidates[i] + end if sel_elt then -- Track the best-known profile as the training target independently of @@ -1001,7 +1054,7 @@ local function process_existing_ann(_, ev_base, rule, set, profiles) rule.prefix .. ':' .. set.name, set.ann.version, sel_elt.version) - load_new_ann(rule, ev_base, set, sel_elt, min_diff) + load_new_ann(rule, ev_base, set, sel_elt, min_diff, fallback_tail) else lua_util.debugm(N, rspamd_config, 'ann %s is not changed, ' .. 'our version = %s, remote version = %s', @@ -1015,7 +1068,7 @@ local function process_existing_ann(_, ev_base, rule, set, profiles) 'providers schema matches for %s; reload newer version %s (ours = %s)', rule.prefix .. ':' .. set.name, sel_elt.version, set.ann.version) - load_new_ann(rule, ev_base, set, sel_elt, min_diff) + load_new_ann(rule, ev_base, set, sel_elt, min_diff, fallback_tail) else lua_util.debugm(N, rspamd_config, 'providers schema matches for %s; our version %s >= remote %s, no reload', @@ -1030,7 +1083,7 @@ local function process_existing_ann(_, ev_base, rule, set, profiles) rule.prefix .. ':' .. 
set.name, set.ann.distance, min_diff) - load_new_ann(rule, ev_base, set, sel_elt, min_diff) + load_new_ann(rule, ev_base, set, sel_elt, min_diff, fallback_tail) else lua_util.debugm(N, rspamd_config, 'ann %s is not changed or less specific, ' .. 'our distance = %s, remote distance = %s', @@ -1041,7 +1094,7 @@ local function process_existing_ann(_, ev_base, rule, set, profiles) end else -- We have no ANN, load new one - load_new_ann(rule, ev_base, set, sel_elt, min_diff) + load_new_ann(rule, ev_base, set, sel_elt, min_diff, fallback_tail) end end if sel_elt then @@ -1397,6 +1450,20 @@ local function check_anns(worker, cfg, ev_base, rule, process_callback, what) 'ZREVRANGE', -- command { set.prefix, '0', tostring(settings.max_profiles) } -- arguments ) + -- Refresh the profile-zset TTL while the rule is live, so it never expires + -- out from under a running worker (a stable, non-reloading rule would + -- otherwise let it lapse). Per-tombstone cleanup is handled by the GC in + -- the invalidate script; this TTL only reclaims the registry after a long + -- full shutdown. + lua_redis.redis_make_request_taskless(ev_base, + cfg, + rule.redis, + nil, + true, -- is write + function(_, _) end, + 'EXPIRE', + { set.prefix, tostring(rule.ann_expire) } + ) end end -- Cycle over all settings @@ -1422,10 +1489,17 @@ local function cleanup_anns(rule, cfg, ev_base) end if type(set) == 'table' then + -- Entries older than this with no blob and no training data are tombstones + -- (expired or never-trained profiles) and get GC'd by the script. The + -- grace window must comfortably exceed the rotated-key training-set TTL + -- (600s) so a profile being rotated is never mistaken for a tombstone. 
+ local grace = math.max(1200, (rule.watch_interval or 60) * 4) + local cutoff = rspamd_util.get_time() - grace lua_redis.exec_redis_script(neural_common.redis_script_id.maybe_invalidate, { ev_base = ev_base, is_write = true }, invalidate_cb, - { set.prefix, tostring(settings.max_profiles) }) + { set.prefix, tostring(settings.max_profiles) }, + { tostring(cutoff) }) end end end diff --git a/test/functional/cases/330_neural/005_stale_version.robot b/test/functional/cases/330_neural/005_stale_version.robot new file mode 100644 index 0000000000..8ddb71765e --- /dev/null +++ b/test/functional/cases/330_neural/005_stale_version.robot @@ -0,0 +1,82 @@ +*** Settings *** +Suite Setup Rspamd Redis Setup +Suite Teardown Rspamd Redis Teardown +Library Process +Library Collections +Library ${RSPAMD_TESTDIR}/lib/rspamd.py +Resource ${RSPAMD_TESTDIR}/lib/rspamd.robot +Variables ${RSPAMD_TESTDIR}/lib/vars.py + +*** Variables *** +${CONFIG} ${RSPAMD_TESTDIR}/configs/neural_rotation.conf +${SPAM_MSG} ${RSPAMD_TESTDIR}/messages/spam.eml +${HAM_MSG} ${RSPAMD_TESTDIR}/messages/ham.eml +${REDIS_SCOPE} Suite +${RSPAMD_SCOPE} Suite +${RSPAMD_URL_TLD} ${RSPAMD_TESTDIR}/../lua/unit/test_tld.dat + +*** Test Cases *** +Train providers-driven ANN + Sleep 2s Wait for redis and initial check_anns + Scan File ${SPAM_MSG} Settings={symbols_enabled = ["SPAM_SYMBOL1", "SPAM_SYMBOL2", "SPAM_SYMBOL3"]} + Expect Symbol SPAM_SYMBOL1 + Scan File ${HAM_MSG} Settings={symbols_enabled = ["HAM_SYMBOL1", "HAM_SYMBOL2", "HAM_SYMBOL3"]} + Expect Symbol HAM_SYMBOL1 + +Inference fires before the tombstone + Sleep 5s Wait for training to complete and ANN to be reloaded + Scan File ${SPAM_MSG} Settings={symbols_enabled = ["SPAM_SYMBOL1","SPAM_SYMBOL2","SPAM_SYMBOL3"];groups_enabled=["neural"];symbols_disabled = ["NEURAL_LEARN"]} + Expect Symbol NEURAL_SPAM_SHORT + +Inject a stale higher-version profile tombstone + # Register a profile whose version exceeds every real one but whose redis_key + # has no trained blob 
(an expired/never-written key). process_existing_ann + # selects the highest version, so pre-fix this entry would be picked and the + # missing blob would silently leave inference dark. The same providers_digest + # keeps it "compatible", so only the missing-blob fallback can save it. + ${zset} = Get Neural Profile Zset + Set Suite Variable ${NEURAL_ZSET} ${zset} + ${member} = Get Any Profile Member ${zset} + ${tomb} = Make Stale Tombstone ${member} + ${r} = Run Process redis-cli -h ${RSPAMD_REDIS_ADDR} -p ${RSPAMD_REDIS_PORT} + ... ZADD ${zset} 9999999999 ${tomb} + Should Be Equal As Integers ${r.rc} 0 + +Inference still fires despite the stale tombstone + # Clear the loaded ANN so the next check_anns (watch_interval=0.5) re-runs + # selection; the dead highest-version entry must be skipped for the live one. + Scan File ${SPAM_MSG} Settings={symbols_enabled = ["FORCE_ROTATE_NEURAL"];symbols_disabled = ["NEURAL_LEARN","NEURAL_CHECK"]} + Expect Symbol FORCE_ROTATE_NEURAL + Sleep 3s Wait for check_anns to reselect past the tombstone + Scan File ${SPAM_MSG} Settings={symbols_enabled = ["SPAM_SYMBOL1","SPAM_SYMBOL2","SPAM_SYMBOL3"];groups_enabled=["neural"];symbols_disabled = ["NEURAL_LEARN"]} + Expect Symbol NEURAL_SPAM_SHORT + Scan File ${HAM_MSG} Settings={symbols_enabled = ["HAM_SYMBOL1","HAM_SYMBOL2","HAM_SYMBOL3"];groups_enabled=["neural"];symbols_disabled = ["NEURAL_LEARN"]} + Expect Symbol NEURAL_HAM_SHORT + +*** Keywords *** +Get Neural Profile Zset + # The profile registry zset is the only key matching the rn3_ prefix + # (ANN blobs and training sets use the rn_ prefix). + ${res} = Run Process redis-cli -h ${RSPAMD_REDIS_ADDR} -p ${RSPAMD_REDIS_PORT} + ... KEYS rn3_* + ${zset} = Evaluate $res.stdout.strip().split('\\n')[0] + Should Not Be Empty ${zset} msg=no neural profile zset registered + [Return] ${zset} + +Get Any Profile Member + [Arguments] ${zset} + ${res} = Run Process redis-cli -h ${RSPAMD_REDIS_ADDR} -p ${RSPAMD_REDIS_PORT} + ... 
ZRANGE ${zset} 0 0 + ${member} = Evaluate $res.stdout.strip() + Should Not Be Empty ${member} msg=no profile member to clone + [Return] ${member} + +Make Stale Tombstone + [Arguments] ${member} + # Clone a real profile, point it at a non-existent key and bump its version + # past everything else; keep digest/providers_digest so it stays compatible. + ${obj} = Evaluate json.loads($member) json + ${origkey} = Set Variable ${obj}[redis_key] + Set To Dictionary ${obj} redis_key=${origkey}_STALE_MISSING version=${99999} + ${tomb} = Evaluate json.dumps($obj) json + [Return] ${tomb} -- 2.47.3