]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Project] Migrate CH data to a fat table
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 8 Aug 2018 12:59:52 +0000 (13:59 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 8 Aug 2018 13:01:37 +0000 (14:01 +0100)
src/plugins/lua/clickhouse.lua

index 54128235b520d4be2c778ee8a19873106c32ffe7..6eab8696124ce567a481401e5f7f845230057cbb 100644 (file)
@@ -28,13 +28,7 @@ if confighelp then
   return
 end
 
-local main_rows = {}
-local attachment_rows = {}
-local urls_rows = {}
-local emails_rows = {}
---local specific_rows = {}
-local asn_rows = {}
-local symbols_rows = {}
+local data_rows = {}
 local custom_rows = {}
 local nrows = 0
 local schema_version = 2 -- Current schema version
@@ -143,7 +137,7 @@ local migrations = {
 }
 
 
-local function clickhouse_main_row(tname)
+local function clickhouse_main_row(res)
   local fields = {
     'Date',
     'TS',
@@ -168,73 +162,52 @@ local function clickhouse_main_row(tname)
     'ListId',
     'Digest'
   }
-  local elt = string.format('INSERT INTO %s (%s) ',
-    tname, table.concat(fields, ','))
 
-  return elt
+  for _,v in ipairs(fields) do table.insert(res, v) end
 end
 
-local function clickhouse_attachments_row(tname)
-  local attachement_fields = {
-    'Date',
-    'Digest',
+local function clickhouse_attachments_row(res)
+  local fields = {
     'Attachments.FileName',
     'Attachments.ContentType',
     'Attachments.Length',
     'Attachments.Digest',
   }
-  local elt = string.format('INSERT INTO %s (%s) ',
-    tname, table.concat(attachement_fields, ','))
-  return elt
+
+  for _,v in ipairs(fields) do table.insert(res, v) end
 end
 
-local function clickhouse_urls_row(tname)
-  local urls_fields = {
-    'Date',
-    'Digest',
+local function clickhouse_urls_row(res)
+  local fields = {
     'Urls.Tld',
     'Urls.Url',
   }
-  local elt = string.format('INSERT INTO %s (%s) ',
-    tname, table.concat(urls_fields, ','))
-  return elt
+  for _,v in ipairs(fields) do table.insert(res, v) end
 end
 
-local function clickhouse_emails_row(tname)
-  local emails_fields = {
-    'Date',
-    'Digest',
+local function clickhouse_emails_row(res)
+  local fields = {
     'Emails',
   }
-  local elt = string.format('INSERT INTO %s (%s) ',
-      tname, table.concat(emails_fields, ','))
-  return elt
+  for _,v in ipairs(fields) do table.insert(res, v) end
 end
 
-local function clickhouse_symbols_row(tname)
-  local symbols_fields = {
-    'Date',
-    'Digest',
+local function clickhouse_symbols_row(res)
+  local fields = {
     'Symbols.Names',
     'Symbols.Scores',
     'Symbols.Options',
   }
-  local elt = string.format('INSERT INTO %s (%s) ',
-    tname, table.concat(symbols_fields, ','))
-  return elt
+  for _,v in ipairs(fields) do table.insert(res, v) end
 end
 
-local function clickhouse_asn_row(tname)
-  local asn_fields = {
-    'Date',
-    'Digest',
+local function clickhouse_asn_row(res)
+  local fields = {
     'ASN',
     'Country',
     'IPNet',
   }
-  local elt = string.format('INSERT INTO %s (%s) ',
-    tname, table.concat(asn_fields, ','))
-  return elt
+  for _,v in ipairs(fields) do table.insert(res, v) end
 end
 
 local function today(ts)
@@ -291,34 +264,19 @@ local function clickhouse_send_data(task)
     end
   end
 
-  send_data('generic data', main_rows,
-      clickhouse_main_row(settings['table']))
+  local fields = {}
+  clickhouse_main_row(fields)
+  clickhouse_attachments_row(fields)
+  clickhouse_urls_row(fields)
+  clickhouse_emails_row(fields)
+  clickhouse_asn_row(fields)
 
-
-  if #attachment_rows > 1 then
-    send_data('attachments data', attachment_rows,
-        clickhouse_attachments_row(settings.attachments_table))
-  end
-
-  if #urls_rows > 1 then
-    send_data('urls data', urls_rows,
-        clickhouse_urls_row(settings.urls_table))
-  end
-
-  if #emails_rows > 1 then
-    send_data('emails data', emails_rows,
-        clickhouse_emails_row(settings.emails_table))
+  if settings.enable_symbols then
+    clickhouse_symbols_row(fields)
   end
 
-  if #asn_rows > 1 then
-    send_data('asn data', asn_rows,
-        clickhouse_asn_row(settings.asn_table))
-  end
-
-  if #symbols_rows > 1 then
-    send_data('symbols data', symbols_rows,
-        clickhouse_symbols_row(settings.symbols_table))
-  end
+  send_data('generic data', data_rows,
+      string.format('INSERT INTO rspamd (%s)', table.concat(fields, ',')))
 
   for k,crows in pairs(custom_rows) do
     if #crows > 1 then
@@ -477,7 +435,7 @@ local function clickhouse_collect(task)
   local action = task:get_metric_action('default')
   local digest = task:get_digest()
 
-  table.insert(main_rows, {
+  local row = {
     today(timestamp),
     timestamp,
     from_domain,
@@ -499,25 +457,8 @@ local function clickhouse_collect(task)
     rcpt_user,
     rcpt_domain,
     list_id,
-    task:get_digest()
-  })
-
---[[ TODO: has been broken
-  if settings['from_map'] and dkim == 'allow' then
-    -- Use dkim
-    local das = task:get_symbol(settings['dkim_allow_symbols'][1])
-    if ((das or E)[1] or E).options then
-      for _,dkim_domain in ipairs(das[1]['options']) do
-        local specific = settings.from_map:get_key(dkim_domain)
-        if specific then
-          specific_rows[specific] = {}
-          table.insert(specific_rows[specific], elt)
-        end
-      end
-    end
-
-  end
---]]
+    digest
+  }
 
   -- Attachments step
   local attachments_fnames = {}
@@ -538,14 +479,15 @@ local function clickhouse_collect(task)
   end
 
   if #attachments_fnames > 0 then
-    table.insert(attachment_rows, {
-      today(timestamp),
-      digest,
-      attachments_fnames,
-      attachments_ctypes,
-      attachments_lengths,
-      attachments_digests,
-    })
+    table.insert(row, attachments_fnames)
+    table.insert(row,  attachments_ctypes)
+    table.insert(row,  attachments_lengths)
+    table.insert(row,   attachments_digests)
+  else
+    table.insert(row, {})
+    table.insert(row, {})
+    table.insert(row, {})
+    table.insert(row, {})
   end
 
   -- Urls step
@@ -563,58 +505,43 @@ local function clickhouse_collect(task)
   end
 
   if #urls_tlds > 0 then
-    table.insert(urls_rows, {
-      today(timestamp),
-      digest,
-      urls_tlds,
-      urls_urls
-    })
+    table.insert(row, urls_tlds)
+    table.insert(row, urls_urls)
+  else
+    table.insert(row, {})
+    table.insert(row, {})
   end
 
   -- Emails step
-  local emails = {}
   if task:has_urls(true) then
-    for _,u in ipairs(task:get_emails()) do
-      table.insert(emails,
-          string.format('%s@%s', u:get_user(), u:get_host()))
-    end
-  end
-
-  if #emails > 0 then
-    table.insert(emails_rows, {
-      today(timestamp),
-      digest,
-      emails,
-    })
+    table.insert(row, fun.totable(fun.map(function(u)
+      return string.format('%s@%s', u:get_user(), u:get_host())
+    end, task:get_emails())))
+  else
+    table.insert(row, {})
   end
 
   -- ASN information
-  if settings['asn_table'] then
-    local asn, country, ipnet = '--', '--', '--'
-    local pool = task:get_mempool()
-    ret = pool:get_variable("asn")
-    if ret then
-      asn = ret
-    end
-    ret = pool:get_variable("country")
-    if ret then
-      country = ret:sub(1, 2)
-    end
-    ret = pool:get_variable("ipnet")
-    if ret then
-      ipnet = ret
-    end
-    table.insert(asn_rows, {
-      today(timestamp),
-      digest,
-      asn,
-      country,
-      ipnet
-    })
+  local asn, country, ipnet = '--', '--', '--'
+  local pool = task:get_mempool()
+  ret = pool:get_variable("asn")
+  if ret then
+    asn = ret
   end
+  ret = pool:get_variable("country")
+  if ret then
+    country = ret:sub(1, 2)
+  end
+  ret = pool:get_variable("ipnet")
+  if ret then
+    ipnet = ret
+  end
+  table.insert(row, asn)
+  table.insert(row, country)
+  table.insert(row, ipnet)
 
   -- Symbols info
-  if settings.enable_symbols and settings['symbols_table'] then
+  if settings.enable_symbols then
     local symbols = task:get_symbols_all()
     local syms_tab = {}
     local scores_tab = {}
@@ -630,14 +557,9 @@ local function clickhouse_collect(task)
         table.insert(options_tab, '');
       end
     end
-
-    table.insert(symbols_rows, {
-      today(timestamp),
-      digest,
-      syms_tab,
-      scores_tab,
-      options_tab
-    })
+    table.insert(row, syms_tab)
+    table.insert(row, scores_tab)
+    table.insert(row, options_tab)
   end
 
   -- Custom data
@@ -647,17 +569,13 @@ local function clickhouse_collect(task)
   end
 
   nrows = nrows + 1
+  table.insert(data_rows, row)
   rspamd_logger.debugm(N, task, "add clickhouse row %s / %s", nrows, settings.limit)
 
   if nrows > settings['limit'] then
     clickhouse_send_data(task)
     nrows = 0
-    main_rows = {}
-    attachment_rows = {}
-    urls_rows = {}
-    emails_rows = {}
-    asn_rows = {}
-    symbols_rows = {}
+    data_rows = {}
     custom_rows = {}
   end
 end