From: Vsevolod Stakhov Date: Thu, 12 Mar 2026 12:20:31 +0000 (+0000) Subject: [Fix] Add reconnection with retry logic for statistics restore X-Git-Tag: 4.0.0~24^2~2 X-Git-Url: http://git.ipfire.org/gitweb/?a=commitdiff_plain;h=f3f398e6307e86f963e9ed867acd16eb55bdcaa9;p=thirdparty%2Frspamd.git [Fix] Add reconnection with retry logic for statistics restore When restoring large datasets (24M+ lines), Redis connections can get terminated mid-restore. Add retry logic with automatic reconnection that resumes from the failed pipeline chunk, avoiding HINCRBYFLOAT double-counting by skipping past ambiguously-applied chunks. --- diff --git a/lualib/rspamadm/statistics_dump.lua b/lualib/rspamadm/statistics_dump.lua index 4068eda3f5..be73046956 100644 --- a/lualib/rspamadm/statistics_dump.lua +++ b/lualib/rspamadm/statistics_dump.lua @@ -647,9 +647,70 @@ local function estimate_redis_commands(obj, opts) return #value end -local function execute_batch(batch, conns, opts) - local cmd_pipe = {} +-- Send cmd_pipe commands to a single connection starting from start_idx. +-- Returns true on success, or (false, err, resume_idx) on failure where +-- resume_idx is the index to resume from: the failed chunk's start when add_cmd failed, or the start of the next chunk when exec() failed. 
+local function send_cmd_pipe(cmd_pipe, conn, start_idx) + for i = start_idx, #cmd_pipe, pipeline_max do + local chunk_end = math.min(i + pipeline_max - 1, #cmd_pipe) + local added = 0 + + for j = i, chunk_end do + local is_ok, err = conn:add_cmd(cmd_pipe[j][1], cmd_pipe[j][2]) + + if not is_ok then + rspamd_logger.errx("cannot add command: %s with args: %s: %s", + cmd_pipe[j][1], cmd_pipe[j][2], err) + -- add_cmd failed: no commands from this chunk were sent to Redis, + -- safe to retry from this chunk index + return false, err, i + end + + added = added + 1 + end + + if added > 0 then + local ret, err = conn:exec() + + if not ret then + local chunk_size = chunk_end - i + 1 + rspamd_logger.errx("cannot execute restore batch: %s; skipping %s commands in failed chunk to avoid double-counting", + err, chunk_size) + -- exec() failed: some commands in this chunk may have been applied, + -- advance past this chunk to avoid double-counting + return false, err, i + pipeline_max + end + end + end + + return true +end + +local function reconnect_all(selected) + local new_conns = {} + + for _, cls in ipairs(selected) do + local res, conn = lua_redis.redis_connect_sync(cls.redis_params, true) + + if not res then + rspamd_logger.errx("cannot reconnect to redis server: %s", cls.redis_params) + return nil + end + + table.insert(new_conns, conn) + end + return new_conns +end + +local max_retries = 3 + +local function flush_restore_batch(batch, conns, selected, opts) + if #batch == 0 then + return true, conns + end + + local cmd_pipe = {} for _, cmd in ipairs(batch) do obj_to_redis_arguments(cmd, opts, cmd_pipe) end @@ -658,52 +719,46 @@ local function execute_batch(batch, conns, opts) for _, cmd in ipairs(cmd_pipe) do rspamd_logger.messagex('%s %s', cmd[1], table.concat(cmd[2], ' ')) end - else - for _, conn in ipairs(conns) do - -- Chunk commands to avoid stack overflow on large datasets - for i = 1, #cmd_pipe, pipeline_max do - local chunk_end = math.min(i + pipeline_max - 1, 
#cmd_pipe) - local added = 0 - - for j = i, chunk_end do - local is_ok, err = conn:add_cmd(cmd_pipe[j][1], cmd_pipe[j][2]) - - if not is_ok then - rspamd_logger.errx("cannot add command: %s with args: %s: %s", - cmd_pipe[j][1], cmd_pipe[j][2], err) - return false, err - end + clear_fcn(batch) + return true, conns + end - added = added + 1 - end + for conn_idx, conn in ipairs(conns) do + local resume_idx = 1 - if added > 0 then - local ret, err = conn:exec() + for attempt = 1, max_retries do + local ok, err, next_idx = send_cmd_pipe(cmd_pipe, conn, resume_idx) - if not ret then - rspamd_logger.errx("cannot execute restore batch: %s", err) - return false, err - end - end + if ok then + break end - end - end - return true -end + if attempt == max_retries then + rspamd_logger.errx("batch failed after %s attempts: %s", max_retries, err) + return false, conns + end -local function flush_restore_batch(batch, conns, opts) - if #batch == 0 then - return true - end + resume_idx = next_idx or resume_idx + rspamd_logger.messagex("batch failed at command %s/%s, reconnecting (attempt %s/%s)", + resume_idx, #cmd_pipe, attempt, max_retries) - local ok = execute_batch(batch, conns, opts) - if not ok then - return false + -- Brief pause before reconnecting to handle transient Redis unavailability + os.execute("sleep 1") + + local new_conns = reconnect_all(selected) + + if not new_conns then + rspamd_logger.errx("reconnection failed on attempt %s", attempt) + return false, conns + end + + conns = new_conns + conn = conns[conn_idx] + end end clear_fcn(batch) - return true + return true, conns end local function restore_handler(opts) @@ -749,7 +804,8 @@ local function restore_handler(opts) cur_line = cur_line + 1 if #batch >= opts.batch_size or pending_cmds >= restore_pipeline_limit then - local ok = flush_restore_batch(batch, conns, opts) + local ok + ok, conns = flush_restore_batch(batch, conns, selected, opts) if not ok then os.exit(1) end @@ -768,7 +824,8 @@ local function 
restore_handler(opts) end if #batch > 0 then - local ok = flush_restore_batch(batch, conns, opts) + local ok + ok, conns = flush_restore_batch(batch, conns, selected, opts) if not ok then os.exit(1) end