sentinel_watch_time = (ts.number + ts.string / lutil.parse_time_interval):is_optional(),
sentinel_masters_pattern = ts.string:is_optional(),
sentinel_master_maxerrors = (ts.number + ts.string / tonumber):is_optional(),
+ sentinel_password = ts.string:is_optional(),
}
-local config_schema =
- -- Allow separate read/write servers to allow usage in the `extra_fields`
- ts.shape({
- read_servers = ts.string + ts.array_of(ts.string),
- }, {extra_fields = common_schema}) +
- ts.shape({
- write_servers = ts.string + ts.array_of(ts.string),
- }, {extra_fields = common_schema}) +
- ts.shape({
- read_servers = ts.string + ts.array_of(ts.string),
- write_servers = ts.string + ts.array_of(ts.string),
- }, {extra_fields = common_schema}) +
- ts.shape({
- servers = ts.string + ts.array_of(ts.string),
- }, {extra_fields = common_schema}) +
- ts.shape({
- server = ts.string + ts.array_of(ts.string),
- }, {extra_fields = common_schema})
+local config_schema = -- Allow separate read/write servers to allow usage in the `extra_fields`
+ts.shape({
+ read_servers = ts.string + ts.array_of(ts.string),
+}, { extra_fields = common_schema }) +
+ ts.shape({
+ write_servers = ts.string + ts.array_of(ts.string),
+ }, { extra_fields = common_schema }) +
+ ts.shape({
+ read_servers = ts.string + ts.array_of(ts.string),
+ write_servers = ts.string + ts.array_of(ts.string),
+ }, { extra_fields = common_schema }) +
+ ts.shape({
+ servers = ts.string + ts.array_of(ts.string),
+ }, { extra_fields = common_schema }) +
+ ts.shape({
+ server = ts.string + ts.array_of(ts.string),
+ }, { extra_fields = common_schema })
exports.config_schema = config_schema
-
local function redis_query_sentinel(ev_base, params, initialised)
local function flatten_redis_table(tbl)
local res = {}
- for i=1,#tbl,2 do
+ for i = 1, #tbl, 2 do
res[tbl[i]] = tbl[i + 1]
end
local pending_subrequests = 0
- for _,m in ipairs(result) do
+ for _, m in ipairs(result) do
local master = flatten_redis_table(m)
-- Wrap IPv6-addresses in brackets
if (master.ip:match(":")) then
- master.ip = "["..master.ip.."]"
+ master.ip = "[" .. master.ip .. "]"
end
if params.sentinel_masters_pattern then
end
-- For each master we need to get a list of slaves
- for k,v in pairs(masters) do
+ for k, v in pairs(masters) do
v.slaves = {}
local function slaves_cb(slave_err, slave_result)
if not slave_err and type(slave_result) == 'table' then
- for _,s in ipairs(slave_result) do
+ for _, s in ipairs(slave_result) do
local slave = flatten_redis_table(s)
lutil.debugm(N, rspamd_config,
'found slave for master %s with ip %s and port %s',
v.name, slave.ip, slave.port)
-- Wrap IPv6-addresses in brackets
if (slave.ip:match(":")) then
- slave.ip = "["..slave.ip.."]"
+ slave.ip = "[" .. slave.ip .. "]"
end
v.slaves[#v.slaves + 1] = slave
end
end
end
- local ret = rspamd_redis.make_request({
+ local ret = rspamd_redis.make_request {
host = addr:get_addr(),
timeout = params.timeout,
+ password = params.sentinel_password,
config = rspamd_config,
ev_base = ev_base,
cmd = 'SENTINEL',
- args = {'slaves', k},
+ args = { 'slaves', k },
no_pool = true,
callback = slaves_cb
- })
+ }
if not ret then
logger.errx(rspamd_config, 'cannot connect sentinel when query slaves at address: %s',
end
end
- local ret = rspamd_redis.make_request({
+ local ret = rspamd_redis.make_request {
host = addr:get_addr(),
timeout = params.timeout,
config = rspamd_config,
ev_base = ev_base,
+ password = params.sentinel_password,
cmd = 'SENTINEL',
- args = {'masters'},
+ args = { 'masters' },
no_pool = true,
callback = masters_cb,
- })
+ }
if not ret then
logger.errx(rspamd_config, 'cannot connect sentinel at address: %s',
-- We now form new strings for masters and slaves
local read_servers_tbl, write_servers_tbl = {}, {}
- for _,master in pairs(masters) do
+ for _, master in pairs(masters) do
write_servers_tbl[#write_servers_tbl + 1] = string.format(
'%s:%s', master.ip, master.port
)
'%s:%s', master.ip, master.port
)
- for _,slave in ipairs(master.slaves) do
+ for _, slave in ipairs(master.slaves) do
if slave['master-link-status'] == 'ok' then
read_servers_tbl[#read_servers_tbl + 1] = string.format(
'%s:%s', slave.ip, slave.port
h:update(k)
h:update(tostring(v))
elseif type(v) == 'table' then
- for kk,vv in pairs(v) do
+ for kk, vv in pairs(v) do
rec_hash(kk, vv)
end
end
if options['read_servers'] then
if rspamd_config then
upstreams_read = upstream_list.create(rspamd_config,
- options['read_servers'], default_port)
+ options['read_servers'], default_port)
else
upstreams_read = upstream_list.create(options['read_servers'],
- default_port)
+ default_port)
end
result.read_servers_str = options['read_servers']
elseif options['servers'] then
if rspamd_config then
upstreams_read = upstream_list.create(rspamd_config,
- options['servers'], default_port)
+ options['servers'], default_port)
else
upstreams_read = upstream_list.create(options['servers'], default_port)
end
elseif options['server'] then
if rspamd_config then
upstreams_read = upstream_list.create(rspamd_config,
- options['server'], default_port)
+ options['server'], default_port)
else
upstreams_read = upstream_list.create(options['server'], default_port)
end
if options['write_servers'] then
if rspamd_config then
upstreams_write = upstream_list.create(rspamd_config,
- options['write_servers'], default_port)
+ options['write_servers'], default_port)
else
upstreams_write = upstream_list.create(options['write_servers'],
- default_port)
+ default_port)
end
result.write_servers_str = options['write_servers']
read_only = false
-- Exclude disabled
if opts['disabled_modules'] then
- for _,v in ipairs(opts['disabled_modules']) do
+ for _, v in ipairs(opts['disabled_modules']) do
if v == module_name then
logger.infox(rspamd_config, "NOT using default redis server for module %s: it is disabled",
- module_name)
+ module_name)
- return nil
+ return nil
end
end
end
end
if result.read_servers then
- return maybe_return_cached(result)
+ return maybe_return_cached(result)
end
return nil
end,
blpop = function(args)
local idx_l = {}
- for i = 1, #args -1 do
+ for i = 1, #args - 1 do
table.insert(idx_l, i)
end
return idx_l
return idx_l
end,
set = function(args)
- return {1}
+ return { 1 }
end,
mget = function(args)
local idx_l = {}
return idx_l
end,
smove = function(args)
- return {1, 2}
+ return { 1, 2 }
end,
- script = function() end
+ script = function()
+ end
}
process_cmd.append = process_cmd.set
process_cmd.auth = process_cmd.script
end,
principal_recipient_domain = function(task)
local p = task:get_principal_recipient()
- if not p then return end
+ if not p then
+ return
+ end
return string.match(p, '.*@(.*)')
end,
ip = function(task)
local i = task:get_ip()
- if i and i:is_valid() then return i:to_string() end
+ if i and i:is_valid() then
+ return i:to_string()
+ end
end,
from = function(task)
return ((task:get_from('smtp') or E)[1] or E)['addr']
end,
from_domain_or_helo_domain = function(task)
local d = ((task:get_from('smtp') or E)[1] or E)['domain']
- if d and #d > 0 then return d end
+ if d and #d > 0 then
+ return d
+ end
return task:get_helo()
end,
mime_from = function(task)
local function gen_get_esld(f)
return function(task)
local d = f(task)
- if not d then return end
+ if not d then
+ return
+ end
return rspamd_util.get_tld(d)
end
end
-- args - table of arguments
-- extra_opts - table of optional request arguments
local function rspamd_redis_make_request(task, redis_params, key, is_write,
- callback, command, args, extra_opts)
+ callback, command, args, extra_opts)
local addr
local function rspamd_redis_make_request_cb(err, data)
if err then
end
end
if not task or not redis_params or not callback or not command then
- return false,nil,nil
+ return false, nil, nil
end
local rspamd_redis = require "rspamd_redis"
}
if extra_opts then
- for k,v in pairs(extra_opts) do
+ for k, v in pairs(extra_opts) do
options[k] = v
end
end
' (host=%s, timeout=%s): cmd: %s', ip_addr,
options.timeout, options.cmd)
- local ret,conn = rspamd_redis.make_request(options)
+ local ret, conn = rspamd_redis.make_request(options)
if not ret then
addr:fail()
logger.warnx(task, "cannot make redis request to: %s", tostring(ip_addr))
end
- return ret,conn,addr
+ return ret, conn, addr
end
--[[[
exports.redis_make_request = rspamd_redis_make_request
local function redis_make_request_taskless(ev_base, cfg, redis_params, key,
- is_write, callback, command, args, extra_opts)
+ is_write, callback, command, args, extra_opts)
if not ev_base or not redis_params or not callback or not command then
- return false,nil,nil
+ return false, nil, nil
end
local addr
args = args
}
if extra_opts then
- for k,v in pairs(extra_opts) do
+ for k, v in pairs(extra_opts) do
options[k] = v
end
end
-
if redis_params['password'] then
options['password'] = redis_params['password']
end
lutil.debugm(N, cfg, 'perform taskless request to redis server' ..
' (host=%s, timeout=%s): cmd: %s', options.host:tostring(true),
options.timeout, options.cmd)
- local ret,conn = rspamd_redis.make_request(options)
+ local ret, conn = rspamd_redis.make_request(options)
if not ret then
logger.errx('cannot execute redis request')
addr:fail()
end
- return ret,conn,addr
+ return ret, conn, addr
end
--[[[
end
local wait_table = {}
- for _,s in ipairs(script.waitq) do
+ for _, s in ipairs(script.waitq) do
table.insert(wait_table, s)
end
script.waitq = {}
- for _,s in ipairs(wait_table) do
+ for _, s in ipairs(wait_table) do
s(script.loaded)
end
end
local function prepare_redis_call(script)
local function merge_tables(t1, t2)
- for k,v in pairs(t2) do t1[k] = v end
+ for k, v in pairs(t2) do
+ t1[k] = v
+ end
end
local servers = {}
-- Call load script on each server, set loaded flag
script.in_flight = #servers
- for _,s in ipairs(servers) do
+ for _, s in ipairs(servers) do
local cur_opts = {
host = s:get_addr(),
timeout = script.redis_params['timeout'],
cmd = 'SCRIPT',
- args = {'LOAD', script.script },
+ args = { 'LOAD', script.script },
upstream = s
}
local rspamd_redis = require "rspamd_redis"
local opts = prepare_redis_call(script)
- for _,opt in ipairs(opts) do
+ for _, opt in ipairs(opts) do
opt.task = task
opt.is_write = is_write
opt.callback = function(err, data)
else
opt.upstream:ok()
logger.infox(task,
- "uploaded redis script to %s with id %s, sha: %s",
+ "uploaded redis script to %s with id %s, sha: %s",
opt.upstream:get_addr():to_string(true),
script.id, data)
script.sha = data -- We assume that sha is the same on all servers
if not ret then
logger.errx('cannot execute redis request to load script on %s',
- opt.upstream:get_addr())
+ opt.upstream:get_addr())
script.in_flight = script.in_flight - 1
opt.upstream:fail()
end
local rspamd_redis = require "rspamd_redis"
local opts = prepare_redis_call(script)
- for _,opt in ipairs(opts) do
+ for _, opt in ipairs(opts) do
opt.config = cfg
opt.ev_base = ev_base
opt.is_write = is_write
else
opt.upstream:ok()
logger.infox(cfg,
- "uploaded redis script to %s with id %s, sha: %s",
+ "uploaded redis script to %s with id %s, sha: %s",
opt.upstream:get_addr():to_string(true), script.id, data)
script.sha = data -- We assume that sha is the same on all servers
script.fatal_error = nil
if not ret then
logger.errx('cannot execute redis request to load script on %s',
- opt.upstream:get_addr())
+ opt.upstream:get_addr())
script.in_flight = script.in_flight - 1
opt.upstream:fail()
end
end
local function add_redis_script(script, redis_params, caller_level)
- if not caller_level then caller_level = 2 end
+ if not caller_level then
+ caller_level = 2
+ end
local caller = debug.getinfo(caller_level)
local new_script = {
local lua_util = require "lua_util"
local rspamd_logger = require "rspamd_logger"
- if not dir then dir = rspamd_paths.LUALIBDIR end
- if filename:sub(1, 1) ~= package.config:sub(1,1) then
+ if not dir then
+ dir = rspamd_paths.LUALIBDIR
+ end
+ if filename:sub(1, 1) ~= package.config:sub(1, 1) then
-- Relative path
filename = lua_util.join_path(dir, "redis_scripts", filename)
end
local redis_args = {}
if not redis_scripts[id] then
- logger.errx("cannot find registered script with id %s", id)
+ logger.errx("cannot find registered script with id %s", id)
return false
end
-
local script = redis_scripts[id]
if script.fatal_error then
if #redis_args == 0 then
table.insert(redis_args, script.sha)
table.insert(redis_args, tostring(#keys))
- for _,k in ipairs(keys) do
+ for _, k in ipairs(keys) do
table.insert(redis_args, k)
end
if params.task then
if not rspamd_redis_make_request(params.task, script.redis_params,
- params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then
+ params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then
callback('Cannot make redis request', nil)
end
else
if not redis_make_request_taskless(params.ev_base, rspamd_config,
- script.redis_params,
- params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then
+ script.redis_params,
+ params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then
callback('Cannot make redis request', nil)
end
end
local function redis_connect_sync(redis_params, is_write, key, cfg, ev_base)
if not redis_params then
- return false,nil
+ return false, nil
end
local rspamd_redis = require "rspamd_redis"
session = redis_params.session or rspamadm_session
}
- for k,v in pairs(redis_params) do
+ for k, v in pairs(redis_params) do
options[k] = v
end
if not options.config then
logger.errx('config is not set')
- return false,nil,addr
+ return false, nil, addr
end
if not options.ev_base then
logger.errx('ev_base is not set')
- return false,nil,addr
+ return false, nil, addr
end
if not options.session then
logger.errx('session is not set')
- return false,nil,addr
+ return false, nil, addr
end
- local ret,conn = rspamd_redis.connect_sync(options)
+ local ret, conn = rspamd_redis.connect_sync(options)
if not ret then
logger.errx('cannot create redis connection: %s', conn)
addr:fail()
- return false,nil,addr
+ return false, nil, addr
end
if conn then
local need_exec = false
if redis_params['password'] then
- conn:add_cmd('AUTH', {redis_params['password']})
+ conn:add_cmd('AUTH', { redis_params['password'] })
need_exec = true
end
if redis_params['db'] then
- conn:add_cmd('SELECT', {tostring(redis_params['db'])})
+ conn:add_cmd('SELECT', { tostring(redis_params['db']) })
need_exec = true
elseif redis_params['dbname'] then
- conn:add_cmd('SELECT', {tostring(redis_params['dbname'])})
+ conn:add_cmd('SELECT', { tostring(redis_params['dbname']) })
need_exec = true
end
logger.errx('cannot prepare redis connection (authentication or db selection failure): %s',
res)
addr:fail()
- return false,nil,addr
+ return false, nil, addr
end
end
end
- return ret,conn,addr
+ return ret, conn, addr
end
exports.redis_connect_sync = redis_connect_sync
if not attrs or not redis_params or not req then
logger.errx('invalid arguments for redis request')
- return false,nil,nil
+ return false, nil, nil
end
if not (attrs.task or (attrs.config and attrs.ev_base)) then
logger.errx('invalid attributes for redis request')
- return false,nil,nil
+ return false, nil, nil
end
local opts = lua_util.shallowcopy(attrs)
opts.timeout, opts.cmd, opts.args)
if opts.callback then
- local ret,conn = rspamd_redis.make_request(opts)
+ local ret, conn = rspamd_redis.make_request(opts)
if not ret then
logger.errx(log_obj, 'cannot execute redis request')
addr:fail()
end
- return ret,conn,addr
+ return ret, conn, addr
else
-- Coroutines version
- local ret,conn = rspamd_redis.connect_sync(opts)
+ local ret, conn = rspamd_redis.connect_sync(opts)
if not ret then
logger.errx(log_obj, 'cannot execute redis request')
addr:fail()
conn:add_cmd(opts.cmd, opts.args)
return conn:exec()
end
- return false,nil,addr
+ return false, nil, addr
end
end
if not attrs or not redis_params then
logger.errx('invalid arguments for redis connect')
- return false,nil,nil
+ return false, nil, nil
end
if not (attrs.task or (attrs.config and attrs.ev_base)) then
logger.errx('invalid attributes for redis connect')
- return false,nil,nil
+ return false, nil, nil
end
local opts = lua_util.shallowcopy(attrs)
end
if opts.callback then
- local ret,conn = rspamd_redis.connect(opts)
+ local ret, conn = rspamd_redis.connect(opts)
if not ret then
logger.errx(log_obj, 'cannot execute redis connect')
addr:fail()
end
- return ret,conn,addr
+ return ret, conn, addr
else
-- Coroutines version
- local ret,conn = rspamd_redis.connect_sync(opts)
+ local ret, conn = rspamd_redis.connect_sync(opts)
if not ret then
logger.errx(log_obj, 'cannot execute redis connect')
addr:fail()
else
- return true,conn,addr
+ return true, conn, addr
end
- return false,nil,addr
+ return false, nil, addr
end
end
}
if optional and type(optional) == 'table' then
- for k,v in pairs(optional) do
+ for k, v in pairs(optional) do
pr[k] = v
end
end
else
local fun = require "fun"
- return fun.totable(fun.filter(function(_, data) return data.module == mname end,
+ return fun.totable(fun.filter(function(_, data)
+ return data.module == mname
+ end,
redis_prefixes))
end
end