]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] Add reconnection with retry logic for statistics restore
authorVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 12 Mar 2026 12:20:31 +0000 (12:20 +0000)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 12 Mar 2026 12:20:31 +0000 (12:20 +0000)
When restoring large datasets (24M+ lines), Redis connections can get
terminated mid-restore. Add retry logic with automatic reconnection
that resumes from the failed pipeline chunk, avoiding HINCRBYFLOAT
double-counting by skipping past ambiguously-applied chunks.

lualib/rspamadm/statistics_dump.lua

index 4068eda3f5e778f54cde2e1cbbd8fa4458557303..be730469568c46de597cdd870a188a62941d590b 100644 (file)
@@ -647,9 +647,70 @@ local function estimate_redis_commands(obj, opts)
   return #value
 end
 
-local function execute_batch(batch, conns, opts)
-  local cmd_pipe = {}
+-- Send cmd_pipe commands to a single connection starting from start_idx.
+-- Returns true on success, or (false, err, resume_idx) on failure where
+-- resume_idx is the chunk start index that should be retried.
+local function send_cmd_pipe(cmd_pipe, conn, start_idx)
+  for i = start_idx, #cmd_pipe, pipeline_max do
+    local chunk_end = math.min(i + pipeline_max - 1, #cmd_pipe)
+    local added = 0
+
+    for j = i, chunk_end do
+      local is_ok, err = conn:add_cmd(cmd_pipe[j][1], cmd_pipe[j][2])
+
+      if not is_ok then
+        rspamd_logger.errx("cannot add command: %s with args: %s: %s",
+            cmd_pipe[j][1], cmd_pipe[j][2], err)
+        -- add_cmd failed: no commands from this chunk were sent to Redis,
+        -- safe to retry from this chunk index
+        return false, err, i
+      end
+
+      added = added + 1
+    end
+
+    if added > 0 then
+      local ret, err = conn:exec()
+
+      if not ret then
+        local chunk_size = chunk_end - i + 1
+        rspamd_logger.errx("cannot execute restore batch: %s; skipping %s commands in failed chunk to avoid double-counting",
+            err, chunk_size)
+        -- exec() failed: some commands in this chunk may have been applied,
+        -- advance past this chunk to avoid double-counting
+        return false, err, i + pipeline_max
+      end
+    end
+  end
+
+  return true
+end
+
+local function reconnect_all(selected)
+  local new_conns = {}
+
+  for _, cls in ipairs(selected) do
+    local res, conn = lua_redis.redis_connect_sync(cls.redis_params, true)
+
+    if not res then
+      rspamd_logger.errx("cannot reconnect to redis server: %s", cls.redis_params)
+      return nil
+    end
+
+    table.insert(new_conns, conn)
+  end
 
+  return new_conns
+end
+
+local max_retries = 3
+
+local function flush_restore_batch(batch, conns, selected, opts)
+  if #batch == 0 then
+    return true, conns
+  end
+
+  local cmd_pipe = {}
   for _, cmd in ipairs(batch) do
     obj_to_redis_arguments(cmd, opts, cmd_pipe)
   end
@@ -658,52 +719,46 @@ local function execute_batch(batch, conns, opts)
     for _, cmd in ipairs(cmd_pipe) do
       rspamd_logger.messagex('%s %s', cmd[1], table.concat(cmd[2], ' '))
     end
-  else
-    for _, conn in ipairs(conns) do
-      -- Chunk commands to avoid stack overflow on large datasets
-      for i = 1, #cmd_pipe, pipeline_max do
-        local chunk_end = math.min(i + pipeline_max - 1, #cmd_pipe)
-        local added = 0
-
-        for j = i, chunk_end do
-          local is_ok, err = conn:add_cmd(cmd_pipe[j][1], cmd_pipe[j][2])
-
-          if not is_ok then
-            rspamd_logger.errx("cannot add command: %s with args: %s: %s",
-                cmd_pipe[j][1], cmd_pipe[j][2], err)
-            return false, err
-          end
+    clear_fcn(batch)
+    return true, conns
+  end
 
-          added = added + 1
-        end
+  for conn_idx, conn in ipairs(conns) do
+    local resume_idx = 1
 
-        if added > 0 then
-          local ret, err = conn:exec()
+    for attempt = 1, max_retries do
+      local ok, err, next_idx = send_cmd_pipe(cmd_pipe, conn, resume_idx)
 
-          if not ret then
-            rspamd_logger.errx("cannot execute restore batch: %s", err)
-            return false, err
-          end
-        end
+      if ok then
+        break
       end
-    end
-  end
 
-  return true
-end
+      if attempt == max_retries then
+        rspamd_logger.errx("batch failed after %s attempts: %s", max_retries, err)
+        return false, conns
+      end
 
-local function flush_restore_batch(batch, conns, opts)
-  if #batch == 0 then
-    return true
-  end
+      resume_idx = next_idx or resume_idx
+      rspamd_logger.messagex("batch failed at command %s/%s, reconnecting (attempt %s/%s)",
+          resume_idx, #cmd_pipe, attempt, max_retries)
 
-  local ok = execute_batch(batch, conns, opts)
-  if not ok then
-    return false
+      -- Brief pause before reconnecting to handle transient Redis unavailability
+      os.execute("sleep 1")
+
+      local new_conns = reconnect_all(selected)
+
+      if not new_conns then
+        rspamd_logger.errx("reconnection failed on attempt %s", attempt)
+        return false, conns
+      end
+
+      conns = new_conns
+      conn = conns[conn_idx]
+    end
   end
 
   clear_fcn(batch)
-  return true
+  return true, conns
 end
 
 local function restore_handler(opts)
@@ -749,7 +804,8 @@ local function restore_handler(opts)
       cur_line = cur_line + 1
 
       if #batch >= opts.batch_size or pending_cmds >= restore_pipeline_limit then
-        local ok = flush_restore_batch(batch, conns, opts)
+        local ok
+        ok, conns = flush_restore_batch(batch, conns, selected, opts)
         if not ok then
           os.exit(1)
         end
@@ -768,7 +824,8 @@ local function restore_handler(opts)
   end
 
   if #batch > 0 then
-    local ok = flush_restore_batch(batch, conns, opts)
+    local ok
+    ok, conns = flush_restore_batch(batch, conns, selected, opts)
     if not ok then
       os.exit(1)
     end