-- command - redis command
-- args - table of arguments
function rspamd_redis_make_request(task, redis_params, key, is_write, callback, command, args)
+ local addr
+ local function rspamd_redis_make_request_cb(err, data)
+ if err then
+ addr:fail()
+ else
+ addr:ok()
+ end
+ callback(err, data, addr)
+ end
if not task or not redis_params or not callback or not command then
return false,nil,nil
end
- local addr
local rspamd_redis = require "rspamd_redis"
if key then
local options = {
task = task,
- callback = callback,
+ callback = rspamd_redis_make_request_cb,
host = addr:get_addr(),
timeout = redis_params['timeout'],
cmd = command,
end
else
rspamd_logger.errx(task, 'failed to scan: %s', err)
- upstream:fail()
end
else
- upstream:ok()
data = tostring(data)
local found = (string.sub(data, 1, 1) == '1')
end
else
rspamd_logger.errx(task, 'failed to scan: %s', err)
- upstream:fail()
end
else
- upstream:ok()
data = tostring(data)
local s = string.find(data, ' FOUND')
end
else
rspamd_logger.errx(task, 'failed to scan: %s', err)
- upstream:fail()
end
else
- upstream:ok()
data = tostring(data)
local vname = string.match(data, 'VIRUS (%S+) ')
local redis_key = options.key_prefix .. ip:to_string()
local ret,conn,upstream
local function redis_asn_set_cb(redis_err)
- if not redis_err then
- upstream:ok()
- else
+ if redis_err then
rspamd_logger.errx(task, 'got error %s when setting asn record on server %s',
redis_err, upstream:get_addr())
- upstream:fail()
end
end
ret,conn,upstream = rspamd_redis_make_request(task,
local function redis_fann_check_cb(err, data)
if err then
rspamd_logger.errx(task, 'redis error on host %s: %s', upstream:get_addr(), err)
- upstream:fail()
- else
- upstream:ok()
end
if not err and type(data) == 'string' then
local version = tonumber(data)
end
local function save_fann(task, is_spam)
- local ret, conn, upstream
local function redis_fann_save_cb(err)
if err then
rspamd_logger.errx(task, "cannot save neural net to redis: %s", err)
- upstream:fail()
- else
- upstream:ok()
end
end
else
current_classify_ann.ham_learned = current_classify_ann.ham_learned + 1
end
- ret,conn,upstream = rspamd_redis_make_request(task,
+ local ret,conn = rspamd_redis_make_request(task,
redis_params, -- connect params
key, -- hash key
true, -- is write
local body_key = data_key(task)
local meta_key = envelope_key(task)
local hash_key = body_key .. meta_key
- local upstream
local function redis_get_cb(err, data)
local ret_body = false
end
elseif err then
rspamd_logger.errx(task, 'got error while getting greylisting keys: %1', err)
- upstream:fail()
return
end
- upstream:ok()
end
- local ret, _
- ret,_,upstream = rspamd_redis_make_request(task,
+ local ret = rspamd_redis_make_request(task,
redis_params, -- connect params
hash_key, -- hash key
false, -- is write
local hash_key = body_key .. meta_key
local function redis_set_cb(err)
- if not err then
- upstream:ok()
- else
+ if err then
rspamd_logger.errx(task, 'got error %s when setting greylisting record on server %s',
err, upstream:get_addr())
- upstream:fail()
end
end
if err then
rspamd_logger.errx(task, 'got error %s when publishing record on server %s',
err, upstream:get_addr())
- upstream:fail()
- else
- upstream:ok()
end
end
if settings.select then
end
local valid = false
- local ret,_,upstream
local function check_results(mxes)
if fun.all(function(_, elt) return elt.checked end, mxes) then
local function redis_cache_cb(err)
if err ~= nil then
rspamd_logger.errx(task, 'redis_cache_cb received error: %1', err)
- upstream:fail()
return
- else
- upstream:ok()
end
end
if not valid then
else
task:insert_result(settings.symbol_bad_mx, 1.0)
end
- ret,_,upstream = rspamd_redis_make_request(task,
+ local ret = rspamd_redis_make_request(task,
redis_params, -- connect params
key, -- hash key
false, -- is write
table.insert(valid_mx, k)
end, fun.filter(function (_, elt) return elt.working end, mxes))
task:insert_result(settings.symbol_good_mx, 1.0, valid_mx)
- ret,_,upstream = rspamd_redis_make_request(task,
+ local ret = rspamd_redis_make_request(task,
redis_params, -- connect params
key, -- hash key
false, -- is write
end
local key = settings.key_prefix .. mx_domain
- ret,_,upstream = rspamd_redis_make_request(task,
+ local ret = rspamd_redis_make_request(task,
redis_params, -- connect params
key, -- hash key
false, -- is write
)
if not ret then
- upstream:fail()
local r = task:get_resolver()
r:resolve('mx', {
name = mx_domain,
local function check_limits(task, args)
local key = fun.foldl(function(acc, k) return acc .. k[2] end, '', args)
- local ret,upstream
+ local ret
--- Called when value is got from server
local function rate_get_cb(err, data)
if err then
rspamd_logger.infox(task, 'got error while getting limit: %1', err)
- upstream:fail()
- else
- upstream:ok()
end
if not data then return end
local ntime = rspamd_util.get_time()
fun.map(function(a) return rspamd_str_split(a[2], ":")[2] end, args)))
end
- local _
- ret,_,upstream = rspamd_redis_make_request(task,
+ ret = rspamd_redis_make_request(task,
redis_params, -- connect params
key, -- hash key
false, -- is write
local ret, upstream
local function rate_set_cb(err)
- if not err then
- upstream:ok()
- else
+ if err then
rspamd_logger.infox(task, 'got error %s when setting ratelimit record on server %s',
err, upstream:get_addr())
end
local function rate_get_cb(err, data)
if err then
rspamd_logger.infox(task, 'got error while setting limit: %1', err)
- upstream:fail()
end
if not data then return end
local ntime = rspamd_util.get_time()