--- Lua script to invalidate ANNs by rank
--- Uses the following keys
--- key1 - prefix for keys
+-- Lua script to invalidate ANNs
+-- Uses the following keys and argv
+-- key1 - prefix for keys (profile zset)
-- key2 - number of elements to leave
+-- argv1 - tombstone cutoff timestamp (optional): profile entries older than this
+-- whose ANN blob no longer exists and that hold no training data are
+-- removed. This clears stale entries left behind when a blob expires or
+-- a profile was registered but never trained, which would otherwise keep
+-- shadowing freshly trained (lower-versioned) ANNs in version selection.
+local removed = {}
+
+-- 1) Rank-based pruning: keep the `lim` newest entries by score (timestamp)
local card = redis.call('ZCARD', KEYS[1])
local lim = tonumber(KEYS[2])
if card > lim then
redis.call('DEL', tb.redis_key .. '_spam_set')
redis.call('DEL', tb.redis_key .. '_ham_set')
end
+ removed[#removed + 1] = k
end
end
redis.call('ZREMRANGEBYRANK', KEYS[1], 0, card - lim - 1)
- return to_delete
-else
- return {}
-end
\ No newline at end of file
+end
+
+-- 2) Tombstone GC. Runs only when ARGV[1] parses as a number (the cutoff).
+-- A profile entry is dropped when its score is below the cutoff AND its key
+-- has neither an `ann` hash field nor a `_spam_set` / `_ham_set` companion key.
+-- Entries scored at or above the cutoff (freshly registered profiles awaiting
+-- their first train), entries whose member does not decode to a JSON object
+-- with a string `redis_key`, and entries that still own a blob or training
+-- vectors are all left in place.
+local cutoff = tonumber(ARGV[1])
+if cutoff then
+  -- True when the profile serialized in `member` has no blob and no training sets.
+  local function is_tombstone(member)
+    local decoded, profile = pcall(cjson.decode, member)
+    if not decoded or type(profile) ~= 'table' or type(profile.redis_key) ~= 'string' then
+      return false
+    end
+    local key = profile.redis_key
+    local blob = redis.call('HEXISTS', key, 'ann')
+    local spam = redis.call('EXISTS', key .. '_spam_set')
+    local ham = redis.call('EXISTS', key .. '_ham_set')
+    return blob ~= 1 and spam ~= 1 and ham ~= 1
+  end
+
+  -- WITHSCORES yields a flat member, score, member, score, ... list.
+  local entries = redis.call('ZRANGE', KEYS[1], 0, -1, 'WITHSCORES')
+  for idx = 1, #entries, 2 do
+    local member = entries[idx]
+    local score = tonumber(entries[idx + 1])
+    if score and score < cutoff and is_tombstone(member) then
+      redis.call('ZREM', KEYS[1], member)
+      removed[#removed + 1] = member
+    end
+  end
+end
+
+return removed
})
end
+-- Report that the selected profile's Redis key carries no trained blob.
+-- A warning is emitted on the first call for a given `set` and afterwards only
+-- once more than 600 units of rspamd_util.get_time() (presumably seconds) have
+-- passed since the previous warning; the time of the last warning is kept in
+-- `set.last_stale_warn`. Calls inside that window are demoted to a debug
+-- message so that a short watch_interval does not flood the log.
+local function maybe_warn_stale_profile(rule, set, ann_key)
+  local now = rspamd_util.get_time()
+  local last_warned = set.last_stale_warn
+
+  -- Throttled path: a warning was already issued recently enough.
+  if last_warned and not ((now - last_warned) > 600) then
+    lua_util.debugm(N, rspamd_config,
+        'stale ANN profile key %s for %s:%s (warning throttled)',
+        ann_key, rule.prefix, set.name)
+    return
+  end
+
+  set.last_stale_warn = now
+  rspamd_logger.warnx(rspamd_config,
+      'ANN profile for %s:%s selects Redis key %s which has no trained blob ' ..
+          '(stale profile entry); falling back to an older live profile',
+      rule.prefix, set.name, ann_key)
+end
+
-- This function loads new ann from Redis
-- This is based on `profile` attribute.
-- ANN is loaded from `profile.redis_key`
-- Rank of `profile` key is also increased, unfortunately, it means that we need to
-- serialize profile one more time and set its rank to the current time
-- set.ann fields are set according to Redis data received
-local function load_new_ann(rule, ev_base, set, profile, min_diff)
+-- `candidates` (optional) is the best-first list of remaining compatible
+-- profiles ({elt, dist}); if `profile`'s blob is missing we fall back to the
+-- next live candidate so a stale highest-version entry cannot keep inference dark.
+local function load_new_ann(rule, ev_base, set, profile, min_diff, candidates)
local ann_key = profile.redis_key
local function data_cb(err, data)
'ZADD', -- command
{ set.prefix, tostring(rspamd_util.get_time()), profile_serialized }
)
+ -- Keep an actively-reloaded blob alive: without this the blob
+ -- expires ann_expire after training even while still in use, and
+ -- its surviving zset entry becomes a tombstone. The zset TTL
+ -- itself is refreshed in check_anns every cycle.
+ lua_redis.redis_make_request_taskless(ev_base, rspamd_config,
+ rule.redis, nil, true, rank_cb, 'EXPIRE',
+ { ann_key, tostring(rule.ann_expire) })
rspamd_logger.infox(rspamd_config,
'loaded ANN for %s:%s from %s; %s bytes compressed; version=%s',
rule.prefix, set.name, ann_key, #data[1], profile.version)
end
end
else
- lua_util.debugm(N, rspamd_config, 'missing ANN for %s:%s in Redis key %s',
- rule.prefix, set.name, ann_key)
+ maybe_warn_stale_profile(rule, set, ann_key)
+ -- Fall back to the next compatible profile that actually has a blob.
+ -- Only move to a candidate that would not downgrade what we already
+ -- have loaded (strictly newer than set.ann, or anything when dark).
+ local cur_ver = (set.ann and set.ann.version) or -1
+ for i, cand in ipairs(candidates or {}) do
+ if (cand.elt.version or 0) > cur_ver then
+ local rest = {}
+ for j = i + 1, #candidates do
+ rest[#rest + 1] = candidates[j]
+ end
+ load_new_ann(rule, ev_base, set, cand.elt, cand.dist, rest)
+ break
+ end
+ end
end
if set.ann and set.ann.ann and type(data[2]) == 'userdata' and data[2].cookie == text_cookie then
local has_providers = rule.providers and #rule.providers > 0
local current_providers_digest = has_providers and
neural_common.providers_config_digest(rule.providers, rule) or nil
- local min_diff = math.huge
- local sel_elt
lua_util.debugm(N, rspamd_config,
'process_existing_ann: have %s profiles for %s:%s (providers_digest=%s)',
type(profiles) == 'table' and #profiles or -1, rule.prefix, set.name,
current_providers_digest or 'none')
+ -- Build the best-first list of compatible candidates (smaller distance first,
+ -- tie-break on higher version). The head is the profile we want to load; the
+ -- tail is handed to load_new_ann as a fallback for when the head's blob turns
+ -- out to be missing (a stale highest-version entry pointing at an expired or
+ -- never-written key).
+ local candidates = {}
for _, elt in fun.iter(profiles) do
local compatible, dist = neural_common.is_profile_compatible(
rule, set, elt, current_providers_digest)
if compatible then
- -- Prefer smaller distance; tie-break on higher version
- if dist < min_diff
- or (dist == min_diff and sel_elt and (elt.version or 0) > (sel_elt.version or 0)) then
- min_diff = dist
- sel_elt = elt
- end
+ candidates[#candidates + 1] = { elt = elt, dist = dist }
end
end
+ table.sort(candidates, function(a, b)
+ if a.dist ~= b.dist then
+ return a.dist < b.dist
+ end
+ return (a.elt.version or 0) > (b.elt.version or 0)
+ end)
+
+ local sel = candidates[1]
+ local sel_elt = sel and sel.elt
+ local min_diff = sel and sel.dist or math.huge
+ local fallback_tail = {}
+ for i = 2, #candidates do
+ fallback_tail[#fallback_tail + 1] = candidates[i]
+ end
if sel_elt then
-- Track the best-known profile as the training target independently of
rule.prefix .. ':' .. set.name,
set.ann.version,
sel_elt.version)
- load_new_ann(rule, ev_base, set, sel_elt, min_diff)
+ load_new_ann(rule, ev_base, set, sel_elt, min_diff, fallback_tail)
else
lua_util.debugm(N, rspamd_config, 'ann %s is not changed, ' ..
'our version = %s, remote version = %s',
'providers schema matches for %s; reload newer version %s (ours = %s)',
rule.prefix .. ':' .. set.name,
sel_elt.version, set.ann.version)
- load_new_ann(rule, ev_base, set, sel_elt, min_diff)
+ load_new_ann(rule, ev_base, set, sel_elt, min_diff, fallback_tail)
else
lua_util.debugm(N, rspamd_config,
'providers schema matches for %s; our version %s >= remote %s, no reload',
rule.prefix .. ':' .. set.name,
set.ann.distance,
min_diff)
- load_new_ann(rule, ev_base, set, sel_elt, min_diff)
+ load_new_ann(rule, ev_base, set, sel_elt, min_diff, fallback_tail)
else
lua_util.debugm(N, rspamd_config, 'ann %s is not changed or less specific, ' ..
'our distance = %s, remote distance = %s',
end
else
-- We have no ANN, load new one
- load_new_ann(rule, ev_base, set, sel_elt, min_diff)
+ load_new_ann(rule, ev_base, set, sel_elt, min_diff, fallback_tail)
end
end
if sel_elt then
'ZREVRANGE', -- command
{ set.prefix, '0', tostring(settings.max_profiles) } -- arguments
)
+ -- Refresh the profile-zset TTL while the rule is live, so it never expires
+ -- out from under a running worker (a stable, non-reloading rule would
+ -- otherwise let it lapse). Per-tombstone cleanup is handled by the GC in
+ -- the invalidate script; this TTL only reclaims the registry after a long
+ -- full shutdown.
+ lua_redis.redis_make_request_taskless(ev_base,
+ cfg,
+ rule.redis,
+ nil,
+ true, -- is write
+ function(_, _) end,
+ 'EXPIRE',
+ { set.prefix, tostring(rule.ann_expire) }
+ )
end
end -- Cycle over all settings
end
if type(set) == 'table' then
+ -- Entries older than this with no blob and no training data are tombstones
+ -- (expired or never-trained profiles) and get GC'd by the script. The
+ -- grace window must comfortably exceed the rotated-key training-set TTL
+ -- (600s) so a profile being rotated is never mistaken for a tombstone.
+ local grace = math.max(1200, (rule.watch_interval or 60) * 4)
+ local cutoff = rspamd_util.get_time() - grace
lua_redis.exec_redis_script(neural_common.redis_script_id.maybe_invalidate,
{ ev_base = ev_base, is_write = true },
invalidate_cb,
- { set.prefix, tostring(settings.max_profiles) })
+ { set.prefix, tostring(settings.max_profiles) },
+ { tostring(cutoff) })
end
end
end
--- /dev/null
+*** Settings ***
+Suite Setup Rspamd Redis Setup
+Suite Teardown Rspamd Redis Teardown
+Library Process
+Library Collections
+Library ${RSPAMD_TESTDIR}/lib/rspamd.py
+Resource ${RSPAMD_TESTDIR}/lib/rspamd.robot
+Variables ${RSPAMD_TESTDIR}/lib/vars.py
+
+*** Variables ***
+${CONFIG} ${RSPAMD_TESTDIR}/configs/neural_rotation.conf
+${SPAM_MSG} ${RSPAMD_TESTDIR}/messages/spam.eml
+${HAM_MSG} ${RSPAMD_TESTDIR}/messages/ham.eml
+${REDIS_SCOPE} Suite
+${RSPAMD_SCOPE} Suite
+${RSPAMD_URL_TLD} ${RSPAMD_TESTDIR}/../lua/unit/test_tld.dat
+
+*** Test Cases ***
+Train providers-driven ANN
+ Sleep 2s Wait for redis and initial check_anns
+ Scan File ${SPAM_MSG} Settings={symbols_enabled = ["SPAM_SYMBOL1", "SPAM_SYMBOL2", "SPAM_SYMBOL3"]}
+ Expect Symbol SPAM_SYMBOL1
+ Scan File ${HAM_MSG} Settings={symbols_enabled = ["HAM_SYMBOL1", "HAM_SYMBOL2", "HAM_SYMBOL3"]}
+ Expect Symbol HAM_SYMBOL1
+
+Inference fires before the tombstone
+ Sleep 5s Wait for training to complete and ANN to be reloaded
+ Scan File ${SPAM_MSG} Settings={symbols_enabled = ["SPAM_SYMBOL1","SPAM_SYMBOL2","SPAM_SYMBOL3"];groups_enabled=["neural"];symbols_disabled = ["NEURAL_LEARN"]}
+ Expect Symbol NEURAL_SPAM_SHORT
+
+Inject a stale higher-version profile tombstone
+ # Register a profile whose version exceeds every real one but whose redis_key
+ # has no trained blob (an expired/never-written key). process_existing_ann
+ # selects the highest version, so pre-fix this entry would be picked and the
+ # missing blob would silently leave inference dark. The same providers_digest
+ # keeps it "compatible", so only the missing-blob fallback can save it.
+ ${zset} = Get Neural Profile Zset
+ Set Suite Variable ${NEURAL_ZSET} ${zset}
+ ${member} = Get Any Profile Member ${zset}
+ ${tomb} = Make Stale Tombstone ${member}
+ ${r} = Run Process redis-cli -h ${RSPAMD_REDIS_ADDR} -p ${RSPAMD_REDIS_PORT}
+ ... ZADD ${zset} 9999999999 ${tomb}
+ Should Be Equal As Integers ${r.rc} 0
+
+Inference still fires despite the stale tombstone
+ # Clear the loaded ANN so the next check_anns (watch_interval=0.5) re-runs
+ # selection; the dead highest-version entry must be skipped for the live one.
+ Scan File ${SPAM_MSG} Settings={symbols_enabled = ["FORCE_ROTATE_NEURAL"];symbols_disabled = ["NEURAL_LEARN","NEURAL_CHECK"]}
+ Expect Symbol FORCE_ROTATE_NEURAL
+ Sleep 3s Wait for check_anns to reselect past the tombstone
+ Scan File ${SPAM_MSG} Settings={symbols_enabled = ["SPAM_SYMBOL1","SPAM_SYMBOL2","SPAM_SYMBOL3"];groups_enabled=["neural"];symbols_disabled = ["NEURAL_LEARN"]}
+ Expect Symbol NEURAL_SPAM_SHORT
+ Scan File ${HAM_MSG} Settings={symbols_enabled = ["HAM_SYMBOL1","HAM_SYMBOL2","HAM_SYMBOL3"];groups_enabled=["neural"];symbols_disabled = ["NEURAL_LEARN"]}
+ Expect Symbol NEURAL_HAM_SHORT
+
+*** Keywords ***
+Get Neural Profile Zset
+    # Returns the first key reported by `KEYS rn3_*`.
+    # The profile registry zset is the only key matching the rn3_ prefix
+    # (ANN blobs and training sets use the rn_ prefix).
+    ${res} =    Run Process    redis-cli    -h    ${RSPAMD_REDIS_ADDR}    -p    ${RSPAMD_REDIS_PORT}
+    ...    KEYS    rn3_*
+    # Fail on a redis-cli error here, so a connection problem is not reported
+    # below as a missing zset.
+    Should Be Equal As Integers    ${res.rc}    0    msg=redis-cli KEYS failed: ${res.stderr}
+    ${zset} =    Evaluate    $res.stdout.strip().split('\\n')[0]
+    Should Not Be Empty    ${zset}    msg=no neural profile zset registered
+    [Return]    ${zset}
+
+Get Any Profile Member
+    [Arguments]    ${zset}
+    # Returns the lowest-ranked member of ${zset} (ZRANGE 0 0) as printed by
+    # redis-cli, with surrounding whitespace stripped.
+    ${res} =    Run Process    redis-cli    -h    ${RSPAMD_REDIS_ADDR}    -p    ${RSPAMD_REDIS_PORT}
+    ...    ZRANGE    ${zset}    0    0
+    # Fail on a redis-cli error here, so a connection problem is not reported
+    # below as an empty zset.
+    Should Be Equal As Integers    ${res.rc}    0    msg=redis-cli ZRANGE failed: ${res.stderr}
+    ${member} =    Evaluate    $res.stdout.strip()
+    Should Not Be Empty    ${member}    msg=no profile member to clone
+    [Return]    ${member}
+
+Make Stale Tombstone
+    [Arguments]    ${member}
+    # Derive a tombstone from a real profile: every field is kept (including
+    # digest/providers_digest, so it still counts as compatible) except that
+    # redis_key gets a suffix naming a key that does not exist and version is
+    # raised above every genuine profile.
+    ${profile} =    Evaluate    json.loads($member)    json
+    ${stale_key} =    Evaluate    str($profile['redis_key']) + '_STALE_MISSING'
+    Set To Dictionary    ${profile}    redis_key=${stale_key}    version=${99999}
+    ${serialized} =    Evaluate    json.dumps($profile)    json
+    [Return]    ${serialized}