]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Add extra tables API for clickhouse plugin 5810/head
authorVsevolod Stakhov <vsevolod@rspamd.com>
Wed, 31 Dec 2025 10:54:55 +0000 (10:54 +0000)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Wed, 31 Dec 2025 11:21:40 +0000 (11:21 +0000)
Allow other plugins to dynamically register custom Clickhouse tables
via rspamd_plugins['clickhouse'].register_extra_table(). Supports
per-table schemas, row callbacks (single or multiple rows), and
independent retention settings.

src/plugins/lua/clickhouse.lua

index 5454c559d8c3f7b19748705036b524373f4e926a..8e522d05baa3f7459b6e6816c2e491e545f7b101 100644 (file)
@@ -35,6 +35,9 @@ local last_collection = 0
 local final_call = false -- If the final collection has been started
 local schema_version = 9 -- Current schema version
 
+local extra_tables = {}
+local extra_table_rows = {}
+
 local settings = {
   limits = { -- Collection limits
     max_rows = 1000, -- How many rows are allowed (0 for disable this)
@@ -407,7 +410,22 @@ local function clickhouse_check_symbol(task, settings_field_name, fields_table,
   return false
 end
 
-local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows)
+local function upload_extra_table_schema(upstream, ev_base, cfg, tbl_config)
+  local ch_params = {
+    ev_base = ev_base,
+    config = cfg,
+  }
+  local sql = lua_util.template(tbl_config.schema, settings)
+  local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql)
+  if err then
+    return err
+  end
+  rspamd_logger.infox(rspamd_config, 'uploaded extra table schema for %s (table: %s)',
+      tbl_config.name, tbl_config.table_name)
+  return nil
+end
+
+local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows, extra_rows)
   local log_object = task or rspamd_config
   local upstream = settings.upstream:get_upstream_round_robin()
   local ip_addr = upstream:get_addr():to_string(true)
@@ -475,6 +493,124 @@ local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows)
           settings.custom_rules[k].first_row())
     end
   end
+
+  if extra_rows then
+    for name, tbl_config in pairs(extra_tables) do
+      local trows = extra_rows[name]
+      if trows and #trows > 0 then
+        if not tbl_config.schema_uploaded then
+          local err = upload_extra_table_schema(upstream, ev_base, rspamd_config, tbl_config)
+          if err then
+            rspamd_logger.errx(log_object, 'failed to upload schema for extra table %s: %s', name, err)
+          else
+            tbl_config.schema_uploaded = true
+          end
+        end
+
+        if tbl_config.schema_uploaded then
+          send_data('extra table (' .. name .. ')', trows, tbl_config.insert_query())
+        end
+      end
+    end
+  end
+end
+
+--[[
+Register an extra Clickhouse table from another plugin.
+@param opts table with fields:
+  - name: unique registration identifier
+  - table_name: Clickhouse table name
+  - schema: CREATE TABLE statement
+  - insert_query: function returning INSERT statement
+  - get_row: function(task) returning row data table, array of rows, or nil
+  - retention: optional {enable, period_months, method}
+@return success, error_message
+]]
+local function register_extra_table(opts)
+  if not settings.upstream then
+    return false, 'clickhouse plugin is not configured'
+  end
+
+  if not opts.name or type(opts.name) ~= 'string' or #opts.name == 0 then
+    return false, 'name is required and must be a non-empty string'
+  end
+
+  if not opts.table_name or type(opts.table_name) ~= 'string' or #opts.table_name == 0 then
+    return false, 'table_name is required and must be a non-empty string'
+  end
+
+  if not opts.schema or type(opts.schema) ~= 'string' or #opts.schema == 0 then
+    return false, 'schema is required and must be a non-empty string'
+  end
+
+  if type(opts.insert_query) ~= 'function' then
+    return false, 'insert_query must be a function'
+  end
+
+  if type(opts.get_row) ~= 'function' then
+    return false, 'get_row must be a function'
+  end
+
+  if extra_tables[opts.name] then
+    return false, string.format('extra table "%s" is already registered', opts.name)
+  end
+
+  local retention = { enable = false }
+  if opts.retention then
+    if type(opts.retention) ~= 'table' then
+      return false, 'retention must be a table'
+    end
+    retention.enable = opts.retention.enable or false
+    retention.period_months = opts.retention.period_months or settings.retention.period_months
+    retention.method = opts.retention.method or settings.retention.method
+
+    if retention.method ~= 'drop' and retention.method ~= 'detach' then
+      return false, 'retention.method must be "drop" or "detach"'
+    end
+    if type(retention.period_months) ~= 'number' or
+        retention.period_months < 1 or
+        retention.period_months > 1000 then
+      return false, 'retention.period_months must be between 1 and 1000'
+    end
+  end
+
+  extra_tables[opts.name] = {
+    name = opts.name,
+    table_name = opts.table_name,
+    schema = opts.schema,
+    insert_query = opts.insert_query,
+    get_row = opts.get_row,
+    retention = retention,
+    schema_uploaded = false,
+  }
+
+  rspamd_logger.infox(rspamd_config, 'registered clickhouse extra table: %s (table: %s)',
+      opts.name, opts.table_name)
+
+  return true, nil
+end
+
+local function unregister_extra_table(name)
+  if extra_tables[name] then
+    extra_tables[name] = nil
+    extra_table_rows[name] = nil
+    rspamd_logger.infox(rspamd_config, 'unregistered clickhouse extra table: %s', name)
+    return true
+  end
+  return false
+end
+
+local function get_extra_tables()
+  local result = {}
+  for name, config in pairs(extra_tables) do
+    result[name] = {
+      name = config.name,
+      table_name = config.table_name,
+      schema_uploaded = config.schema_uploaded,
+      retention = config.retention,
+    }
+  end
+  return result
 end
 
 local function clickhouse_collect(task)
@@ -922,6 +1058,30 @@ local function clickhouse_collect(task)
     table.insert(custom_rows[k], lua_clickhouse.row_to_tsv(rule.get_row(task)))
   end
 
+  for name, tbl_config in pairs(extra_tables) do
+    if not extra_table_rows[name] then
+      extra_table_rows[name] = {}
+    end
+
+    local ok, row_data = pcall(tbl_config.get_row, task)
+    if ok and row_data then
+      -- get_row can return a single row or array of rows
+      if row_data[1] and type(row_data[1]) == 'table' then
+        for _, single_row in ipairs(row_data) do
+          local tsv = lua_clickhouse.row_to_tsv(single_row)
+          table.insert(extra_table_rows[name], tsv)
+          used_memory = used_memory + #tsv
+        end
+      else
+        local tsv = lua_clickhouse.row_to_tsv(row_data)
+        table.insert(extra_table_rows[name], tsv)
+        used_memory = used_memory + #tsv
+      end
+    elseif not ok then
+      rspamd_logger.errx(task, 'error in get_row for extra table %s: %s', name, row_data)
+    end
+  end
+
   local dominated = false
   if settings.limits.max_rows > 0 and nrows > settings.limits.max_rows * 10 then
     dominated = true
@@ -939,6 +1099,7 @@ local function clickhouse_collect(task)
     used_memory = 0
     data_rows = {}
     custom_rows = {}
+    extra_table_rows = {}
     collectgarbage()
     return
   end
@@ -953,11 +1114,12 @@ local function clickhouse_collect(task)
       used_memory, settings.limits.max_memory)
 end
 
-local function do_remove_partition(ev_base, cfg, table_name, partition)
+local function do_remove_partition(ev_base, cfg, table_name, partition, method_override)
   lua_util.debugm(N, rspamd_config, "removing partition %s.%s", table_name, partition)
   local upstream = settings.upstream:get_upstream_round_robin()
   local remove_partition_sql = "ALTER TABLE ${table_name} ${remove_method} PARTITION '${partition}'"
-  local remove_method = (settings.retention.method == 'drop') and 'DROP' or 'DETACH'
+  local method = method_override or settings.retention.method
+  local remove_method = (method == 'drop') and 'DROP' or 'DETACH'
   local sql_params = {
     ['table_name'] = table_name,
     ['remove_method'] = remove_method,
@@ -1084,13 +1246,15 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now)
   if need_collect then
     local saved_rows = data_rows
     local saved_custom = custom_rows
+    local saved_extra = extra_table_rows
     nrows = 0
     last_collection = now
     used_memory = 0
     data_rows = {}
     custom_rows = {}
+    extra_table_rows = {}
 
-    clickhouse_send_data(nil, ev_base, reason, saved_rows, saved_custom)
+    clickhouse_send_data(nil, ev_base, reason, saved_rows, saved_custom, saved_extra)
 
     if settings.collect_garbage then
       collectgarbage()
@@ -1116,10 +1280,35 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
       "HAVING max(max_date) < toDate(now() - interval ${month} month)"
 
   local table_names = { 'rspamd' }
+  local table_retention = {
+    rspamd = {
+      period_months = settings.retention.period_months,
+      method = settings.retention.method
+    }
+  }
+
+  for _, tbl_config in pairs(extra_tables) do
+    if tbl_config.retention and tbl_config.retention.enable then
+      table.insert(table_names, tbl_config.table_name)
+      table_retention[tbl_config.table_name] = {
+        period_months = tbl_config.retention.period_months,
+        method = tbl_config.retention.method
+      }
+    end
+  end
+
+  -- Use minimum retention period to find all candidate partitions
+  local min_period = settings.retention.period_months
+  for _, ret in pairs(table_retention) do
+    if ret.period_months < min_period then
+      min_period = ret.period_months
+    end
+  end
+
   local tables = table.concat(table_names, "', '")
   local sql_params = {
     tables = tables,
-    month = settings.retention.period_months,
+    month = min_period,
   }
   local sql = lua_util.template(partition_to_remove_sql, sql_params)
 
@@ -1134,7 +1323,12 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
         settings['server'], err)
   else
     fun.each(function(row)
-      do_remove_partition(ev_base, cfg, row.table, row.partition)
+      local ret = table_retention[row.table]
+      if ret then
+        do_remove_partition(ev_base, cfg, row.table, row.partition, ret.method)
+      else
+        do_remove_partition(ev_base, cfg, row.table, row.partition)
+      end
     end, rows)
   end
 
@@ -1532,14 +1726,16 @@ if opts then
         final_call = true
         local saved_rows = data_rows
         local saved_custom = custom_rows
+        local saved_extra = extra_table_rows
 
         nrows = 0
         data_rows = {}
         used_memory = 0
         custom_rows = {}
+        extra_table_rows = {}
 
         clickhouse_send_data(task, nil, 'final collection',
-            saved_rows, saved_custom)
+            saved_rows, saved_custom, saved_extra)
 
         if settings.collect_garbage then
           collectgarbage()
@@ -1589,5 +1785,15 @@ if opts then
         end
       end
     end)
+
+    -- Expose API for other plugins to register extra tables
+    rspamd_plugins['clickhouse'] = {
+      register_extra_table = register_extra_table,
+      unregister_extra_table = unregister_extra_table,
+      get_extra_tables = get_extra_tables,
+      is_enabled = function()
+        return settings.upstream ~= nil
+      end,
+    }
   end
 end