table.insert(custom_rows[k], lua_clickhouse.row_to_tsv(rule.get_row(task)))
end
+ local dominated = false
+ if settings.limits.max_rows > 0 and nrows > settings.limits.max_rows * 10 then
+ dominated = true
+ rspamd_logger.errx(task, 'row count limit exceeded 10x: %d rows (limit %d), discarding data',
+ nrows, settings.limits.max_rows)
+ end
+ if settings.limits.max_memory > 0 and used_memory >= settings.limits.max_memory * 10 then
+ dominated = true
+ rspamd_logger.errx(task, 'memory limit exceeded 10x: %d bytes (limit %d), discarding data',
+ used_memory, settings.limits.max_memory)
+ end
+
+ if dominated then
+ nrows = 0
+ used_memory = 0
+ data_rows = {}
+ custom_rows = {}
+ collectgarbage()
+ return
+ end
+
local tsv_row = lua_clickhouse.row_to_tsv(row)
used_memory = used_memory + #tsv_row
data_rows[#data_rows + 1] = tsv_row
return 0
end
- local dominated = false
-
if settings.limits.max_rows > 0 then
- if nrows > settings.limits.max_rows * 10 then
- dominated = true
- rspamd_logger.errx(cfg, 'row count limit exceeded 10x: %d rows (limit %d), discarding data',
- nrows, settings.limits.max_rows)
- elseif nrows > settings.limits.max_rows then
+ if nrows > settings.limits.max_rows then
need_collect = true
reason = string.format('limit of rows has been reached: %d', nrows)
end
end
- if settings.limits.max_memory > 0 then
- if used_memory >= settings.limits.max_memory * 10 then
- dominated = true
- rspamd_logger.errx(cfg, 'memory limit exceeded 10x: %d bytes (limit %d), discarding data',
- used_memory, settings.limits.max_memory)
- elseif used_memory >= settings.limits.max_memory then
- need_collect = true
- reason = string.format('limit of memory has been reached: %d bytes used',
- used_memory)
- end
- end
-
if last_collection > 0 and settings.limits.max_interval > 0 then
if now - last_collection > settings.limits.max_interval then
need_collect = true
end
end
+ if settings.limits.max_memory > 0 then
+ if used_memory >= settings.limits.max_memory then
+ need_collect = true
+ reason = string.format('limit of memory has been reached: %d bytes used',
+ used_memory)
+ end
+ end
+
if last_collection == 0 then
last_collection = now
end
- if dominated then
- nrows = 0
- last_collection = now
- used_memory = 0
- data_rows = {}
- custom_rows = {}
- collectgarbage()
- elseif need_collect then
+ if need_collect then
local saved_rows = data_rows
local saved_custom = custom_rows
nrows = 0
end
end
+ local nlogs = buffer['logs']:length()
+ if nlogs >= settings['limits']['max_rows'] * 10 then
+ rspamd_logger.errx(task, 'row count limit exceeded 10x: %s rows (limit %s), discarding data',
+ nlogs, settings['limits']['max_rows'])
+ buffer['logs'] = lua_util.newdeque()
+ collectgarbage()
+ return
+ end
+
local now = tostring(rspamd_util.get_time() * 1000)
local row = { ['rspamd_meta'] = get_general_metadata(task), ['@timestamp'] = now }
buffer['logs']:push(row)
local flush_needed = false
local nlogs_total = buffer['logs']:length()
- if nlogs_total >= settings['limits']['max_rows'] * 10 then
- rspamd_logger.errx(rspamd_config,
- 'row count limit exceeded 10x: %s rows (limit %s), discarding data',
- nlogs_total, settings['limits']['max_rows'])
- buffer['logs'] = lua_util.newdeque()
- collectgarbage()
- return
- elseif nlogs_total >= settings['limits']['max_rows'] then
+ if nlogs_total >= settings['limits']['max_rows'] then
rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max rows: %s/%s', nlogs_total,
settings['limits']['max_rows'])
flush_needed = true