From: Vsevolod Stakhov Date: Wed, 31 Dec 2025 10:54:55 +0000 (+0000) Subject: [Feature] Add extra tables API for clickhouse plugin X-Git-Tag: 3.14.3~15^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9882c1b21e360c9e08aa987046401e1a8b86a6d7;p=thirdparty%2Frspamd.git [Feature] Add extra tables API for clickhouse plugin Allow other plugins to dynamically register custom Clickhouse tables via rspamd_plugins['clickhouse'].register_extra_table(). Supports per-table schemas, row callbacks (single or multiple rows), and independent retention settings. --- diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index 5454c559d8..8e522d05ba 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -35,6 +35,9 @@ local last_collection = 0 local final_call = false -- If the final collection has been started local schema_version = 9 -- Current schema version +local extra_tables = {} +local extra_table_rows = {} + local settings = { limits = { -- Collection limits max_rows = 1000, -- How many rows are allowed (0 for disable this) @@ -407,7 +410,22 @@ local function clickhouse_check_symbol(task, settings_field_name, fields_table, return false end -local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows) +local function upload_extra_table_schema(upstream, ev_base, cfg, tbl_config) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + local sql = lua_util.template(tbl_config.schema, settings) + local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql) + if err then + return err + end + rspamd_logger.infox(rspamd_config, 'uploaded extra table schema for %s (table: %s)', + tbl_config.name, tbl_config.table_name) + return nil +end + +local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows, extra_rows) local log_object = task or rspamd_config local upstream = settings.upstream:get_upstream_round_robin() local ip_addr = upstream:get_addr():to_string(true) @@ -475,6 +493,124 @@ local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows) settings.custom_rules[k].first_row()) end end + + if extra_rows then + for name, tbl_config in pairs(extra_tables) do + local trows = extra_rows[name] + if trows and #trows > 0 then + if not tbl_config.schema_uploaded then + local err = upload_extra_table_schema(upstream, ev_base, rspamd_config, tbl_config) + if err then + rspamd_logger.errx(log_object, 'failed to upload schema for extra table %s: %s', name, err) + else + tbl_config.schema_uploaded = true + end + end + + if tbl_config.schema_uploaded then + send_data('extra table (' .. name .. ')', trows, tbl_config.insert_query()) + end + end + end + end +end + +--[[ +Register an extra Clickhouse table from another plugin. +@param opts table with fields: + - name: unique registration identifier + - table_name: Clickhouse table name + - schema: CREATE TABLE statement + - insert_query: function returning INSERT statement + - get_row: function(task) returning row data table, array of rows, or nil + - retention: optional {enable, period_months, method} +@return success, error_message +]] +local function register_extra_table(opts) + if not settings.upstream then + return false, 'clickhouse plugin is not configured' + end + + if not opts.name or type(opts.name) ~= 'string' or #opts.name == 0 then + return false, 'name is required and must be a non-empty string' + end + + if not opts.table_name or type(opts.table_name) ~= 'string' or #opts.table_name == 0 then + return false, 'table_name is required and must be a non-empty string' + end + + if not opts.schema or type(opts.schema) ~= 'string' or #opts.schema == 0 then + return false, 'schema is required and must be a non-empty string' + end + + if type(opts.insert_query) ~= 'function' then + return false, 'insert_query must be a function' + end + + if type(opts.get_row) ~= 'function' then + return false, 'get_row must be a function' + end + + if extra_tables[opts.name] then + return false, string.format('extra table "%s" is already registered', opts.name) + end + + local retention = { enable = false } + if opts.retention then + if type(opts.retention) ~= 'table' then + return false, 'retention must be a table' + end + retention.enable = opts.retention.enable or false + retention.period_months = opts.retention.period_months or settings.retention.period_months + retention.method = opts.retention.method or settings.retention.method + + if retention.method ~= 'drop' and retention.method ~= 'detach' then + return false, 'retention.method must be "drop" or "detach"' + end + if type(retention.period_months) ~= 'number' or + retention.period_months < 1 or + retention.period_months > 1000 then + return false, 'retention.period_months must be between 1 and 1000' + end + end + + extra_tables[opts.name] = { + name = opts.name, + table_name = opts.table_name, + schema = opts.schema, + insert_query = opts.insert_query, + get_row = opts.get_row, + retention = retention, + schema_uploaded = false, + } + + rspamd_logger.infox(rspamd_config, 'registered clickhouse extra table: %s (table: %s)', + opts.name, opts.table_name) + + return true, nil +end + +local function unregister_extra_table(name) + if extra_tables[name] then + extra_tables[name] = nil + extra_table_rows[name] = nil + rspamd_logger.infox(rspamd_config, 'unregistered clickhouse extra table: %s', name) + return true + end + return false +end + +local function get_extra_tables() + local result = {} + for name, config in pairs(extra_tables) do + result[name] = { + name = config.name, + table_name = config.table_name, + schema_uploaded = config.schema_uploaded, + retention = config.retention, + } + end + return result end local function clickhouse_collect(task) @@ -922,6 +1058,30 @@ local function clickhouse_collect(task) table.insert(custom_rows[k], lua_clickhouse.row_to_tsv(rule.get_row(task))) end + for name, tbl_config in pairs(extra_tables) do + if not extra_table_rows[name] then + extra_table_rows[name] = {} + end + + local ok, row_data = pcall(tbl_config.get_row, task) + if ok and row_data then + -- get_row can return a single row or array of rows + if row_data[1] and type(row_data[1]) == 'table' then + for _, single_row in ipairs(row_data) do + local tsv = lua_clickhouse.row_to_tsv(single_row) + table.insert(extra_table_rows[name], tsv) + used_memory = used_memory + #tsv + end + else + local tsv = lua_clickhouse.row_to_tsv(row_data) + table.insert(extra_table_rows[name], tsv) + used_memory = used_memory + #tsv + end + elseif not ok then + rspamd_logger.errx(task, 'error in get_row for extra table %s: %s', name, row_data) + end + end + local dominated = false if settings.limits.max_rows > 0 and nrows > settings.limits.max_rows * 10 then dominated = true @@ -939,6 +1099,7 @@ local function clickhouse_collect(task) used_memory = 0 data_rows = {} custom_rows = {} + extra_table_rows = {} collectgarbage() return end @@ -953,11 +1114,12 @@ local function clickhouse_collect(task) used_memory, settings.limits.max_memory) end -local function do_remove_partition(ev_base, cfg, table_name, partition) +local function do_remove_partition(ev_base, cfg, table_name, partition, method_override) lua_util.debugm(N, rspamd_config, "removing partition %s.%s", table_name, partition) local upstream = settings.upstream:get_upstream_round_robin() local remove_partition_sql = "ALTER TABLE ${table_name} ${remove_method} PARTITION '${partition}'" - local remove_method = (settings.retention.method == 'drop') and 'DROP' or 'DETACH' + local method = method_override or settings.retention.method + local remove_method = (method == 'drop') and 'DROP' or 'DETACH' local sql_params = { ['table_name'] = table_name, ['remove_method'] = remove_method, @@ -1084,13 +1246,15 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now) if need_collect then local saved_rows = data_rows local saved_custom = custom_rows + local saved_extra = extra_table_rows nrows = 0 last_collection = now used_memory = 0 data_rows = {} custom_rows = {} + extra_table_rows = {} - clickhouse_send_data(nil, ev_base, reason, saved_rows, saved_custom) + clickhouse_send_data(nil, ev_base, reason, saved_rows, saved_custom, saved_extra) if settings.collect_garbage then collectgarbage() @@ -1116,10 +1280,35 @@ local function clickhouse_remove_old_partitions(cfg, ev_base) "HAVING max(max_date) < toDate(now() - interval ${month} month)" local table_names = { 'rspamd' } + local table_retention = { + rspamd = { + period_months = settings.retention.period_months, + method = settings.retention.method + } + } + + for _, tbl_config in pairs(extra_tables) do + if tbl_config.retention and tbl_config.retention.enable then + table.insert(table_names, tbl_config.table_name) + table_retention[tbl_config.table_name] = { + period_months = tbl_config.retention.period_months, + method = tbl_config.retention.method + } + end + end + + -- Use minimum retention period to find all candidate partitions + local min_period = settings.retention.period_months + for _, ret in pairs(table_retention) do + if ret.period_months < min_period then + min_period = ret.period_months + end + end + local tables = table.concat(table_names, "', '") local sql_params = { tables = tables, - month = settings.retention.period_months, + month = min_period, } local sql = lua_util.template(partition_to_remove_sql, sql_params) @@ -1134,7 +1323,12 @@ local function clickhouse_remove_old_partitions(cfg, ev_base) settings['server'], err) else fun.each(function(row) - do_remove_partition(ev_base, cfg, row.table, row.partition) + local ret = table_retention[row.table] + if ret then + do_remove_partition(ev_base, cfg, row.table, row.partition, ret.method) + else + do_remove_partition(ev_base, cfg, row.table, row.partition) + end end, rows) end @@ -1532,14 +1726,16 @@ if opts then final_call = true local saved_rows = data_rows local saved_custom = custom_rows + local saved_extra = extra_table_rows nrows = 0 data_rows = {} used_memory = 0 custom_rows = {} + extra_table_rows = {} clickhouse_send_data(task, nil, 'final collection', - saved_rows, saved_custom) + saved_rows, saved_custom, saved_extra) if settings.collect_garbage then collectgarbage() @@ -1589,5 +1785,15 @@ if opts then end end end) + + -- Expose API for other plugins to register extra tables + rspamd_plugins['clickhouse'] = { + register_extra_table = register_extra_table, + unregister_extra_table = unregister_extra_table, + get_extra_tables = get_extra_tables, + is_enabled = function() + return settings.upstream ~= nil + end, + } end end