]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Rework] Rework Clickhouse plugin to use the new API
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 6 Aug 2018 16:56:17 +0000 (17:56 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 6 Aug 2018 17:00:34 +0000 (18:00 +0100)
src/plugins/lua/clickhouse.lua

index a4fd07034023c98d1ea91b2f319481718e70e053..3eebd5b5d7dae49754c26572217d0eecebbfe3de 100644 (file)
@@ -28,13 +28,11 @@ if confighelp then
   return
 end
 
-local E = {}
-
 local main_rows = {}
 local attachment_rows = {}
 local urls_rows = {}
 local emails_rows = {}
-local specific_rows = {}
+--local specific_rows = {}
 local asn_rows = {}
 local symbols_rows = {}
 local custom_rows = {}
@@ -252,34 +250,8 @@ local function clickhouse_asn_row(tname)
   return elt
 end
 
-local function clickhouse_first_row()
-  table.insert(main_rows, clickhouse_main_row(settings['table']))
-  if settings['attachments_table'] then
-    table.insert(attachment_rows,
-      clickhouse_attachments_row(settings['attachments_table']))
-  end
-  if settings['urls_table'] then
-    table.insert(urls_rows,
-      clickhouse_urls_row(settings['urls_table']))
-  end
-  if settings['emails_table'] then
-    table.insert(emails_rows,
-        clickhouse_emails_row(settings['emails_table']))
-  end
-  if settings['asn_table'] then
-    table.insert(asn_rows,
-      clickhouse_asn_row(settings['asn_table']))
-  end
-  if settings.enable_symbols and settings['symbols_table'] then
-    table.insert(symbols_rows,
-      clickhouse_symbols_row(settings['symbols_table']))
-  end
-
-  for k,rule in pairs(settings.custom_rules) do
-    if not custom_rows[k] then custom_rows[k] = {} end
-    table.insert(custom_rows[k],
-        rule.first_row())
-  end
+local function today(ts)
+  return os.date('%Y-%m-%d', ts)
 end
 
 local function clickhouse_check_symbol(task, symbols, need_score)
@@ -301,174 +273,73 @@ local function clickhouse_send_data(task)
   local upstream = settings.upstream:get_upstream_round_robin()
   local ip_addr = upstream:get_addr():to_string(true)
 
-  local function gen_http_cb(what, how_many)
-    return function (err_message, code, data, _)
-      if code ~= 200 or err_message then
-        if not err_message then err_message = data end
-        rspamd_logger.errx(task, "cannot send %s data to clickhouse server %s: %s",
-            what, ip_addr, err_message)
-        upstream:fail()
-      else
-        rspamd_logger.infox(task, "sent %s rows of %s to clickhouse server %s",
-            how_many - 1, what, ip_addr)
-        upstream:ok()
-      end
+  local function gen_success_cb(what, how_many)
+    return function (_, _)
+      rspamd_logger.infox(task, "sent %s rows of %s to clickhouse server %s",
+          how_many, what, ip_addr)
+      upstream:ok()
     end
   end
 
-  local body = table.concat(main_rows, ' ')
-  if not rspamd_http.request({
-      task = task,
-      url = connect_prefix .. ip_addr,
-      body = body,
-      callback = gen_http_cb('generic data', #main_rows),
-      gzip = settings.use_gzip,
-      mime_type = 'text/plain',
-      timeout = settings['timeout'],
-      no_ssl_verify = settings.no_ssl_verify,
-      user = settings.user,
-      password = settings.password,
-    }) then
-     rspamd_logger.errx(task, "cannot send data to clickhouse server %s: cannot make request",
-        settings['server'])
+  local function gen_fail_cb(what, how_many)
+    return function (_, err)
+      rspamd_logger.errx(task, "cannot send %s rows of %s data to clickhouse server %s: %s",
+          how_many, what, ip_addr, err)
+      upstream:fail()
+    end
   end
 
-  if #attachment_rows > 1 then
-    body = table.concat(attachment_rows, ' ')
-    if not rspamd_http.request({
+  local function send_data(what, tbl, query)
+    local ch_params = {
       task = task,
-      url = connect_prefix .. ip_addr,
-      body = body,
-      callback = gen_http_cb('attachments data', #attachment_rows),
-      mime_type = 'text/plain',
-      timeout = settings['timeout'],
-      no_ssl_verify = settings.no_ssl_verify,
-      user = settings.user,
-      password = settings.password,
-    }) then
-      rspamd_logger.errx(task, "cannot send attachments to clickhouse server %s: cannot make request",
-        settings['server'])
+    }
+    local ret = lua_clickhouse.insert(upstream, settings, ch_params,
+        query, tbl,
+        gen_success_cb(what, #tbl),
+        gen_fail_cb(what, #tbl))
+    if not ret then
+      rspamd_logger.errx(task, "cannot send %s rows of %s data to clickhouse server %s: %s",
+          #tbl, what, ip_addr, 'cannot make HTTP request')
     end
   end
+
+  send_data('generic data', main_rows,
+      clickhouse_main_row(settings['table']))
+
+
+  if #attachment_rows > 1 then
+    send_data('attachments data', attachment_rows,
+        clickhouse_attachments_row(settings.attachments_table))
+  end
+
   if #urls_rows > 1 then
-    body = table.concat(urls_rows, ' ')
-    if not rspamd_http.request({
-      task = task,
-      url = connect_prefix .. ip_addr,
-      body = body,
-      callback = gen_http_cb('urls data', #urls_rows),
-      mime_type = 'text/plain',
-      timeout = settings['timeout'],
-      no_ssl_verify = settings.no_ssl_verify,
-      user = settings.user,
-      password = settings.password,
-    }) then
-      rspamd_logger.errx(task, "cannot send urls to clickhouse server %s: cannot make request",
-        settings['server'])
-    end
+    send_data('urls data', urls_rows,
+        clickhouse_urls_row(settings.urls_table))
   end
+
   if #emails_rows > 1 then
-    body = table.concat(emails_rows, ' ')
-    if not rspamd_http.request({
-      task = task,
-      url = connect_prefix .. ip_addr,
-      body = body,
-      callback = gen_http_cb('emails data', #emails_rows),
-      mime_type = 'text/plain',
-      timeout = settings['timeout'],
-      no_ssl_verify = settings.no_ssl_verify,
-      user = settings.user,
-      password = settings.password,
-    }) then
-      rspamd_logger.errx(task, "cannot send emails to clickhouse server %s: cannot make request",
-          settings['server'])
-    end
+    send_data('emails data', emails_rows,
+        clickhouse_emails_row(settings.emails_table))
   end
+
   if #asn_rows > 1 then
-    body = table.concat(asn_rows, ' ')
-    if not rspamd_http.request({
-      task = task,
-      url = connect_prefix .. ip_addr,
-      body = body,
-      callback = gen_http_cb('asn data', #asn_rows),
-      mime_type = 'text/plain',
-      timeout = settings['timeout'],
-      no_ssl_verify = settings.no_ssl_verify,
-      user = settings.user,
-      password = settings.password,
-    }) then
-      rspamd_logger.errx(task, "cannot send asn info to clickhouse server %s: cannot make request",
-        settings['server'])
-    end
+    send_data('asn data', asn_rows,
+        clickhouse_asn_row(settings.asn_table))
   end
 
   if #symbols_rows > 1 then
-    body = table.concat(symbols_rows, ' ')
-    if not rspamd_http.request({
-      task = task,
-      url = connect_prefix .. ip_addr,
-      body = body,
-      callback = gen_http_cb('symbols data', #symbols_rows),
-      mime_type = 'text/plain',
-      timeout = settings['timeout'],
-      no_ssl_verify = settings.no_ssl_verify,
-      user = settings.user,
-      password = settings.password,
-    }) then
-      rspamd_logger.errx(task, "cannot send symbols info to clickhouse server %s: cannot make request",
-        settings['server'])
-    end
-  end
-
-  for k,specific in pairs(specific_rows) do
-    if #specific > 1 then
-      body = table.concat(specific, ' ')
-      if not rspamd_http.request({
-        task = task,
-        url = connect_prefix .. ip_addr,
-        body = body,
-        callback = gen_http_cb('domain specific data ('..k..')', #specific),
-        mime_type = 'text/plain',
-        timeout = settings['timeout'],
-        no_ssl_verify = settings.no_ssl_verify,
-        user = settings.user,
-        password = settings.password,
-      }) then
-        rspamd_logger.errx(task, "cannot send data for domain %s to clickhouse server %s: cannot make request",
-          k, settings['server'])
-      end
-    end
+    send_data('symbols data', symbols_rows,
+        clickhouse_symbols_row(settings.symbols_table))
   end
 
   for k,crows in pairs(custom_rows) do
     if #crows > 1 then
-      body = table.concat(crows, ' ')
-      if not rspamd_http.request({
-        task = task,
-        url = connect_prefix .. ip_addr,
-        body = body,
-        callback = gen_http_cb('custom data ('..k..')', #crows),
-        mime_type = 'text/plain',
-        timeout = settings['timeout'],
-        no_ssl_verify = settings.no_ssl_verify,
-        user = settings.user,
-        password = settings.password,
-      }) then
-        rspamd_logger.errx(task, "cannot send custom data %s to clickhouse server %s: cannot make request",
-            k, settings['server'])
-      end
+      send_data('custom data ('..k..')', settings.custom_rules[k].first_row(),
+          crows)
     end
   end
 end
 
-local function clickhouse_quote(str)
-  if str then
-    return str:gsub('[\'\\]', '\\%1'):lower()
-  else
-    return ''
-  end
-end
-
 local function clickhouse_collect(task)
   if not settings.allow_local and rspamd_lua_utils.is_rspamc_or_controller(task) then return end
 
@@ -615,16 +486,34 @@ local function clickhouse_collect(task)
     gmt = false
   })
 
-  local elt = string.format("(today(),%d,'%s','%s','%s',%.2f,%d,%d,'%s','%s','%s','%s','%s','%s',%d,'%s','%s','%s','%s','%s','%s','%s')",
-        timestamp,
-        clickhouse_quote(from_domain), clickhouse_quote(mime_domain), ip_str, score,
-        nrcpts, task:get_size(), whitelist, bayes, fuzzy, fann,
-        dkim, dmarc, nurls, task:get_metric_action('default'),
-        clickhouse_quote(from_user), clickhouse_quote(mime_user),
-        clickhouse_quote(rcpt_user), clickhouse_quote(rcpt_domain),
-        clickhouse_quote(list_id), task:get_digest())
-  table.insert(main_rows, elt)
+  local action = task:get_metric_action('default')
+
+  table.insert(main_rows, {
+    today(timestamp),
+    timestamp,
+    from_domain,
+    mime_domain,
+    ip_str,
+    score,
+    nrcpts,
+    task:get_size(),
+    whitelist,
+    bayes,
+    fuzzy,
+    fann,
+    dkim,
+    dmarc,
+    nurls,
+    action,
+    from_user,
+    mime_user,
+    rcpt_user,
+    rcpt_domain,
+    list_id,
+    task:get_digest()
+  })
 
+--[[ TODO: has been broken
   if settings['from_map'] and dkim == 'allow' then
     -- Use dkim
     local das = task:get_symbol(settings['dkim_allow_symbols'][1])
@@ -632,16 +521,14 @@ local function clickhouse_collect(task)
       for _,dkim_domain in ipairs(das[1]['options']) do
         local specific = settings.from_map:get_key(dkim_domain)
         if specific then
-          if not specific_rows[specific] then
-            local first = clickhouse_main_row(specific)
-            specific_rows[specific] = {first}
-          end
+          specific_rows[specific] = {}
           table.insert(specific_rows[specific], elt)
         end
       end
     end
 
   end
+--]]
 
   -- Attachments step
   local attachments_fnames = {}
@@ -652,23 +539,24 @@ local function clickhouse_collect(task)
     local fname = part:get_filename()
 
     if fname then
-      table.insert(attachments_fnames, string.format("'%s'", clickhouse_quote(fname)))
+      table.insert(attachments_fnames, fname)
       local type, subtype = part:get_type()
-      table.insert(attachments_ctypes, string.format("'%s/%s'",
-        clickhouse_quote(type), clickhouse_quote(subtype)))
-      table.insert(attachments_lengths, string.format("%s", tostring(part:get_length())))
-      table.insert(attachments_digests, string.format("'%s'", string.sub(part:get_digest(), 1, 16)))
+      table.insert(attachments_ctypes, string.format("%s/%s",
+          type, subtype))
+      table.insert(attachments_lengths, part:get_length())
+      table.insert(attachments_digests, string.sub(part:get_digest(), 1, 16))
     end
   end
 
   if #attachments_fnames > 0 then
-    elt = string.format("(today(),'%s',[%s],[%s],[%s],[%s])",
+    table.insert(attachment_rows, {
+      today(timestamp),
       task:get_digest(),
-      table.concat(attachments_fnames, ','),
-      table.concat(attachments_ctypes, ','),
-      table.concat(attachments_lengths, ','),
-      table.concat(attachments_digests, ','))
-    table.insert(attachment_rows, elt)
+      attachments_fnames,
+      attachments_ctypes,
+      attachments_lengths,
+      attachments_digests,
+    })
   end
 
   -- Urls step
@@ -676,40 +564,39 @@ local function clickhouse_collect(task)
   local urls_urls = {}
   if task:has_urls(false) then
     for _,u in ipairs(task:get_urls(false)) do
-      table.insert(urls_tlds, string.format("'%s'", clickhouse_quote(u:get_tld())))
+      table.insert(urls_tlds, u:get_tld())
       if settings['full_urls'] then
-        table.insert(urls_urls, string.format("'%s'",
-          clickhouse_quote(u:get_text())))
+        table.insert(urls_urls, u:get_text())
       else
-        table.insert(urls_urls, string.format("'%s'",
-          clickhouse_quote(u:get_host())))
+        table.insert(urls_urls, u:get_host())
       end
     end
   end
 
   if #urls_tlds > 0 then
-    elt = string.format("(today(),'%s',[%s],[%s])",
+    table.insert(urls_rows, {
+      today(timestamp),
       task:get_digest(),
-      table.concat(urls_tlds, ','),
-      table.concat(urls_urls, ','))
-    table.insert(urls_rows, elt)
+      urls_tlds,
+      urls_urls
+    })
   end
 
   -- Emails step
   local emails = {}
   if task:has_urls(true) then
     for _,u in ipairs(task:get_emails()) do
-      table.insert(emails, string.format("'%s'", clickhouse_quote(
-          string.format('%s@%s', u:get_user(), u:get_host())
-      )))
+      table.insert(emails,
+          string.format('%s@%s', u:get_user(), u:get_host()))
     end
   end
 
   if #emails > 0 then
-    elt = string.format("(today(),'%s',[%s])",
-        task:get_digest(),
-        table.concat(emails, ','))
-    table.insert(emails_rows, elt)
+    table.insert(emails_rows, {
+      today(timestamp),
+      task:get_digest(),
+      emails,
+    })
   end
 
   -- ASN information
@@ -728,10 +615,13 @@ local function clickhouse_collect(task)
     if ret then
       ipnet = ret
     end
-    elt = string.format("(today(),'%s','%s','%s','%s')",
+    table.insert(asn_rows, {
+      today(timestamp),
       task:get_digest(),
-      clickhouse_quote(asn), clickhouse_quote(country), clickhouse_quote(ipnet))
-    table.insert(asn_rows, elt)
+      asn,
+      country,
+      ipnet
+    })
   end
 
   -- Symbols info
@@ -742,25 +632,22 @@ local function clickhouse_collect(task)
     local options_tab = {}
 
     for _,s in ipairs(symbols) do
-      table.insert(syms_tab, string.format("'%s'",
-        clickhouse_quote(s.name or '')))
-      table.insert(scores_tab, string.format('%.3f', s.score))
+      table.insert(syms_tab, s.name or '')
+      table.insert(scores_tab, s.score)
 
       if s.options then
-        table.insert(options_tab, string.format("'%s'",
-          clickhouse_quote(table.concat(s.options, ','))))
+        table.insert(options_tab, table.concat(s.options, ','))
       else
         table.insert(options_tab, "''");
       end
     end
 
-    elt = string.format("(today(),'%s',[%s],[%s],[%s])",
-      task:get_digest(),
-      table.concat(syms_tab, ','),
-      table.concat(scores_tab, ','),
-      table.concat(options_tab, ','))
-
-    table.insert(symbols_rows, elt)
+    table.insert(symbols_rows, {
+      today(timestamp),
+      syms_tab,
+      scores_tab,
+      options_tab
+    })
   end
 
   -- Custom data
@@ -778,11 +665,9 @@ local function clickhouse_collect(task)
     attachment_rows = {}
     urls_rows = {}
     emails_rows = {}
-    specific_rows = {}
     asn_rows = {}
     symbols_rows = {}
     custom_rows = {}
-    clickhouse_first_row()
   end
 end
 
@@ -981,8 +866,6 @@ if opts then
         return
       end
 
-
-      clickhouse_first_row()
       rspamd_config:register_symbol({
         name = 'CLICKHOUSE_COLLECT',
         type = 'idempotent',