From: Vsevolod Stakhov Date: Tue, 16 Dec 2025 12:02:25 +0000 (+0000) Subject: [Fix] Move safety valve check to collect functions X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d3a77bc97587c2c3a1b7730d9b0a8f97076a94a5;p=thirdparty%2Frspamd.git [Fix] Move safety valve check to collect functions Check limits when adding new rows, not in periodic sender. --- diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index 260ce13fd5..5454c559d8 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -922,6 +922,27 @@ local function clickhouse_collect(task) table.insert(custom_rows[k], lua_clickhouse.row_to_tsv(rule.get_row(task))) end + local dominated = false + if settings.limits.max_rows > 0 and nrows > settings.limits.max_rows * 10 then + dominated = true + rspamd_logger.errx(task, 'row count limit exceeded 10x: %d rows (limit %d), discarding data', + nrows, settings.limits.max_rows) + end + if settings.limits.max_memory > 0 and used_memory >= settings.limits.max_memory * 10 then + dominated = true + rspamd_logger.errx(task, 'memory limit exceeded 10x: %d bytes (limit %d), discarding data', + used_memory, settings.limits.max_memory) + end + + if dominated then + nrows = 0 + used_memory = 0 + data_rows = {} + custom_rows = {} + collectgarbage() + return + end + local tsv_row = lua_clickhouse.row_to_tsv(row) used_memory = used_memory + #tsv_row data_rows[#data_rows + 1] = tsv_row @@ -1032,31 +1053,13 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now) return 0 end - local dominated = false - if settings.limits.max_rows > 0 then - if nrows > settings.limits.max_rows * 10 then - dominated = true - rspamd_logger.errx(cfg, 'row count limit exceeded 10x: %d rows (limit %d), discarding data', - nrows, settings.limits.max_rows) - elseif nrows > settings.limits.max_rows then + if nrows > settings.limits.max_rows then need_collect = true reason = string.format('limit of rows has been reached: %d', nrows) end end - if settings.limits.max_memory > 0 then - if used_memory >= settings.limits.max_memory * 10 then - dominated = true - rspamd_logger.errx(cfg, 'memory limit exceeded 10x: %d bytes (limit %d), discarding data', - used_memory, settings.limits.max_memory) - elseif used_memory >= settings.limits.max_memory then - need_collect = true - reason = string.format('limit of memory has been reached: %d bytes used', - used_memory) - end - end - if last_collection > 0 and settings.limits.max_interval > 0 then if now - last_collection > settings.limits.max_interval then need_collect = true @@ -1066,18 +1069,19 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now) end end + if settings.limits.max_memory > 0 then + if used_memory >= settings.limits.max_memory then + need_collect = true + reason = string.format('limit of memory has been reached: %d bytes used', + used_memory) + end + end + if last_collection == 0 then last_collection = now end - if dominated then - nrows = 0 - last_collection = now - used_memory = 0 - data_rows = {} - custom_rows = {} - collectgarbage() - elseif need_collect then + if need_collect then local saved_rows = data_rows local saved_custom = custom_rows nrows = 0 diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index 31d8d96616..d78f7ad743 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -672,6 +672,15 @@ local function elastic_collect(task) end end + local nlogs = buffer['logs']:length() + if nlogs >= settings['limits']['max_rows'] * 10 then + rspamd_logger.errx(task, 'row count limit exceeded 10x: %s rows (limit %s), discarding data', + nlogs, settings['limits']['max_rows']) + buffer['logs'] = lua_util.newdeque() + collectgarbage() + return + end + local now = tostring(rspamd_util.get_time() * 1000) local row = { ['rspamd_meta'] = get_general_metadata(task), ['@timestamp'] = now } buffer['logs']:push(row) @@ -683,14 +692,7 @@ local function periodic_send_data(cfg, ev_base) local flush_needed = false local nlogs_total = buffer['logs']:length() - if nlogs_total >= settings['limits']['max_rows'] * 10 then - rspamd_logger.errx(rspamd_config, - 'row count limit exceeded 10x: %s rows (limit %s), discarding data', - nlogs_total, settings['limits']['max_rows']) - buffer['logs'] = lua_util.newdeque() - collectgarbage() - return - elseif nlogs_total >= settings['limits']['max_rows'] then + if nlogs_total >= settings['limits']['max_rows'] then rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max rows: %s/%s', nlogs_total, settings['limits']['max_rows']) flush_needed = true