git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] Move safety valve check to collect functions
author: Vsevolod Stakhov <vsevolod@rspamd.com>
Tue, 16 Dec 2025 12:02:25 +0000 (12:02 +0000)
committer: Vsevolod Stakhov <vsevolod@rspamd.com>
Tue, 16 Dec 2025 12:02:25 +0000 (12:02 +0000)
Check limits when adding new rows, not in the periodic sender.

src/plugins/lua/clickhouse.lua
src/plugins/lua/elastic.lua

index 260ce13fd50260aaa7e841dc3178fbcff942303f..5454c559d8c3f7b19748705036b524373f4e926a 100644 (file)
@@ -922,6 +922,27 @@ local function clickhouse_collect(task)
     table.insert(custom_rows[k], lua_clickhouse.row_to_tsv(rule.get_row(task)))
   end
 
+  local dominated = false
+  if settings.limits.max_rows > 0 and nrows > settings.limits.max_rows * 10 then
+    dominated = true
+    rspamd_logger.errx(task, 'row count limit exceeded 10x: %d rows (limit %d), discarding data',
+        nrows, settings.limits.max_rows)
+  end
+  if settings.limits.max_memory > 0 and used_memory >= settings.limits.max_memory * 10 then
+    dominated = true
+    rspamd_logger.errx(task, 'memory limit exceeded 10x: %d bytes (limit %d), discarding data',
+        used_memory, settings.limits.max_memory)
+  end
+
+  if dominated then
+    nrows = 0
+    used_memory = 0
+    data_rows = {}
+    custom_rows = {}
+    collectgarbage()
+    return
+  end
+
   local tsv_row = lua_clickhouse.row_to_tsv(row)
   used_memory = used_memory + #tsv_row
   data_rows[#data_rows + 1] = tsv_row
@@ -1032,31 +1053,13 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now)
     return 0
   end
 
-  local dominated = false
-
   if settings.limits.max_rows > 0 then
-    if nrows > settings.limits.max_rows * 10 then
-      dominated = true
-      rspamd_logger.errx(cfg, 'row count limit exceeded 10x: %d rows (limit %d), discarding data',
-          nrows, settings.limits.max_rows)
-    elseif nrows > settings.limits.max_rows then
+    if nrows > settings.limits.max_rows then
       need_collect = true
       reason = string.format('limit of rows has been reached: %d', nrows)
     end
   end
 
-  if settings.limits.max_memory > 0 then
-    if used_memory >= settings.limits.max_memory * 10 then
-      dominated = true
-      rspamd_logger.errx(cfg, 'memory limit exceeded 10x: %d bytes (limit %d), discarding data',
-          used_memory, settings.limits.max_memory)
-    elseif used_memory >= settings.limits.max_memory then
-      need_collect = true
-      reason = string.format('limit of memory has been reached: %d bytes used',
-          used_memory)
-    end
-  end
-
   if last_collection > 0 and settings.limits.max_interval > 0 then
     if now - last_collection > settings.limits.max_interval then
       need_collect = true
@@ -1066,18 +1069,19 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now)
     end
   end
 
+  if settings.limits.max_memory > 0 then
+    if used_memory >= settings.limits.max_memory then
+      need_collect = true
+      reason = string.format('limit of memory has been reached: %d bytes used',
+          used_memory)
+    end
+  end
+
   if last_collection == 0 then
     last_collection = now
   end
 
-  if dominated then
-    nrows = 0
-    last_collection = now
-    used_memory = 0
-    data_rows = {}
-    custom_rows = {}
-    collectgarbage()
-  elseif need_collect then
+  if need_collect then
     local saved_rows = data_rows
     local saved_custom = custom_rows
     nrows = 0
index 31d8d966160d2864beb967091bc74c7eea6c77d2..d78f7ad7437b33092dbde5e6dc3a976f7cf99d0e 100644 (file)
@@ -672,6 +672,15 @@ local function elastic_collect(task)
     end
   end
 
+  local nlogs = buffer['logs']:length()
+  if nlogs >= settings['limits']['max_rows'] * 10 then
+    rspamd_logger.errx(task, 'row count limit exceeded 10x: %s rows (limit %s), discarding data',
+        nlogs, settings['limits']['max_rows'])
+    buffer['logs'] = lua_util.newdeque()
+    collectgarbage()
+    return
+  end
+
   local now = tostring(rspamd_util.get_time() * 1000)
   local row = { ['rspamd_meta'] = get_general_metadata(task), ['@timestamp'] = now }
   buffer['logs']:push(row)
@@ -683,14 +692,7 @@ local function periodic_send_data(cfg, ev_base)
   local flush_needed = false
 
   local nlogs_total = buffer['logs']:length()
-  if nlogs_total >= settings['limits']['max_rows'] * 10 then
-    rspamd_logger.errx(rspamd_config,
-        'row count limit exceeded 10x: %s rows (limit %s), discarding data',
-        nlogs_total, settings['limits']['max_rows'])
-    buffer['logs'] = lua_util.newdeque()
-    collectgarbage()
-    return
-  elseif nlogs_total >= settings['limits']['max_rows'] then
+  if nlogs_total >= settings['limits']['max_rows'] then
     rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max rows: %s/%s', nlogs_total,
         settings['limits']['max_rows'])
     flush_needed = true