return #value
end
-local function execute_batch(batch, conns, opts)
- local cmd_pipe = {}
+-- Push the commands in cmd_pipe to one connection, at most pipeline_max
+-- commands per exec() call, beginning at index start_idx.
+-- Returns true when every chunk went through. On failure returns
+-- (false, err, resume_idx): resume_idx is the first index of the failed
+-- chunk when add_cmd rejected a command, or that index advanced by
+-- pipeline_max when exec() itself failed.
+local function send_cmd_pipe(cmd_pipe, conn, start_idx)
+  for first = start_idx, #cmd_pipe, pipeline_max do
+    local last = math.min(first + pipeline_max - 1, #cmd_pipe)
+    local queued = 0
+
+    for pos = first, last do
+      local queued_ok, add_err = conn:add_cmd(cmd_pipe[pos][1], cmd_pipe[pos][2])
+
+      if not queued_ok then
+        rspamd_logger.errx("cannot add command: %s with args: %s: %s",
+          cmd_pipe[pos][1], cmd_pipe[pos][2], add_err)
+        -- exec() has not been called for this chunk yet, so the caller
+        -- can start over from the chunk's first index
+        return false, add_err, first
+      end
+
+      queued = queued + 1
+    end
+
+    -- exec() is only issued when at least one command was queued
+    local exec_ok, exec_err = true, nil
+    if queued > 0 then
+      exec_ok, exec_err = conn:exec()
+    end
+
+    if not exec_ok then
+      rspamd_logger.errx("cannot execute restore batch: %s; skipping %s commands in failed chunk to avoid double-counting",
+        exec_err, last - first + 1)
+      -- Part of this chunk may already have been applied, so report the
+      -- index past it instead of asking the caller to replay it
+      return false, exec_err, first + pipeline_max
+    end
+  end
+
+  return true
+end
+
+-- Open a new connection for every entry of `selected` by calling
+-- lua_redis.redis_connect_sync on the entry's redis_params.
+-- Returns the list of new connections in the order of `selected`, or nil
+-- as soon as one connection attempt fails.
+local function reconnect_all(selected)
+  local fresh = {}
+
+  for _, item in ipairs(selected) do
+    local connected, handle = lua_redis.redis_connect_sync(item.redis_params, true)
+
+    if connected then
+      fresh[#fresh + 1] = handle
+    else
+      rspamd_logger.errx("cannot reconnect to redis server: %s", item.redis_params)
+      return nil
+    end
+  end
+
+  return fresh
+end
+
+-- Upper bound on send attempts per connection in flush_restore_batch
+local max_retries = 3
+
+-- Expand every batch entry into cmd_pipe (via obj_to_redis_arguments) and
+-- send the resulting commands to each connection in conns. When sending
+-- fails, all connections are re-opened from `selected` and sending resumes
+-- at the index reported by send_cmd_pipe, up to max_retries attempts per
+-- connection.
+-- Returns (ok, conns): conns is the connection list the caller should keep
+-- using; it is a new table once a reconnect has succeeded.
+local function flush_restore_batch(batch, conns, selected, opts)
+ if #batch == 0 then
+ return true, conns
+ end
+
+ local cmd_pipe = {}
for _, cmd in ipairs(batch) do
obj_to_redis_arguments(cmd, opts, cmd_pipe)
end
for _, cmd in ipairs(cmd_pipe) do
rspamd_logger.messagex('%s %s', cmd[1], table.concat(cmd[2], ' '))
end
- else
- for _, conn in ipairs(conns) do
- -- Chunk commands to avoid stack overflow on large datasets
- for i = 1, #cmd_pipe, pipeline_max do
- local chunk_end = math.min(i + pipeline_max - 1, #cmd_pipe)
- local added = 0
-
- for j = i, chunk_end do
- local is_ok, err = conn:add_cmd(cmd_pipe[j][1], cmd_pipe[j][2])
-
- if not is_ok then
- rspamd_logger.errx("cannot add command: %s with args: %s: %s",
- cmd_pipe[j][1], cmd_pipe[j][2], err)
- return false, err
- end
+ clear_fcn(batch)
+ return true, conns
+ end
- added = added + 1
- end
+ for conn_idx = 1, #conns do
+ -- Index the current conns table on every pass: a generic
+ -- ipairs(conns) loop keeps walking the table it received when the
+ -- loop started, so after conns is replaced below the remaining
+ -- iterations would still use connections from the old table.
+ local conn = conns[conn_idx]
+ local resume_idx = 1
- if added > 0 then
- local ret, err = conn:exec()
+ for attempt = 1, max_retries do
+ local ok, err, next_idx = send_cmd_pipe(cmd_pipe, conn, resume_idx)
- if not ret then
- rspamd_logger.errx("cannot execute restore batch: %s", err)
- return false, err
- end
- end
+ if ok then
+ break
end
- end
- end
- return true
-end
+ if attempt == max_retries then
+ rspamd_logger.errx("batch failed after %s attempts: %s", max_retries, err)
+ return false, conns
+ end
-local function flush_restore_batch(batch, conns, opts)
- if #batch == 0 then
- return true
- end
+ -- Continue from the index send_cmd_pipe reported (unchanged if none)
+ resume_idx = next_idx or resume_idx
+ rspamd_logger.messagex("batch failed at command %s/%s, reconnecting (attempt %s/%s)",
+ resume_idx, #cmd_pipe, attempt, max_retries)
- local ok = execute_batch(batch, conns, opts)
- if not ok then
- return false
+ -- Brief pause before reconnecting to handle transient Redis unavailability
+ os.execute("sleep 1")
+
+ local new_conns = reconnect_all(selected)
+
+ if not new_conns then
+ rspamd_logger.errx("reconnection failed on attempt %s", attempt)
+ return false, conns
+ end
+
+ conns = new_conns
+ conn = conns[conn_idx]
+ end
end
clear_fcn(batch)
- return true
+ return true, conns
end
local function restore_handler(opts)
cur_line = cur_line + 1
if #batch >= opts.batch_size or pending_cmds >= restore_pipeline_limit then
- local ok = flush_restore_batch(batch, conns, opts)
+ local ok
+ ok, conns = flush_restore_batch(batch, conns, selected, opts)
if not ok then
os.exit(1)
end
end
if #batch > 0 then
- local ok = flush_restore_batch(batch, conns, opts)
+ local ok
+ ok, conns = flush_restore_batch(batch, conns, selected, opts)
if not ok then
os.exit(1)
end