exports.enrich_schema = enrich_schema
-local function redis_query_sentinel(ev_base, params, initialised)
- local function flatten_redis_table(tbl)
- local res = {}
- for i = 1, #tbl, 2 do
- res[tbl[i]] = tbl[i + 1]
- end
-
- return res
+local function flatten_redis_table(tbl)
+ local res = {}
+ for i = 1, #tbl, 2 do
+ res[tbl[i]] = tbl[i + 1]
end
+
+ return res
+end
+
+local function redis_query_sentinel(ev_base, params, initialised)
-- Coroutines syntax
local rspamd_redis = require "rspamd_redis"
local sentinels = params.sentinels
end
end
+-- Build new read/write upstream lists from a flattened sentinel topology
+-- (table of {[master_name] = { ip, port, slaves = {...} }}) and replace
+-- params.read_servers / params.write_servers in-place. Used by the
+-- synchronous rspamadm path; the async sentinel watcher in worker context
+-- has its own variant that also installs failure-monitor watchers.
+local function apply_sentinel_topology(params, masters)
+ local read_servers_tbl, write_servers_tbl = {}, {}
+
+ for _, master in pairs(masters) do
+ write_servers_tbl[#write_servers_tbl + 1] = string.format(
+ '%s:%s', master.ip, master.port)
+ read_servers_tbl[#read_servers_tbl + 1] = string.format(
+ '%s:%s', master.ip, master.port)
+
+ for _, slave in ipairs(master.slaves or E) do
+ if slave['master-link-status'] == 'ok' then
+ read_servers_tbl[#read_servers_tbl + 1] = string.format(
+ '%s:%s', slave.ip, slave.port)
+ end
+ end
+ end
+
+ if #write_servers_tbl == 0 then
+ return false, 'no masters discovered from sentinel'
+ end
+
+ table.sort(read_servers_tbl)
+ table.sort(write_servers_tbl)
+
+ local read_servers_str = table.concat(read_servers_tbl, ',')
+ local write_servers_str = table.concat(write_servers_tbl, ',')
+
+ local upstream_list = require "rspamd_upstream_list"
+
+ if read_servers_str ~= params.read_servers_str then
+ local read_upstreams = upstream_list.create(rspamd_config,
+ read_servers_str, 6379)
+ if read_upstreams then
+ logger.infox(rspamd_config,
+ 'sentinel: replace read servers with new list: %s', read_servers_str)
+ params.read_servers = read_upstreams
+ params.read_servers_str = read_servers_str
+ else
+ return false, string.format('cannot parse read servers list: %s', read_servers_str)
+ end
+ end
+
+ if write_servers_str ~= params.write_servers_str then
+ local write_upstreams = upstream_list.create(rspamd_config,
+ write_servers_str, 6379)
+ if write_upstreams then
+ logger.infox(rspamd_config,
+ 'sentinel: replace write servers with new list: %s', write_servers_str)
+ params.write_servers = write_upstreams
+ params.write_servers_str = write_servers_str
+ else
+ return false, string.format('cannot parse write servers list: %s', write_servers_str)
+ end
+ end
+
+ return true
+end
+
+-- Synchronously query Redis Sentinel via rspamd_redis.connect_sync to populate
+-- masters and slaves, then rewrite redis_params via apply_sentinel_topology.
+-- Walks the configured sentinel upstreams in round-robin order until one
+-- answers; only fails after exhausting them all.
+local function redis_resolve_sentinels_sync(redis_params, attrs)
+ if not redis_params.sentinels then
+ return true
+ end
+
+ local rspamd_redis = require "rspamd_redis"
+ local sentinels = redis_params.sentinels
+ local last_err = 'no sentinel upstreams to query'
+
+ -- Try each sentinel until one succeeds; same upstream may be picked twice
+ -- in a row depending on its weight, so we just bound by upstream count.
+ local nsentinels = #(sentinels:all_upstreams() or E)
+ if nsentinels == 0 then
+ return false, 'sentinel upstream list is empty'
+ end
+
+ for _ = 1, nsentinels do
+ local addr = sentinels:get_upstream_round_robin()
+ if not addr then
+ return false, last_err
+ end
+
+ local sentinel_addr = addr:get_addr()
+ if not sentinel_addr then
+ addr:fail()
+ last_err = 'sentinel address not resolved'
+ else
+ local options = {
+ host = sentinel_addr,
+ timeout = attrs.timeout or redis_params.timeout or 1.0,
+ config = attrs.config or rspamd_config,
+ ev_base = attrs.ev_base,
+ session = attrs.session,
+ no_pool = true,
+ }
+ if redis_params.sentinel_username then
+ options.username = redis_params.sentinel_username
+ end
+ if redis_params.sentinel_password then
+ options.password = redis_params.sentinel_password
+ end
+
+ local masters, err = nil, nil
+ local conn_ok, conn = rspamd_redis.connect_sync(options)
+ if not conn_ok then
+ err = string.format('cannot connect sentinel %s: %s',
+ sentinel_addr:to_string(true), tostring(conn))
+ else
+ conn:add_cmd('SENTINEL', { 'masters' })
+ local exec_ok, masters_result = conn:exec()
+ if not exec_ok or type(masters_result) ~= 'table' then
+ err = string.format('SENTINEL masters failed on %s: %s',
+ sentinel_addr:to_string(true), tostring(masters_result))
+ else
+ masters = {}
+ for _, m in ipairs(masters_result) do
+ local master = flatten_redis_table(m)
+ if master.ip and master.ip:match(':') then
+ master.ip = '[' .. master.ip .. ']'
+ end
+ if redis_params.sentinel_masters_pattern then
+ if master.name and master.name:match(redis_params.sentinel_masters_pattern) then
+ masters[master.name] = master
+ else
+ lutil.debugm(N, rspamd_config,
+ 'skip master %s with ip %s and port %s, pattern %s',
+ master.name, master.ip, master.port,
+ redis_params.sentinel_masters_pattern)
+ end
+ else
+ masters[master.name] = master
+ end
+ end
+ end
+ end
+
+ if masters then
+ local all_ok = true
+ for name, master in pairs(masters) do
+ master.slaves = {}
+ local sok, sconn = rspamd_redis.connect_sync(options)
+ if not sok then
+ err = string.format('cannot connect sentinel for slaves on %s: %s',
+ sentinel_addr:to_string(true), tostring(sconn))
+ all_ok = false
+ break
+ end
+ sconn:add_cmd('SENTINEL', { 'slaves', name })
+ local s_eok, s_result = sconn:exec()
+ if not s_eok or type(s_result) ~= 'table' then
+ err = string.format('SENTINEL slaves %s failed on %s: %s',
+ name, sentinel_addr:to_string(true), tostring(s_result))
+ all_ok = false
+ break
+ end
+ for _, s in ipairs(s_result) do
+ local slave = flatten_redis_table(s)
+ if slave.ip and slave.ip:match(':') then
+ slave.ip = '[' .. slave.ip .. ']'
+ end
+ master.slaves[#master.slaves + 1] = slave
+ end
+ end
+ if all_ok then
+ addr:ok()
+ local ok, apply_err = apply_sentinel_topology(redis_params, masters)
+ if ok then
+ return true
+ end
+ return false, apply_err
+ end
+ end
+
+ addr:fail()
+ last_err = err or 'unknown sentinel failure'
+ end
+ end
+
+ return false, last_err
+end
+
+-- Synchronously upload registered Redis scripts associated with redis_params.
+-- 'script_filter' is the value of opts.scripts: true (all matching scripts),
+-- false (skip), or an array of script ids returned from add_redis_script.
+local function redis_load_scripts_sync(redis_params, attrs, script_filter)
+ if script_filter == false then
+ return true
+ end
+
+ local id_filter
+ if type(script_filter) == 'table' then
+ id_filter = {}
+ for _, id in ipairs(script_filter) do
+ id_filter[id] = true
+ end
+ end
+
+ local rspamd_redis = require "rspamd_redis"
+ local errors = {}
+ local matched = 0
+
+ for _, script in ipairs(redis_scripts) do
+ if script.redis_params == redis_params and not script.fatal_error then
+ if not id_filter or id_filter[script.id] then
+ matched = matched + 1
+ local opts = prepare_redis_call(script)
+
+ for idx, opt in ipairs(opts) do
+ if not opt.skip and script.servers_ready[idx] ~= 'done' then
+ local options = {
+ host = opt.host,
+ timeout = attrs.timeout or script.redis_params.timeout or 1.0,
+ config = attrs.config or rspamd_config,
+ ev_base = attrs.ev_base,
+ session = attrs.session,
+ }
+ if opt.username then options.username = opt.username end
+ if opt.password then options.password = opt.password end
+ if opt.dbname then options.dbname = opt.dbname end
+
+ local ok, conn = rspamd_redis.connect_sync(options)
+ if not ok then
+ opt.upstream:fail()
+ script.servers_ready[idx] = 'failed'
+ errors[#errors + 1] = string.format(
+ 'cannot connect %s for SCRIPT LOAD %s: %s',
+ opt.upstream:get_addr():to_string(true),
+ script_description(script), tostring(conn))
+ else
+ conn:add_cmd('SCRIPT', { 'LOAD', script.script })
+ local eok, sha = conn:exec()
+ if not eok then
+ opt.upstream:fail()
+ script.servers_ready[idx] = 'failed'
+ errors[#errors + 1] = string.format(
+ 'SCRIPT LOAD %s failed on %s: %s',
+ script_description(script),
+ opt.upstream:get_addr():to_string(true), tostring(sha))
+ else
+ opt.upstream:ok()
+ script.sha = sha
+ script.servers_ready[idx] = 'done'
+ logger.infox(rspamd_config,
+ 'uploaded redis script %s to %s, sha: %s',
+ script_description(script),
+ opt.upstream:get_addr():to_string(true), sha)
+ end
+ end
+ end
+ end
+
+ if is_any_server_ready(script) then
+ script_set_loaded(script)
+ elseif is_all_servers_failed(script) then
+ script.pending_upload = false
+ script.fatal_error = 'cannot upload script to any server'
+ end
+ end
+ end
+ end
+
+ if id_filter and matched == 0 then
+ return false, 'no scripts matched the requested ids'
+ end
+
+ if #errors > 0 then
+ -- Soft-fail: report errors but only fail if no script ended up loaded.
+ local any_loaded = false
+ for _, script in ipairs(redis_scripts) do
+ if script.redis_params == redis_params and script.loaded then
+ any_loaded = true
+ break
+ end
+ end
+ if not any_loaded and matched > 0 then
+ return false, table.concat(errors, '; ')
+ end
+ end
+
+ return true
+end
+
+local default_setup_opts = {
+ sentinels = true,
+ scripts = true,
+ timeout = nil,
+}
+
+--[[[
+-- @function lua_redis.prepare_redis_setup(redis_params[, opts], callback)
+-- One-shot synchronous initialization of a Redis connection for contexts
+-- where `rspamd_config:add_on_load` callbacks never fire (typically rspamadm
+-- tools). Performs, per `opts`, sentinel resolution and script loading,
+-- then invokes `callback(err)` once Redis is ready.
+--
+-- @param {table} redis_params parsed redis params (from `parse_redis_server`).
+-- @param {table} opts (optional) merged via `lua_util.override_defaults` over:
+-- ```
+-- {
+-- sentinels = true, -- resolve Sentinels if redis_params.sentinels is set
+-- scripts = true, -- true = all scripts bound to redis_params,
+-- -- false = skip script loading,
+-- -- { id1, id2, ... } = only those add_redis_script ids
+-- timeout = nil, -- override timeout (seconds) for setup IO
+-- ev_base = nil, -- defaults to rspamadm_ev_base
+-- session = nil, -- defaults to rspamadm_session
+-- config = nil, -- defaults to rspamd_config
+-- }
+-- ```
+-- @param {function} callback `callback(err)` — `err` is `nil` on success or
+-- a string describing the failure.
+--]]
+exports.prepare_redis_setup = function(redis_params, opts, callback)
+ if type(opts) == 'function' and callback == nil then
+ callback = opts
+ opts = nil
+ end
+
+ if type(callback) ~= 'function' then
+ error('lua_redis.prepare_redis_setup: callback must be a function')
+ end
+
+ if not redis_params or type(redis_params) ~= 'table' then
+ return callback('redis_params is nil or not a table')
+ end
+
+ opts = lutil.override_defaults(default_setup_opts, opts or {})
+
+ local attrs = {
+ config = opts.config or rspamd_config,
+ ev_base = opts.ev_base or rawget(_G, 'rspamadm_ev_base'),
+ session = opts.session or rawget(_G, 'rspamadm_session'),
+ timeout = opts.timeout,
+ }
+
+ if not attrs.ev_base then
+ return callback('ev_base is not available; pass opts.ev_base or run from rspamadm')
+ end
+ if not attrs.session then
+ return callback('async session is not available; pass opts.session or run from rspamadm')
+ end
+
+ if opts.sentinels and redis_params.sentinels then
+ local ok, err = redis_resolve_sentinels_sync(redis_params, attrs)
+ if not ok then
+ return callback(err)
+ end
+ end
+
+ if opts.scripts ~= false then
+ local ok, err = redis_load_scripts_sync(redis_params, attrs, opts.scripts)
+ if not ok then
+ return callback(err)
+ end
+ end
+
+ return callback(nil)
+end
+
return exports
--- /dev/null
+-- Unit tests for lua_redis.prepare_redis_setup
+--
+-- The actual sentinel resolution and SCRIPT LOAD paths require a running
+-- Redis Sentinel cluster, which is out of scope for unit tests. These tests
+-- exercise the public surface: argument validation, callback semantics, and
+-- the no-op success path (no sentinels configured, no scripts registered for
+-- the supplied redis_params).
+
+context("lua_redis.prepare_redis_setup", function()
+ local lua_redis = require "lua_redis"
+
+ test("errors when redis_params is missing", function()
+ local captured
+ lua_redis.prepare_redis_setup(nil, { ev_base = 'x', session = 'x' },
+ function(err) captured = err end)
+ assert_not_nil(captured)
+ end)
+
+ test("errors when redis_params is wrong type", function()
+ local captured
+ lua_redis.prepare_redis_setup('not a table', { ev_base = 'x', session = 'x' },
+ function(err) captured = err end)
+ assert_not_nil(captured)
+ end)
+
+ test("errors when ev_base is unavailable", function()
+ local captured
+ lua_redis.prepare_redis_setup({}, { session = 'x' },
+ function(err) captured = err end)
+ assert_not_nil(captured)
+ end)
+
+ test("errors when session is unavailable", function()
+ local captured
+ lua_redis.prepare_redis_setup({}, { ev_base = 'x' },
+ function(err) captured = err end)
+ assert_not_nil(captured)
+ end)
+
+ test("opts argument is optional (callback in 2nd slot)", function()
+ -- Without rspamadm globals the validation path should still fire and
+ -- return an error string via the callback rather than throwing.
+ local called = false
+ local captured
+ lua_redis.prepare_redis_setup({}, function(err)
+ called = true
+ captured = err
+ end)
+ assert_true(called)
+ -- ev_base global is absent in rspamd-test → expect error string.
+ assert_not_nil(captured)
+ end)
+
+ test("callback receives nil on no-op success path", function()
+ -- redis_params with no sentinels and no registered scripts → both setup
+ -- branches short-circuit; ev_base/session never get dereferenced so we
+ -- can pass placeholder truthy values through opts.
+ local called = false
+ local captured = 'sentinel'
+ lua_redis.prepare_redis_setup({}, {
+ ev_base = 'fake_ev_base',
+ session = 'fake_session',
+ }, function(err)
+ called = true
+ captured = err
+ end)
+ assert_true(called)
+ assert_nil(captured)
+ end)
+
+ test("opts.scripts = false skips script loading", function()
+ -- Same no-op shape but explicitly disable scripts; should still succeed.
+ local captured = 'sentinel'
+ lua_redis.prepare_redis_setup({}, {
+ ev_base = 'fake_ev_base',
+ session = 'fake_session',
+ scripts = false,
+ }, function(err) captured = err end)
+ assert_nil(captured)
+ end)
+
+ test("opts.sentinels = false skips sentinel resolution", function()
+ -- Even if redis_params.sentinels were truthy, opts.sentinels=false should
+ -- bypass that branch. Use a marker that would otherwise crash the sync
+ -- resolver (no all_upstreams() method on a string) to prove the bypass.
+ local captured = 'sentinel'
+ lua_redis.prepare_redis_setup({ sentinels = 'definitely-not-an-upstream-list' }, {
+ ev_base = 'fake_ev_base',
+ session = 'fake_session',
+ sentinels = false,
+ }, function(err) captured = err end)
+ assert_nil(captured)
+ end)
+
+ test("non-function callback raises", function()
+ local ok = pcall(lua_redis.prepare_redis_setup, {}, {}, 'not a function')
+ assert_false(ok)
+ end)
+end)