local batch = {}
local pending_cmds = 0
+ local total_lines = 0
+ local total_cmds = 0
+ local total_batches = 0
+ local start_time = os.time()
+ local last_report_time = start_time
+
+ rspamd_logger.messagex("starting restore (batch_size=%s, pipeline_max=%s, mode=%s)",
+ opts.batch_size, pipeline_max, opts.mode or 'add')
for _, f in ipairs(files) do
local fd
io.input(fd)
end
+ rspamd_logger.messagex("processing file: %s", f)
local cur_line = 1
for line in io.lines() do
local ucl_parser = ucl.parser()
local ok
ok, conns = flush_restore_batch(batch, conns, selected, opts)
if not ok then
+ rspamd_logger.errx("restore failed at line %s (total restored: %s lines, %s commands in %s batches)",
+ total_lines + cur_line, total_lines, total_cmds, total_batches)
os.exit(1)
end
+ total_cmds = total_cmds + pending_cmds
+ total_batches = total_batches + 1
pending_cmds = 0
- if cur_line % (opts.batch_size * 10) == 0 then
- collectgarbage('collect')
- rspamd_logger.messagex("restored %s lines", cur_line)
+ -- Incremental GC after each batch to spread collection cost
+ collectgarbage('step', 100)
+
+ local now = os.time()
+ if now - last_report_time >= 10 then
+ local elapsed = now - start_time
+ local rate = total_lines > 0 and math.floor(total_lines / elapsed) or 0
+ rspamd_logger.messagex("restored %s lines, %s commands in %s batches (%s lines/sec, %s KB lua mem)",
+ total_lines + cur_line - 1, total_cmds, total_batches, rate,
+ math.floor(collectgarbage('count')))
+ last_report_time = now
end
end
end
+ total_lines = total_lines + cur_line - 1
+
if fd then
fd:close()
end
+
+ -- Full GC between files
+ collectgarbage('collect')
end
if #batch > 0 then
local ok
ok, conns = flush_restore_batch(batch, conns, selected, opts)
if not ok then
+ rspamd_logger.errx("restore failed on final batch (total restored: %s lines, %s commands)",
+ total_lines, total_cmds)
os.exit(1)
end
+ total_cmds = total_cmds + pending_cmds
+ total_batches = total_batches + 1
+ end
+
+ local elapsed = os.time() - start_time
+ if elapsed == 0 then
+ elapsed = 1
end
+ rspamd_logger.messagex("restore complete: %s lines, %s commands in %s batches (%s sec, %s lines/sec)",
+ total_lines, total_cmds, total_batches, elapsed, math.floor(total_lines / elapsed))
end
-- Migrate a single prefix's token keys from source to target using pipelined commands.