local final_call = false -- If the final collection has been started
local schema_version = 9 -- Current schema version
+local extra_tables = {}
+local extra_table_rows = {}
+
local settings = {
limits = { -- Collection limits
max_rows = 1000, -- How many rows are allowed (0 for disable this)
return false
end
-local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows)
+local function upload_extra_table_schema(upstream, ev_base, cfg, tbl_config)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ local sql = lua_util.template(tbl_config.schema, settings)
+ local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql)
+ if err then
+ return err
+ end
+ rspamd_logger.infox(rspamd_config, 'uploaded extra table schema for %s (table: %s)',
+ tbl_config.name, tbl_config.table_name)
+ return nil
+end
+
+local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows, extra_rows)
local log_object = task or rspamd_config
local upstream = settings.upstream:get_upstream_round_robin()
local ip_addr = upstream:get_addr():to_string(true)
settings.custom_rules[k].first_row())
end
end
+
+ if extra_rows then
+ for name, tbl_config in pairs(extra_tables) do
+ local trows = extra_rows[name]
+ if trows and #trows > 0 then
+ if not tbl_config.schema_uploaded then
+ local err = upload_extra_table_schema(upstream, ev_base, rspamd_config, tbl_config)
+ if err then
+ rspamd_logger.errx(log_object, 'failed to upload schema for extra table %s: %s', name, err)
+ else
+ tbl_config.schema_uploaded = true
+ end
+ end
+
+ if tbl_config.schema_uploaded then
+ send_data('extra table (' .. name .. ')', trows, tbl_config.insert_query())
+ end
+ end
+ end
+ end
+end
+
+--[[
+Register an extra Clickhouse table from another plugin.
+@param opts table with fields:
+ - name: unique registration identifier
+ - table_name: Clickhouse table name
+ - schema: CREATE TABLE statement
+ - insert_query: function returning INSERT statement
+ - get_row: function(task) returning row data table, array of rows, or nil
+ - retention: optional {enable, period_months, method}
+@return success, error_message
+]]
+local function register_extra_table(opts)
+ if not settings.upstream then
+ return false, 'clickhouse plugin is not configured'
+ end
+
+ if not opts.name or type(opts.name) ~= 'string' or #opts.name == 0 then
+ return false, 'name is required and must be a non-empty string'
+ end
+
+ if not opts.table_name or type(opts.table_name) ~= 'string' or #opts.table_name == 0 then
+ return false, 'table_name is required and must be a non-empty string'
+ end
+
+ if not opts.schema or type(opts.schema) ~= 'string' or #opts.schema == 0 then
+ return false, 'schema is required and must be a non-empty string'
+ end
+
+ if type(opts.insert_query) ~= 'function' then
+ return false, 'insert_query must be a function'
+ end
+
+ if type(opts.get_row) ~= 'function' then
+ return false, 'get_row must be a function'
+ end
+
+ if extra_tables[opts.name] then
+ return false, string.format('extra table "%s" is already registered', opts.name)
+ end
+
+ local retention = { enable = false }
+ if opts.retention then
+ if type(opts.retention) ~= 'table' then
+ return false, 'retention must be a table'
+ end
+ retention.enable = opts.retention.enable or false
+ retention.period_months = opts.retention.period_months or settings.retention.period_months
+ retention.method = opts.retention.method or settings.retention.method
+
+ if retention.method ~= 'drop' and retention.method ~= 'detach' then
+ return false, 'retention.method must be "drop" or "detach"'
+ end
+ if type(retention.period_months) ~= 'number' or
+ retention.period_months < 1 or
+ retention.period_months > 1000 then
+ return false, 'retention.period_months must be between 1 and 1000'
+ end
+ end
+
+ extra_tables[opts.name] = {
+ name = opts.name,
+ table_name = opts.table_name,
+ schema = opts.schema,
+ insert_query = opts.insert_query,
+ get_row = opts.get_row,
+ retention = retention,
+ schema_uploaded = false,
+ }
+
+ rspamd_logger.infox(rspamd_config, 'registered clickhouse extra table: %s (table: %s)',
+ opts.name, opts.table_name)
+
+ return true, nil
+end
+
+local function unregister_extra_table(name)
+ if extra_tables[name] then
+ extra_tables[name] = nil
+ extra_table_rows[name] = nil
+ rspamd_logger.infox(rspamd_config, 'unregistered clickhouse extra table: %s', name)
+ return true
+ end
+ return false
+end
+
+local function get_extra_tables()
+ local result = {}
+ for name, config in pairs(extra_tables) do
+ result[name] = {
+ name = config.name,
+ table_name = config.table_name,
+ schema_uploaded = config.schema_uploaded,
+ retention = config.retention,
+ }
+ end
+ return result
end
local function clickhouse_collect(task)
table.insert(custom_rows[k], lua_clickhouse.row_to_tsv(rule.get_row(task)))
end
+ for name, tbl_config in pairs(extra_tables) do
+ if not extra_table_rows[name] then
+ extra_table_rows[name] = {}
+ end
+
+ local ok, row_data = pcall(tbl_config.get_row, task)
+ if ok and row_data then
+ -- get_row can return a single row or array of rows
+ if row_data[1] and type(row_data[1]) == 'table' then
+ for _, single_row in ipairs(row_data) do
+ local tsv = lua_clickhouse.row_to_tsv(single_row)
+ table.insert(extra_table_rows[name], tsv)
+ used_memory = used_memory + #tsv
+ end
+ else
+ local tsv = lua_clickhouse.row_to_tsv(row_data)
+ table.insert(extra_table_rows[name], tsv)
+ used_memory = used_memory + #tsv
+ end
+ elseif not ok then
+ rspamd_logger.errx(task, 'error in get_row for extra table %s: %s', name, row_data)
+ end
+ end
+
local dominated = false
if settings.limits.max_rows > 0 and nrows > settings.limits.max_rows * 10 then
dominated = true
used_memory = 0
data_rows = {}
custom_rows = {}
+ extra_table_rows = {}
collectgarbage()
return
end
used_memory, settings.limits.max_memory)
end
-local function do_remove_partition(ev_base, cfg, table_name, partition)
+local function do_remove_partition(ev_base, cfg, table_name, partition, method_override)
lua_util.debugm(N, rspamd_config, "removing partition %s.%s", table_name, partition)
local upstream = settings.upstream:get_upstream_round_robin()
local remove_partition_sql = "ALTER TABLE ${table_name} ${remove_method} PARTITION '${partition}'"
- local remove_method = (settings.retention.method == 'drop') and 'DROP' or 'DETACH'
+ local method = method_override or settings.retention.method
+ local remove_method = (method == 'drop') and 'DROP' or 'DETACH'
local sql_params = {
['table_name'] = table_name,
['remove_method'] = remove_method,
if need_collect then
local saved_rows = data_rows
local saved_custom = custom_rows
+ local saved_extra = extra_table_rows
nrows = 0
last_collection = now
used_memory = 0
data_rows = {}
custom_rows = {}
+ extra_table_rows = {}
- clickhouse_send_data(nil, ev_base, reason, saved_rows, saved_custom)
+ clickhouse_send_data(nil, ev_base, reason, saved_rows, saved_custom, saved_extra)
if settings.collect_garbage then
collectgarbage()
"HAVING max(max_date) < toDate(now() - interval ${month} month)"
local table_names = { 'rspamd' }
+ local table_retention = {
+ rspamd = {
+ period_months = settings.retention.period_months,
+ method = settings.retention.method
+ }
+ }
+
+ for _, tbl_config in pairs(extra_tables) do
+ if tbl_config.retention and tbl_config.retention.enable then
+ table.insert(table_names, tbl_config.table_name)
+ table_retention[tbl_config.table_name] = {
+ period_months = tbl_config.retention.period_months,
+ method = tbl_config.retention.method
+ }
+ end
+ end
+
+ -- Use minimum retention period to find all candidate partitions
+ local min_period = settings.retention.period_months
+ for _, ret in pairs(table_retention) do
+ if ret.period_months < min_period then
+ min_period = ret.period_months
+ end
+ end
+
local tables = table.concat(table_names, "', '")
local sql_params = {
tables = tables,
- month = settings.retention.period_months,
+ month = min_period,
}
local sql = lua_util.template(partition_to_remove_sql, sql_params)
settings['server'], err)
else
fun.each(function(row)
- do_remove_partition(ev_base, cfg, row.table, row.partition)
+ local ret = table_retention[row.table]
+ if ret then
+ do_remove_partition(ev_base, cfg, row.table, row.partition, ret.method)
+ else
+ do_remove_partition(ev_base, cfg, row.table, row.partition)
+ end
end, rows)
end
final_call = true
local saved_rows = data_rows
local saved_custom = custom_rows
+ local saved_extra = extra_table_rows
nrows = 0
data_rows = {}
used_memory = 0
custom_rows = {}
+ extra_table_rows = {}
clickhouse_send_data(task, nil, 'final collection',
- saved_rows, saved_custom)
+ saved_rows, saved_custom, saved_extra)
if settings.collect_garbage then
collectgarbage()
end
end
end)
+
+ -- Expose API for other plugins to register extra tables
+ rspamd_plugins['clickhouse'] = {
+ register_extra_table = register_extra_table,
+ unregister_extra_table = unregister_extra_table,
+ get_extra_tables = get_extra_tables,
+ is_enabled = function()
+ return settings.upstream ~= nil
+ end,
+ }
end
end