]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] neural: don't strand trained ANNs behind tombstones 6089/head
authorVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 9 Jun 2026 11:43:40 +0000 (12:43 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 9 Jun 2026 11:44:01 +0000 (12:44 +0100)
A trained ANN could become unreachable to workers even though
training succeeded: NEURAL_SPAM/NEURAL_HAM stopped firing while the
controller logged "ann ... is changed, our version = N, remote
version = M" forever.

Root cause is a version regression, not a missing zset registration.
The new version was seeded from the in-memory set.ann, and
fill_set_ann resets set.ann.version to 0 whenever a worker never
loaded an ANN (restart, or the selected profile's blob was missing).
A worker that trained from the _4 profile then saved version 1.
process_existing_ann selects the highest version among compatible
profiles, so the live version-1 blob was shadowed by the stale
version-4 zset entry whose key was empty. The profile zset has no
TTL, so the dead high-version tombstone was immortal and the
condition self-perpetuated (the _4 blob was never rewritten).

Three fixes:

1. Version monotonicity (lualib/plugins/neural.lua): seed the new
   version from the profile actually trained from (the trained-from
   key encodes it as the trailing _<n>), max'd with
   training_profile/set.ann, so the new entry always outranks the
   profile it supersedes.

2. Liveness-aware selection (src/plugins/lua/neural.lua,
   neural_maybe_invalidate.lua): when the selected profile's blob is
   missing, fall back to the next compatible profile with a live blob
   instead of going dark, and emit a throttled warning (was a silent
   debug line). The invalidate script also GCs profile entries that
   have no blob and no training data and are older than a grace
   window.

3. Lifetime coupling (neural_save_unlock.lua,
   src/plugins/lua/neural.lua): give the profile zset a TTL refreshed
   each check_anns cycle, and refresh the blob TTL on every reload,
   so an actively used ANN never expires out from under its entry.

Adds 330_neural/005_stale_version.robot, which injects a
higher-version tombstone and asserts inference recovers.

lualib/plugins/neural.lua
lualib/redis_scripts/neural_maybe_invalidate.lua
lualib/redis_scripts/neural_save_unlock.lua
src/plugins/lua/neural.lua
test/functional/cases/330_neural/005_stale_version.robot [new file with mode: 0644]

index a116adc14a39bafd038559277177278627ecfee0..83074d0ce0b39e9799b59623f1d6e331f28b8cb4 100644 (file)
@@ -1295,7 +1295,21 @@ local function spawn_train(params)
 
         -- Deserialise ANN from the child process
         local loaded_ann = rspamd_kann.load(parsed.ann_data)
-        local version = (params.set.ann.version or 0) + 1
+        -- Seed the new version from the profile we actually trained from, not
+        -- from the in-memory set.ann: fill_set_ann resets set.ann.version to 0
+        -- whenever this worker never loaded an ANN (restart, or the selected
+        -- profile's blob was missing), which would make the freshly trained ANN
+        -- regress below the stale zset entries.  process_existing_ann selects by
+        -- highest version, so a regressed entry is never picked and the blob is
+        -- stranded.  The trained-from key encodes its version as the trailing
+        -- _<n>; basing the new version on it guarantees the new entry outranks
+        -- the profile it supersedes.
+        local trained_from_version = tonumber(tostring(params.ann_key):match('_(%d+)$'))
+        local base_version = math.max(
+          trained_from_version or 0,
+          (params.set.training_profile and params.set.training_profile.version) or 0,
+          (params.set.ann and params.set.ann.version) or 0)
+        local version = base_version + 1
         params.set.ann.version = version
         params.set.ann.ann = loaded_ann
         params.set.ann.symbols = params.set.symbols
index 517fa019d466cba52c3d8e3621759ef0f24f7948..03c308ec1a6ef82e2099884742b6ce065aaa9319 100644 (file)
@@ -1,8 +1,16 @@
--- Lua script to invalidate ANNs by rank
--- Uses the following keys
--- key1 - prefix for keys
+-- Lua script to invalidate ANNs
+-- Uses the following keys and argv
+-- key1 - prefix for keys (profile zset)
 -- key2 - number of elements to leave
+-- argv1 - tombstone cutoff timestamp (optional): profile entries older than this
+--         whose ANN blob no longer exists and that hold no training data are
+--         removed.  This clears stale entries left behind when a blob expires or
+--         a profile was registered but never trained, which would otherwise keep
+--         shadowing freshly trained (lower-versioned) ANNs in version selection.
 
+local removed = {}
+
+-- 1) Rank-based pruning: keep the `lim` newest entries by score (timestamp)
 local card = redis.call('ZCARD', KEYS[1])
 local lim = tonumber(KEYS[2])
 if card > lim then
@@ -16,10 +24,37 @@ if card > lim then
         redis.call('DEL', tb.redis_key .. '_spam_set')
         redis.call('DEL', tb.redis_key .. '_ham_set')
       end
+      removed[#removed + 1] = k
     end
   end
   redis.call('ZREMRANGEBYRANK', KEYS[1], 0, card - lim - 1)
-  return to_delete
-else
-  return {}
-end
\ No newline at end of file
+end
+
+-- 2) Tombstone GC: drop entries with no trained blob and no training data that
+--    are older than the cutoff.  Entries still accumulating vectors, or younger
+--    than the cutoff (freshly registered profiles awaiting their first train),
+--    are spared regardless of blob presence.
+local cutoff = tonumber(ARGV[1])
+if cutoff then
+  local survivors = redis.call('ZRANGE', KEYS[1], 0, -1, 'WITHSCORES')
+  local i = 1
+  while i <= #survivors do
+    local member = survivors[i]
+    local score = tonumber(survivors[i + 1])
+    i = i + 2
+    if score and score < cutoff then
+      local ok, tb = pcall(cjson.decode, member)
+      if ok and type(tb) == 'table' and type(tb.redis_key) == 'string' then
+        local has_blob = redis.call('HEXISTS', tb.redis_key, 'ann') == 1
+        local has_spam = redis.call('EXISTS', tb.redis_key .. '_spam_set') == 1
+        local has_ham = redis.call('EXISTS', tb.redis_key .. '_ham_set') == 1
+        if not has_blob and not has_spam and not has_ham then
+          redis.call('ZREM', KEYS[1], member)
+          removed[#removed + 1] = member
+        end
+      end
+    end
+  end
+end
+
+return removed
index c6d6dc3572629cb2a61d4db1b5d90e5da23798f4..622ba50148797432c9d5f8c56e743c8ab838abde 100644 (file)
@@ -29,6 +29,9 @@ redis.call('HDEL', KEYS[3], 'lock')
 redis.call('HSET', KEYS[3], 'obsolete', '1')
 redis.call('EXPIRE', KEYS[3], 600)
 redis.call('EXPIRE', KEYS[1], tonumber(ARGV[3]))
+-- Bound the profile zset lifetime to the ANN lifetime so a fully idle rule's
+-- profile registry eventually disappears instead of accumulating tombstones
+redis.call('EXPIRE', KEYS[2], tonumber(ARGV[3]))
 -- expire in 10m, to not face race condition with other rspamd replicas refill deleted keys
 redis.call('EXPIRE', KEYS[3] .. '_spam_set', 600)
 redis.call('EXPIRE', KEYS[3] .. '_ham_set', 600)
index b8c787b5cc26fafa1173e69dd6cdb476c4624c3f..4f7af253663c1feb7623595cdf9b82b8a43cf259 100644 (file)
@@ -621,13 +621,33 @@ local function do_train_ann(worker, ev_base, rule, set, ann_key)
     })
 end
 
+-- Warn (throttled) when a selected profile points at a key with no trained
+-- blob.  Without throttling this fires every watch_interval and floods the log.
+local function maybe_warn_stale_profile(rule, set, ann_key)
+  local now = rspamd_util.get_time()
+  if not set.last_stale_warn or (now - set.last_stale_warn) > 600 then
+    set.last_stale_warn = now
+    rspamd_logger.warnx(rspamd_config,
+      'ANN profile for %s:%s selects Redis key %s which has no trained blob ' ..
+      '(stale profile entry); falling back to an older live profile',
+      rule.prefix, set.name, ann_key)
+  else
+    lua_util.debugm(N, rspamd_config,
+      'stale ANN profile key %s for %s:%s (warning throttled)',
+      ann_key, rule.prefix, set.name)
+  end
+end
+
 -- This function loads new ann from Redis
 -- This is based on `profile` attribute.
 -- ANN is loaded from `profile.redis_key`
 -- Rank of `profile` key is also increased, unfortunately, it means that we need to
 -- serialize profile one more time and set its rank to the current time
 -- set.ann fields are set according to Redis data received
-local function load_new_ann(rule, ev_base, set, profile, min_diff)
+-- `candidates` (optional) is the best-first list of remaining compatible
+-- profiles ({elt, dist}); if `profile`'s blob is missing we fall back to the
+-- next live candidate so a stale highest-version entry cannot keep inference dark.
+local function load_new_ann(rule, ev_base, set, profile, min_diff, candidates)
   local ann_key = profile.redis_key
 
   local function data_cb(err, data)
@@ -674,6 +694,13 @@ local function load_new_ann(rule, ev_base, set, profile, min_diff)
                 'ZADD',  -- command
                 { set.prefix, tostring(rspamd_util.get_time()), profile_serialized }
               )
+              -- Keep an actively-reloaded blob alive: without this the blob
+              -- expires ann_expire after training even while still in use, and
+              -- its surviving zset entry becomes a tombstone.  The zset TTL
+              -- itself is refreshed in check_anns every cycle.
+              lua_redis.redis_make_request_taskless(ev_base, rspamd_config,
+                rule.redis, nil, true, rank_cb, 'EXPIRE',
+                { ann_key, tostring(rule.ann_expire) })
               rspamd_logger.infox(rspamd_config,
                 'loaded ANN for %s:%s from %s; %s bytes compressed; version=%s',
                 rule.prefix, set.name, ann_key, #data[1], profile.version)
@@ -684,8 +711,21 @@ local function load_new_ann(rule, ev_base, set, profile, min_diff)
             end
           end
         else
-          lua_util.debugm(N, rspamd_config, 'missing ANN for %s:%s in Redis key %s',
-            rule.prefix, set.name, ann_key)
+          maybe_warn_stale_profile(rule, set, ann_key)
+          -- Fall back to the next compatible profile that actually has a blob.
+          -- Only move to a candidate that would not downgrade what we already
+          -- have loaded (strictly newer than set.ann, or anything when dark).
+          local cur_ver = (set.ann and set.ann.version) or -1
+          for i, cand in ipairs(candidates or {}) do
+            if (cand.elt.version or 0) > cur_ver then
+              local rest = {}
+              for j = i + 1, #candidates do
+                rest[#rest + 1] = candidates[j]
+              end
+              load_new_ann(rule, ev_base, set, cand.elt, cand.dist, rest)
+              break
+            end
+          end
         end
 
         if set.ann and set.ann.ann and type(data[2]) == 'userdata' and data[2].cookie == text_cookie then
@@ -948,25 +988,38 @@ local function process_existing_ann(_, ev_base, rule, set, profiles)
   local has_providers = rule.providers and #rule.providers > 0
   local current_providers_digest = has_providers and
       neural_common.providers_config_digest(rule.providers, rule) or nil
-  local min_diff = math.huge
-  local sel_elt
   lua_util.debugm(N, rspamd_config,
     'process_existing_ann: have %s profiles for %s:%s (providers_digest=%s)',
     type(profiles) == 'table' and #profiles or -1, rule.prefix, set.name,
     current_providers_digest or 'none')
 
+  -- Build the best-first list of compatible candidates (smaller distance first,
+  -- tie-break on higher version).  The head is the profile we want to load; the
+  -- tail is handed to load_new_ann as a fallback for when the head's blob turns
+  -- out to be missing (a stale highest-version entry pointing at an expired or
+  -- never-written key).
+  local candidates = {}
   for _, elt in fun.iter(profiles) do
     local compatible, dist = neural_common.is_profile_compatible(
       rule, set, elt, current_providers_digest)
     if compatible then
-      -- Prefer smaller distance; tie-break on higher version
-      if dist < min_diff
-          or (dist == min_diff and sel_elt and (elt.version or 0) > (sel_elt.version or 0)) then
-        min_diff = dist
-        sel_elt = elt
-      end
+      candidates[#candidates + 1] = { elt = elt, dist = dist }
     end
   end
+  table.sort(candidates, function(a, b)
+    if a.dist ~= b.dist then
+      return a.dist < b.dist
+    end
+    return (a.elt.version or 0) > (b.elt.version or 0)
+  end)
+
+  local sel = candidates[1]
+  local sel_elt = sel and sel.elt
+  local min_diff = sel and sel.dist or math.huge
+  local fallback_tail = {}
+  for i = 2, #candidates do
+    fallback_tail[#fallback_tail + 1] = candidates[i]
+  end
 
   if sel_elt then
     -- Track the best-known profile as the training target independently of
@@ -1001,7 +1054,7 @@ local function process_existing_ann(_, ev_base, rule, set, profiles)
             rule.prefix .. ':' .. set.name,
             set.ann.version,
             sel_elt.version)
-          load_new_ann(rule, ev_base, set, sel_elt, min_diff)
+          load_new_ann(rule, ev_base, set, sel_elt, min_diff, fallback_tail)
         else
           lua_util.debugm(N, rspamd_config, 'ann %s is not changed, ' ..
             'our version = %s, remote version = %s',
@@ -1015,7 +1068,7 @@ local function process_existing_ann(_, ev_base, rule, set, profiles)
             'providers schema matches for %s; reload newer version %s (ours = %s)',
             rule.prefix .. ':' .. set.name,
             sel_elt.version, set.ann.version)
-          load_new_ann(rule, ev_base, set, sel_elt, min_diff)
+          load_new_ann(rule, ev_base, set, sel_elt, min_diff, fallback_tail)
         else
           lua_util.debugm(N, rspamd_config,
             'providers schema matches for %s; our version %s >= remote %s, no reload',
@@ -1030,7 +1083,7 @@ local function process_existing_ann(_, ev_base, rule, set, profiles)
             rule.prefix .. ':' .. set.name,
             set.ann.distance,
             min_diff)
-          load_new_ann(rule, ev_base, set, sel_elt, min_diff)
+          load_new_ann(rule, ev_base, set, sel_elt, min_diff, fallback_tail)
         else
           lua_util.debugm(N, rspamd_config, 'ann %s is not changed or less specific, ' ..
             'our distance = %s, remote distance = %s',
@@ -1041,7 +1094,7 @@ local function process_existing_ann(_, ev_base, rule, set, profiles)
       end
     else
       -- We have no ANN, load new one
-      load_new_ann(rule, ev_base, set, sel_elt, min_diff)
+      load_new_ann(rule, ev_base, set, sel_elt, min_diff, fallback_tail)
     end
   end
   if sel_elt then
@@ -1397,6 +1450,20 @@ local function check_anns(worker, cfg, ev_base, rule, process_callback, what)
         'ZREVRANGE',                                         -- command
         { set.prefix, '0', tostring(settings.max_profiles) } -- arguments
       )
+      -- Refresh the profile-zset TTL while the rule is live, so it never expires
+      -- out from under a running worker (a stable, non-reloading rule would
+      -- otherwise let it lapse).  Per-tombstone cleanup is handled by the GC in
+      -- the invalidate script; this TTL only reclaims the registry after a long
+      -- full shutdown.
+      lua_redis.redis_make_request_taskless(ev_base,
+        cfg,
+        rule.redis,
+        nil,
+        true, -- is write
+        function(_, _) end,
+        'EXPIRE',
+        { set.prefix, tostring(rule.ann_expire) }
+      )
     end
   end -- Cycle over all settings
 
@@ -1422,10 +1489,17 @@ local function cleanup_anns(rule, cfg, ev_base)
     end
 
     if type(set) == 'table' then
+      -- Entries older than this with no blob and no training data are tombstones
+      -- (expired or never-trained profiles) and get GC'd by the script.  The
+      -- grace window must comfortably exceed the rotated-key training-set TTL
+      -- (600s) so a profile being rotated is never mistaken for a tombstone.
+      local grace = math.max(1200, (rule.watch_interval or 60) * 4)
+      local cutoff = rspamd_util.get_time() - grace
       lua_redis.exec_redis_script(neural_common.redis_script_id.maybe_invalidate,
         { ev_base = ev_base, is_write = true },
         invalidate_cb,
-        { set.prefix, tostring(settings.max_profiles) })
+        { set.prefix, tostring(settings.max_profiles) },
+        { tostring(cutoff) })
     end
   end
 end
diff --git a/test/functional/cases/330_neural/005_stale_version.robot b/test/functional/cases/330_neural/005_stale_version.robot
new file mode 100644 (file)
index 0000000..8ddb717
--- /dev/null
@@ -0,0 +1,82 @@
+*** Settings ***
+Suite Setup      Rspamd Redis Setup
+Suite Teardown   Rspamd Redis Teardown
+Library         Process
+Library         Collections
+Library         ${RSPAMD_TESTDIR}/lib/rspamd.py
+Resource        ${RSPAMD_TESTDIR}/lib/rspamd.robot
+Variables       ${RSPAMD_TESTDIR}/lib/vars.py
+
+*** Variables ***
+${CONFIG}          ${RSPAMD_TESTDIR}/configs/neural_rotation.conf
+${SPAM_MSG}        ${RSPAMD_TESTDIR}/messages/spam.eml
+${HAM_MSG}         ${RSPAMD_TESTDIR}/messages/ham.eml
+${REDIS_SCOPE}     Suite
+${RSPAMD_SCOPE}    Suite
+${RSPAMD_URL_TLD}  ${RSPAMD_TESTDIR}/../lua/unit/test_tld.dat
+
+*** Test Cases ***
+Train providers-driven ANN
+  Sleep  2s  Wait for redis and initial check_anns
+  Scan File  ${SPAM_MSG}  Settings={symbols_enabled = ["SPAM_SYMBOL1", "SPAM_SYMBOL2", "SPAM_SYMBOL3"]}
+  Expect Symbol  SPAM_SYMBOL1
+  Scan File  ${HAM_MSG}   Settings={symbols_enabled = ["HAM_SYMBOL1", "HAM_SYMBOL2", "HAM_SYMBOL3"]}
+  Expect Symbol  HAM_SYMBOL1
+
+Inference fires before the tombstone
+  Sleep  5s  Wait for training to complete and ANN to be reloaded
+  Scan File  ${SPAM_MSG}  Settings={symbols_enabled = ["SPAM_SYMBOL1","SPAM_SYMBOL2","SPAM_SYMBOL3"];groups_enabled=["neural"];symbols_disabled = ["NEURAL_LEARN"]}
+  Expect Symbol  NEURAL_SPAM_SHORT
+
+Inject a stale higher-version profile tombstone
+  # Register a profile whose version exceeds every real one but whose redis_key
+  # has no trained blob (an expired/never-written key).  process_existing_ann
+  # selects the highest version, so pre-fix this entry would be picked and the
+  # missing blob would silently leave inference dark.  The same providers_digest
+  # keeps it "compatible", so only the missing-blob fallback can save it.
+  ${zset} =  Get Neural Profile Zset
+  Set Suite Variable  ${NEURAL_ZSET}  ${zset}
+  ${member} =  Get Any Profile Member  ${zset}
+  ${tomb} =  Make Stale Tombstone  ${member}
+  ${r} =  Run Process  redis-cli  -h  ${RSPAMD_REDIS_ADDR}  -p  ${RSPAMD_REDIS_PORT}
+  ...  ZADD  ${zset}  9999999999  ${tomb}
+  Should Be Equal As Integers  ${r.rc}  0
+
+Inference still fires despite the stale tombstone
+  # Clear the loaded ANN so the next check_anns (watch_interval=0.5) re-runs
+  # selection; the dead highest-version entry must be skipped for the live one.
+  Scan File  ${SPAM_MSG}  Settings={symbols_enabled = ["FORCE_ROTATE_NEURAL"];symbols_disabled = ["NEURAL_LEARN","NEURAL_CHECK"]}
+  Expect Symbol  FORCE_ROTATE_NEURAL
+  Sleep  3s  Wait for check_anns to reselect past the tombstone
+  Scan File  ${SPAM_MSG}  Settings={symbols_enabled = ["SPAM_SYMBOL1","SPAM_SYMBOL2","SPAM_SYMBOL3"];groups_enabled=["neural"];symbols_disabled = ["NEURAL_LEARN"]}
+  Expect Symbol  NEURAL_SPAM_SHORT
+  Scan File  ${HAM_MSG}   Settings={symbols_enabled = ["HAM_SYMBOL1","HAM_SYMBOL2","HAM_SYMBOL3"];groups_enabled=["neural"];symbols_disabled = ["NEURAL_LEARN"]}
+  Expect Symbol  NEURAL_HAM_SHORT
+
+*** Keywords ***
+Get Neural Profile Zset
+  # The profile registry zset is the only key matching the rn3_ prefix
+  # (ANN blobs and training sets use the rn_ prefix).
+  ${res} =  Run Process  redis-cli  -h  ${RSPAMD_REDIS_ADDR}  -p  ${RSPAMD_REDIS_PORT}
+  ...  KEYS  rn3_*
+  ${zset} =  Evaluate  $res.stdout.strip().split('\\n')[0]
+  Should Not Be Empty  ${zset}  msg=no neural profile zset registered
+  [Return]  ${zset}
+
+Get Any Profile Member
+  [Arguments]  ${zset}
+  ${res} =  Run Process  redis-cli  -h  ${RSPAMD_REDIS_ADDR}  -p  ${RSPAMD_REDIS_PORT}
+  ...  ZRANGE  ${zset}  0  0
+  ${member} =  Evaluate  $res.stdout.strip()
+  Should Not Be Empty  ${member}  msg=no profile member to clone
+  [Return]  ${member}
+
+Make Stale Tombstone
+  [Arguments]  ${member}
+  # Clone a real profile, point it at a non-existent key and bump its version
+  # past everything else; keep digest/providers_digest so it stays compatible.
+  ${obj} =  Evaluate  json.loads($member)  json
+  ${origkey} =  Set Variable  ${obj}[redis_key]
+  Set To Dictionary  ${obj}  redis_key=${origkey}_STALE_MISSING  version=${99999}
+  ${tomb} =  Evaluate  json.dumps($obj)  json
+  [Return]  ${tomb}