]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] lua_redis: add prepare_redis_setup for rspamadm tools
authorVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 2 May 2026 13:29:00 +0000 (14:29 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 2 May 2026 13:29:00 +0000 (14:29 +0100)
The Sentinel watcher in lualib/lua_redis.lua is registered via
rspamd_config:add_on_load, but those callbacks are only fired by
rspamd_lua_run_postloads, which is invoked from worker.c, controller.c,
fuzzy_storage.c, and rspamd_proxy.c — never from rspamadm. Standalone
rspamadm tools (rspamadm dmarc_report etc.) therefore never resolve the
current Redis master and end up round-robining writes across all nodes,
which breaks under Sentinel: writes that land on a replica fail with
READONLY and the tool silently produces empty results (#6009).

Introduce lua_redis.prepare_redis_setup(redis_params, opts, callback) as
a one-shot synchronous initializer for rspamadm-style tools, where on_load
callbacks never run and we don't want background periodics. It performs,
per opts (merged via lua_util.override_defaults):

  * sentinels = true:  query SENTINEL masters / SENTINEL slaves via
    rspamd_redis.connect_sync and rewrite redis_params.read_servers /
    redis_params.write_servers in place.
  * scripts = true | false | { id, ... }:  SCRIPT LOAD all (or selected)
    scripts registered against this redis_params via add_redis_script.
  * timeout / ev_base / session / config:  IO knobs; ev_base and session
    default to rspamadm_ev_base / rspamadm_session.

The callback is invoked as callback(err) — nil on success.

Wire dmarc_report through the new helper so writes after the initial
RENAME land on the actual master under Sentinel.

Closes #6009.

lualib/lua_redis.lua
lualib/rspamadm/dmarc_report.lua
test/lua/unit/lua_redis.prepare_setup.lua [new file with mode: 0644]

index a9e3ec2ef7f1d4fbb5c5f0cd9fbded67de8a1ec4..750cdf144dfe6f94cd817518c8cc400d3ccf3ee8 100644 (file)
@@ -256,15 +256,16 @@ end
 
 exports.enrich_schema = enrich_schema
 
-local function redis_query_sentinel(ev_base, params, initialised)
-  local function flatten_redis_table(tbl)
-    local res = {}
-    for i = 1, #tbl, 2 do
-      res[tbl[i]] = tbl[i + 1]
-    end
-
-    return res
+local function flatten_redis_table(tbl)
+  local res = {}
+  for i = 1, #tbl, 2 do
+    res[tbl[i]] = tbl[i + 1]
   end
+
+  return res
+end
+
+local function redis_query_sentinel(ev_base, params, initialised)
   -- Coroutines syntax
   local rspamd_redis = require "rspamd_redis"
   local sentinels = params.sentinels
@@ -2202,4 +2203,370 @@ exports.prefixes = function(mname)
   end
 end
 
+-- Build new read/write upstream lists from a flattened sentinel topology
+-- (table of {[master_name] = { ip, port, slaves = {...} }}) and replace
+-- params.read_servers / params.write_servers in-place. Used by the
+-- synchronous rspamadm path; the async sentinel watcher in worker context
+-- has its own variant that also installs failure-monitor watchers.
+local function apply_sentinel_topology(params, masters)
+  local read_servers_tbl, write_servers_tbl = {}, {}
+
+  for _, master in pairs(masters) do
+    write_servers_tbl[#write_servers_tbl + 1] = string.format(
+        '%s:%s', master.ip, master.port)
+    read_servers_tbl[#read_servers_tbl + 1] = string.format(
+        '%s:%s', master.ip, master.port)
+
+    for _, slave in ipairs(master.slaves or E) do
+      if slave['master-link-status'] == 'ok' then
+        read_servers_tbl[#read_servers_tbl + 1] = string.format(
+            '%s:%s', slave.ip, slave.port)
+      end
+    end
+  end
+
+  if #write_servers_tbl == 0 then
+    return false, 'no masters discovered from sentinel'
+  end
+
+  table.sort(read_servers_tbl)
+  table.sort(write_servers_tbl)
+
+  local read_servers_str = table.concat(read_servers_tbl, ',')
+  local write_servers_str = table.concat(write_servers_tbl, ',')
+
+  local upstream_list = require "rspamd_upstream_list"
+
+  if read_servers_str ~= params.read_servers_str then
+    local read_upstreams = upstream_list.create(rspamd_config,
+        read_servers_str, 6379)
+    if read_upstreams then
+      logger.infox(rspamd_config,
+          'sentinel: replace read servers with new list: %s', read_servers_str)
+      params.read_servers = read_upstreams
+      params.read_servers_str = read_servers_str
+    else
+      return false, string.format('cannot parse read servers list: %s', read_servers_str)
+    end
+  end
+
+  if write_servers_str ~= params.write_servers_str then
+    local write_upstreams = upstream_list.create(rspamd_config,
+        write_servers_str, 6379)
+    if write_upstreams then
+      logger.infox(rspamd_config,
+          'sentinel: replace write servers with new list: %s', write_servers_str)
+      params.write_servers = write_upstreams
+      params.write_servers_str = write_servers_str
+    else
+      return false, string.format('cannot parse write servers list: %s', write_servers_str)
+    end
+  end
+
+  return true
+end
+
+-- Synchronously query Redis Sentinel via rspamd_redis.connect_sync to populate
+-- masters and slaves, then rewrite redis_params via apply_sentinel_topology.
+-- Walks the configured sentinel upstreams in round-robin order until one
+-- answers; only fails after exhausting them all.
+local function redis_resolve_sentinels_sync(redis_params, attrs)
+  if not redis_params.sentinels then
+    return true
+  end
+
+  local rspamd_redis = require "rspamd_redis"
+  local sentinels = redis_params.sentinels
+  local last_err = 'no sentinel upstreams to query'
+
+  -- Try each sentinel until one succeeds; same upstream may be picked twice
+  -- in a row depending on its weight, so we just bound by upstream count.
+  local nsentinels = #(sentinels:all_upstreams() or E)
+  if nsentinels == 0 then
+    return false, 'sentinel upstream list is empty'
+  end
+
+  for _ = 1, nsentinels do
+    local addr = sentinels:get_upstream_round_robin()
+    if not addr then
+      return false, last_err
+    end
+
+    local sentinel_addr = addr:get_addr()
+    if not sentinel_addr then
+      addr:fail()
+      last_err = 'sentinel address not resolved'
+    else
+      local options = {
+        host = sentinel_addr,
+        timeout = attrs.timeout or redis_params.timeout or 1.0,
+        config = attrs.config or rspamd_config,
+        ev_base = attrs.ev_base,
+        session = attrs.session,
+        no_pool = true,
+      }
+      if redis_params.sentinel_username then
+        options.username = redis_params.sentinel_username
+      end
+      if redis_params.sentinel_password then
+        options.password = redis_params.sentinel_password
+      end
+
+      local masters, err = nil, nil
+      local conn_ok, conn = rspamd_redis.connect_sync(options)
+      if not conn_ok then
+        err = string.format('cannot connect sentinel %s: %s',
+            sentinel_addr:to_string(true), tostring(conn))
+      else
+        conn:add_cmd('SENTINEL', { 'masters' })
+        local exec_ok, masters_result = conn:exec()
+        if not exec_ok or type(masters_result) ~= 'table' then
+          err = string.format('SENTINEL masters failed on %s: %s',
+              sentinel_addr:to_string(true), tostring(masters_result))
+        else
+          masters = {}
+          for _, m in ipairs(masters_result) do
+            local master = flatten_redis_table(m)
+            if master.ip and master.ip:match(':') then
+              master.ip = '[' .. master.ip .. ']'
+            end
+            if redis_params.sentinel_masters_pattern then
+              if master.name and master.name:match(redis_params.sentinel_masters_pattern) then
+                masters[master.name] = master
+              else
+                lutil.debugm(N, rspamd_config,
+                    'skip master %s with ip %s and port %s, pattern %s',
+                    master.name, master.ip, master.port,
+                    redis_params.sentinel_masters_pattern)
+              end
+            else
+              masters[master.name] = master
+            end
+          end
+        end
+      end
+
+      if masters then
+        local all_ok = true
+        for name, master in pairs(masters) do
+          master.slaves = {}
+          local sok, sconn = rspamd_redis.connect_sync(options)
+          if not sok then
+            err = string.format('cannot connect sentinel for slaves on %s: %s',
+                sentinel_addr:to_string(true), tostring(sconn))
+            all_ok = false
+            break
+          end
+          sconn:add_cmd('SENTINEL', { 'slaves', name })
+          local s_eok, s_result = sconn:exec()
+          if not s_eok or type(s_result) ~= 'table' then
+            err = string.format('SENTINEL slaves %s failed on %s: %s',
+                name, sentinel_addr:to_string(true), tostring(s_result))
+            all_ok = false
+            break
+          end
+          for _, s in ipairs(s_result) do
+            local slave = flatten_redis_table(s)
+            if slave.ip and slave.ip:match(':') then
+              slave.ip = '[' .. slave.ip .. ']'
+            end
+            master.slaves[#master.slaves + 1] = slave
+          end
+        end
+        if all_ok then
+          addr:ok()
+          local ok, apply_err = apply_sentinel_topology(redis_params, masters)
+          if ok then
+            return true
+          end
+          return false, apply_err
+        end
+      end
+
+      addr:fail()
+      last_err = err or 'unknown sentinel failure'
+    end
+  end
+
+  return false, last_err
+end
+
+-- Synchronously upload registered Redis scripts associated with redis_params.
+-- 'script_filter' is the value of opts.scripts: true (all matching scripts),
+-- false (skip), or an array of script ids returned from add_redis_script.
+local function redis_load_scripts_sync(redis_params, attrs, script_filter)
+  if script_filter == false then
+    return true
+  end
+
+  local id_filter
+  if type(script_filter) == 'table' then
+    id_filter = {}
+    for _, id in ipairs(script_filter) do
+      id_filter[id] = true
+    end
+  end
+
+  local rspamd_redis = require "rspamd_redis"
+  local errors = {}
+  local matched = 0
+
+  for _, script in ipairs(redis_scripts) do
+    if script.redis_params == redis_params and not script.fatal_error then
+      if not id_filter or id_filter[script.id] then
+        matched = matched + 1
+        local opts = prepare_redis_call(script)
+
+        for idx, opt in ipairs(opts) do
+          if not opt.skip and script.servers_ready[idx] ~= 'done' then
+            local options = {
+              host = opt.host,
+              timeout = attrs.timeout or script.redis_params.timeout or 1.0,
+              config = attrs.config or rspamd_config,
+              ev_base = attrs.ev_base,
+              session = attrs.session,
+            }
+            if opt.username then options.username = opt.username end
+            if opt.password then options.password = opt.password end
+            if opt.dbname then options.dbname = opt.dbname end
+
+            local ok, conn = rspamd_redis.connect_sync(options)
+            if not ok then
+              opt.upstream:fail()
+              script.servers_ready[idx] = 'failed'
+              errors[#errors + 1] = string.format(
+                  'cannot connect %s for SCRIPT LOAD %s: %s',
+                  opt.upstream:get_addr():to_string(true),
+                  script_description(script), tostring(conn))
+            else
+              conn:add_cmd('SCRIPT', { 'LOAD', script.script })
+              local eok, sha = conn:exec()
+              if not eok then
+                opt.upstream:fail()
+                script.servers_ready[idx] = 'failed'
+                errors[#errors + 1] = string.format(
+                    'SCRIPT LOAD %s failed on %s: %s',
+                    script_description(script),
+                    opt.upstream:get_addr():to_string(true), tostring(sha))
+              else
+                opt.upstream:ok()
+                script.sha = sha
+                script.servers_ready[idx] = 'done'
+                logger.infox(rspamd_config,
+                    'uploaded redis script %s to %s, sha: %s',
+                    script_description(script),
+                    opt.upstream:get_addr():to_string(true), sha)
+              end
+            end
+          end
+        end
+
+        if is_any_server_ready(script) then
+          script_set_loaded(script)
+        elseif is_all_servers_failed(script) then
+          script.pending_upload = false
+          script.fatal_error = 'cannot upload script to any server'
+        end
+      end
+    end
+  end
+
+  if id_filter and matched == 0 then
+    return false, 'no scripts matched the requested ids'
+  end
+
+  if #errors > 0 then
+    -- Soft-fail: report errors but only fail if no script ended up loaded.
+    local any_loaded = false
+    for _, script in ipairs(redis_scripts) do
+      if script.redis_params == redis_params and script.loaded then
+        any_loaded = true
+        break
+      end
+    end
+    if not any_loaded and matched > 0 then
+      return false, table.concat(errors, '; ')
+    end
+  end
+
+  return true
+end
+
+local default_setup_opts = {
+  sentinels = true,
+  scripts = true,
+  timeout = nil,
+}
+
+--[[[
+-- @function lua_redis.prepare_redis_setup(redis_params[, opts], callback)
+-- One-shot synchronous initialization of a Redis connection for contexts
+-- where `rspamd_config:add_on_load` callbacks never fire (typically rspamadm
+-- tools). Performs, per `opts`, sentinel resolution and script loading,
+-- then invokes `callback(err)` once Redis is ready.
+--
+-- @param {table} redis_params parsed redis params (from `parse_redis_server`).
+-- @param {table} opts (optional) merged via `lua_util.override_defaults` over:
+--   ```
+--   {
+--     sentinels = true,    -- resolve Sentinels if redis_params.sentinels is set
+--     scripts   = true,    -- true = all scripts bound to redis_params,
+--                          -- false = skip script loading,
+--                          -- { id1, id2, ... } = only those add_redis_script ids
+--     timeout   = nil,     -- override timeout (seconds) for setup IO
+--     ev_base   = nil,     -- defaults to rspamadm_ev_base
+--     session   = nil,     -- defaults to rspamadm_session
+--     config    = nil,     -- defaults to rspamd_config
+--   }
+--   ```
+-- @param {function} callback `callback(err)` — `err` is `nil` on success or
+--   a string describing the failure.
+--]]
+exports.prepare_redis_setup = function(redis_params, opts, callback)
+  if type(opts) == 'function' and callback == nil then
+    callback = opts
+    opts = nil
+  end
+
+  if type(callback) ~= 'function' then
+    error('lua_redis.prepare_redis_setup: callback must be a function')
+  end
+
+  if not redis_params or type(redis_params) ~= 'table' then
+    return callback('redis_params is nil or not a table')
+  end
+
+  opts = lutil.override_defaults(default_setup_opts, opts or {})
+
+  local attrs = {
+    config = opts.config or rspamd_config,
+    ev_base = opts.ev_base or rawget(_G, 'rspamadm_ev_base'),
+    session = opts.session or rawget(_G, 'rspamadm_session'),
+    timeout = opts.timeout,
+  }
+
+  if not attrs.ev_base then
+    return callback('ev_base is not available; pass opts.ev_base or run from rspamadm')
+  end
+  if not attrs.session then
+    return callback('async session is not available; pass opts.session or run from rspamadm')
+  end
+
+  if opts.sentinels and redis_params.sentinels then
+    local ok, err = redis_resolve_sentinels_sync(redis_params, attrs)
+    if not ok then
+      return callback(err)
+    end
+  end
+
+  if opts.scripts ~= false then
+    local ok, err = redis_load_scripts_sync(redis_params, attrs, opts.scripts)
+    if not ok then
+      return callback(err)
+    end
+  end
+
+  return callback(nil)
+end
+
 return exports
index 1f354669778d69a9e15ccac8baf67bd201ec479d..28e735d2c514780b32293ca90c94e77ac05f9bf5 100644 (file)
@@ -757,6 +757,18 @@ local function handler(args)
     os.exit(1)
   end
 
+  -- rspamadm doesn't run worker on_load callbacks, so the async sentinel
+  -- watcher in lua_redis never fires here; resolve the master synchronously
+  -- before issuing any writes (see #6009).
+  local setup_err
+  lua_redis.prepare_redis_setup(redis_params, function(err)
+    setup_err = err
+  end)
+  if setup_err then
+    logger.errx('Cannot prepare Redis: %s', setup_err)
+    os.exit(1)
+  end
+
   -- Load exclude_rua_addresses map if --recheck-rua flag is set
   if opts.recheck_rua then
     if dmarc_settings.reporting.exclude_rua_addresses then
diff --git a/test/lua/unit/lua_redis.prepare_setup.lua b/test/lua/unit/lua_redis.prepare_setup.lua
new file mode 100644 (file)
index 0000000..355e857
--- /dev/null
@@ -0,0 +1,99 @@
+-- Unit tests for lua_redis.prepare_redis_setup
+--
+-- The actual sentinel resolution and SCRIPT LOAD paths require a running
+-- Redis Sentinel cluster, which is out of scope for unit tests. These tests
+-- exercise the public surface: argument validation, callback semantics, and
+-- the no-op success path (no sentinels configured, no scripts registered for
+-- the supplied redis_params).
+
+context("lua_redis.prepare_redis_setup", function()
+  local lua_redis = require "lua_redis"
+
+  test("errors when redis_params is missing", function()
+    local captured
+    lua_redis.prepare_redis_setup(nil, { ev_base = 'x', session = 'x' },
+        function(err) captured = err end)
+    assert_not_nil(captured)
+  end)
+
+  test("errors when redis_params is wrong type", function()
+    local captured
+    lua_redis.prepare_redis_setup('not a table', { ev_base = 'x', session = 'x' },
+        function(err) captured = err end)
+    assert_not_nil(captured)
+  end)
+
+  test("errors when ev_base is unavailable", function()
+    local captured
+    lua_redis.prepare_redis_setup({}, { session = 'x' },
+        function(err) captured = err end)
+    assert_not_nil(captured)
+  end)
+
+  test("errors when session is unavailable", function()
+    local captured
+    lua_redis.prepare_redis_setup({}, { ev_base = 'x' },
+        function(err) captured = err end)
+    assert_not_nil(captured)
+  end)
+
+  test("opts argument is optional (callback in 2nd slot)", function()
+    -- Without rspamadm globals the validation path should still fire and
+    -- return an error string via the callback rather than throwing.
+    local called = false
+    local captured
+    lua_redis.prepare_redis_setup({}, function(err)
+      called = true
+      captured = err
+    end)
+    assert_true(called)
+    -- ev_base global is absent in rspamd-test → expect error string.
+    assert_not_nil(captured)
+  end)
+
+  test("callback receives nil on no-op success path", function()
+    -- redis_params with no sentinels and no registered scripts → both setup
+    -- branches short-circuit; ev_base/session never get dereferenced so we
+    -- can pass placeholder truthy values through opts.
+    local called = false
+    local captured = 'sentinel'
+    lua_redis.prepare_redis_setup({}, {
+      ev_base = 'fake_ev_base',
+      session = 'fake_session',
+    }, function(err)
+      called = true
+      captured = err
+    end)
+    assert_true(called)
+    assert_nil(captured)
+  end)
+
+  test("opts.scripts = false skips script loading", function()
+    -- Same no-op shape but explicitly disable scripts; should still succeed.
+    local captured = 'sentinel'
+    lua_redis.prepare_redis_setup({}, {
+      ev_base = 'fake_ev_base',
+      session = 'fake_session',
+      scripts = false,
+    }, function(err) captured = err end)
+    assert_nil(captured)
+  end)
+
+  test("opts.sentinels = false skips sentinel resolution", function()
+    -- Even if redis_params.sentinels were truthy, opts.sentinels=false should
+    -- bypass that branch. Use a marker that would otherwise crash the sync
+    -- resolver (no all_upstreams() method on a string) to prove the bypass.
+    local captured = 'sentinel'
+    lua_redis.prepare_redis_setup({ sentinels = 'definitely-not-an-upstream-list' }, {
+      ev_base = 'fake_ev_base',
+      session = 'fake_session',
+      sentinels = false,
+    }, function(err) captured = err end)
+    assert_nil(captured)
+  end)
+
+  test("non-function callback raises", function()
+    local ok = pcall(lua_redis.prepare_redis_setup, {}, {}, 'not a function')
+    assert_false(ok)
+  end)
+end)