]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Minor] Convert retention logic in Clickhouse module
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 6 Aug 2018 15:31:04 +0000 (16:31 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 6 Aug 2018 17:00:34 +0000 (18:00 +0100)
src/plugins/lua/clickhouse.lua

index f3f77849d679c8c316b2bfba8a6ec680b05165f2..a4fd07034023c98d1ea91b2f319481718e70e053 100644 (file)
@@ -19,7 +19,8 @@ local rspamd_http = require "rspamd_http"
 local rspamd_lua_utils = require "lua_util"
 local upstream_list = require "rspamd_upstream_list"
 local lua_util = require "lua_util"
-local ucl = require "ucl"
+local lua_clickhouse = require "lua_clickhouse"
+local fun = require "fun"
 
 local N = "clickhouse"
 
@@ -29,7 +30,7 @@ end
 
 local E = {}
 
-local rows = {}
+local main_rows = {}
 local attachment_rows = {}
 local urls_rows = {}
 local emails_rows = {}
@@ -252,7 +253,7 @@ local function clickhouse_asn_row(tname)
 end
 
 local function clickhouse_first_row()
-  table.insert(rows, clickhouse_main_row(settings['table']))
+  table.insert(main_rows, clickhouse_main_row(settings['table']))
   if settings['attachments_table'] then
     table.insert(attachment_rows,
       clickhouse_attachments_row(settings['attachments_table']))
@@ -315,12 +316,12 @@ local function clickhouse_send_data(task)
     end
   end
 
-  local body = table.concat(rows, ' ')
+  local body = table.concat(main_rows, ' ')
   if not rspamd_http.request({
       task = task,
       url = connect_prefix .. ip_addr,
       body = body,
-      callback = gen_http_cb('generic data', #rows),
+      callback = gen_http_cb('generic data', #main_rows),
       gzip = settings.use_gzip,
       mime_type = 'text/plain',
       timeout = settings['timeout'],
@@ -622,7 +623,7 @@ local function clickhouse_collect(task)
         clickhouse_quote(from_user), clickhouse_quote(mime_user),
         clickhouse_quote(rcpt_user), clickhouse_quote(rcpt_domain),
         clickhouse_quote(list_id), task:get_digest())
-  table.insert(rows, elt)
+  table.insert(main_rows, elt)
 
   if settings['from_map'] and dkim == 'allow' then
     -- Use dkim
@@ -773,7 +774,7 @@ local function clickhouse_collect(task)
   if nrows > settings['limit'] then
     clickhouse_send_data(task)
     nrows = 0
-    rows = {}
+    main_rows = {}
     attachment_rows = {}
     urls_rows = {}
     emails_rows = {}
@@ -785,42 +786,6 @@ local function clickhouse_collect(task)
   end
 end
 
-local function mk_remove_http_cb(upstream, params, ok_cb)
-  local function do_remove_http_cb(err_message, code, data, _)
-    if code ~= 200 or err_message then
-      if not err_message then err_message = data end
-      local ip_addr = upstream:get_addr():to_string(true)
-      rspamd_logger.errx(rspamd_config, "request failed on clickhouse server %s: %s",
-              ip_addr, err_message)
-      upstream:fail()
-    else
-      upstream:ok()
-      if (ok_cb) then
-        rspamd_logger.debugm(N, rspamd_config, "do_remove_http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
-        ok_cb(params.ev_base, params.config, data)
-      end
-    end
-  end
-  return do_remove_http_cb
-end
-
-local function clickhouse_request(upstream, ok_cb, params)
-  rspamd_logger.debugm(N, rspamd_config, "clickhouse_request: %s", params.body)
-
-  params.callback = mk_remove_http_cb(upstream, params, ok_cb)
-  params.gzip = settings.use_gzip
-  params.mime_type = 'text/plain'
-  params.timeout = settings['timeout']
-  params.no_ssl_verify = settings.no_ssl_verify
-  params.user = settings.user
-  params.password = settings.password
-  if not params.url then
-    local ip_addr = upstream:get_addr():to_string(true)
-    params.url = connect_prefix .. ip_addr
-  end
-  return rspamd_http.request(params)
-end
-
 local function do_remove_partition(ev_base, cfg, table_name, partition_id)
   rspamd_logger.debugm(N, rspamd_config, "removing partition %s.%s", table_name, partition_id)
   local upstream = settings.upstream:get_upstream_round_robin()
@@ -840,39 +805,27 @@ local function do_remove_partition(ev_base, cfg, table_name, partition_id)
     cfg = cfg,
   }
 
-  if not clickhouse_request(upstream, nil, ch_params) then
-    rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request",
+  local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
+      function(_, rows)
+        rspamd_logger.infox(rspamd_config,
+            'detached partition %s:%s on server %s', table_name, partition_id,
             settings['server'])
-  end
-end
-
-local function parse_clickhouse_response(ev_base, cfg, data)
-  rspamd_logger.debugm(N, rspamd_config, "got clickhouse response: %s", data)
-  if data == nil then
-    -- clickhouse returned no data (i.e. empty resultset): exiting
-    return
-  end
-  local function parse_string(s)
-    local parser = ucl.parser()
-    local res, err = parser:parse_string(s)
-    if not res then
-      rspamd_logger.errx(rspamd_config, 'Parser error: %s', err)
-      return nil
-    end
-    return parser:get_object()
-  end
+      end,
+      function(_, err)
+        rspamd_logger.errx(rspamd_config,
+            "cannot detach partition %s:%s from server %s: %s",
+            table_name, partition_id,
+            settings['server'], err)
+      end)
 
-  -- iterate over rows
-  local ch_rows = lua_util.str_split(data, "\n")
-  for _, plain_row in pairs(ch_rows) do
-    if plain_row and plain_row:len() > 1 then
-      local parsed_row = parse_string(plain_row)
-      do_remove_partition(rspamd_config, cfg, parsed_row.table, parsed_row.partition)
-    end
+  if not ret then
+    rspamd_logger.errx(rspamd_config,
+        "cannot detach partition %s:%s from server %s: cannot make request",
+        table_name, partition_id,
+        settings['server'])
   end
 end
 
-
 --[[
   nil   - file is not writable, do not perform removal
   0     - it's time to perform removal
@@ -923,8 +876,6 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
   end
 
   local upstream = settings.upstream:get_upstream_round_robin()
-  local ip_addr = upstream:get_addr():to_string(true)
-
   local partition_to_remove_sql = "SELECT distinct partition, table FROM system.parts WHERE table in ('${tables}') and max_date <= toDate(now() - interval ${month} month);"
 
   local table_names = {}
@@ -938,13 +889,23 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
   }
   local sql = rspamd_lua_utils.template(partition_to_remove_sql, sql_params)
 
+
   local ch_params = {
-    body = sql,
-    url  = string.format("%s%s/?default_format=JSONEachRow", connect_prefix, ip_addr),
     ev_base = ev_base,
     config = cfg,
   }
-  if not clickhouse_request(upstream, parse_clickhouse_response, ch_params) then
+  local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
+      function(_, rows)
+        fun.each(function(row)
+          do_remove_partition(ev_base, cfg, row.table, row.partition)
+        end, rows)
+      end,
+      function(_, err)
+        rspamd_logger.errx(rspamd_config,
+            "cannot send data to clickhouse server %s: %s",
+            settings['server'], err)
+      end)
+  if not ret then
     rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request",
             settings['server'])
   end