return
end
-local main_rows = {}
-local attachment_rows = {}
-local urls_rows = {}
-local emails_rows = {}
---local specific_rows = {}
-local asn_rows = {}
-local symbols_rows = {}
 -- Pending rows awaiting a flush: the collector appends one combined row per
 -- task to `data_rows`, which takes over from the removed per-table buffers.
 -- It is passed to send_data() and then reset to an empty table.
+local data_rows = {}
 -- Per-key buffers of custom rows; iterated with pairs() on flush and reset
 -- together with `data_rows`.
local custom_rows = {}
 -- Number of rows collected since the last flush; compared against
 -- settings['limit'] to decide when to send.
local nrows = 0
local schema_version = 2 -- Current schema version
}
-local function clickhouse_main_row(tname)
+local function clickhouse_main_row(res)
 -- Appends every column name listed in `fields` to the caller-supplied array
 -- `res`, preserving order; returns nothing. (The removed version returned an
 -- 'INSERT INTO <tname> (...)' prefix string instead.)
 -- NOTE(review): presumably this order has to match the order in which the
 -- collector pushes values onto each row -- confirm when adding columns.
local fields = {
'Date',
'TS',
'ListId',
'Digest'
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_attachments_row(tname)
- local attachement_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_attachments_row(res)
 -- Appends the four `Attachments.*` column names to the array `res`, in the
 -- order listed; returns nothing. The 'Date' and 'Digest' columns of the
 -- removed version are no longer repeated here.
+ local fields = {
'Attachments.FileName',
'Attachments.ContentType',
'Attachments.Length',
'Attachments.Digest',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(attachement_fields, ','))
- return elt
+
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_urls_row(tname)
- local urls_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_urls_row(res)
 -- Appends the two `Urls.*` column names to the array `res`, in the order
 -- listed; returns nothing.
+ local fields = {
'Urls.Tld',
'Urls.Url',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(urls_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_emails_row(tname)
- local emails_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_emails_row(res)
 -- Appends the single 'Emails' column name to the array `res`; returns
 -- nothing.
+ local fields = {
'Emails',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(emails_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_symbols_row(tname)
- local symbols_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_symbols_row(res)
 -- Appends the three `Symbols.*` column names to the array `res`, in the
 -- order listed; returns nothing. The sender only calls this when
 -- settings.enable_symbols is set.
+ local fields = {
'Symbols.Names',
'Symbols.Scores',
'Symbols.Options',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(symbols_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_asn_row(tname)
- local asn_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_asn_row(res)
 -- Appends the 'ASN', 'Country' and 'IPNet' column names to the array `res`,
 -- in that order; returns nothing.
+ local fields = {
'ASN',
'Country',
'IPNet',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(asn_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
local function today(ts)
end
end
- send_data('generic data', main_rows,
- clickhouse_main_row(settings['table']))
+ local fields = {}
+ clickhouse_main_row(fields)
+ clickhouse_attachments_row(fields)
+ clickhouse_urls_row(fields)
+ clickhouse_emails_row(fields)
+ clickhouse_asn_row(fields)
-
- if #attachment_rows > 1 then
- send_data('attachments data', attachment_rows,
- clickhouse_attachments_row(settings.attachments_table))
- end
-
- if #urls_rows > 1 then
- send_data('urls data', urls_rows,
- clickhouse_urls_row(settings.urls_table))
- end
-
- if #emails_rows > 1 then
- send_data('emails data', emails_rows,
- clickhouse_emails_row(settings.emails_table))
+ if settings.enable_symbols then
+ clickhouse_symbols_row(fields)
end
- if #asn_rows > 1 then
- send_data('asn data', asn_rows,
- clickhouse_asn_row(settings.asn_table))
- end
-
- if #symbols_rows > 1 then
- send_data('symbols data', symbols_rows,
- clickhouse_symbols_row(settings.symbols_table))
- end
+ send_data('generic data', data_rows,
+ string.format('INSERT INTO rspamd (%s)', table.concat(fields, ',')))
for k,crows in pairs(custom_rows) do
if #crows > 1 then
local action = task:get_metric_action('default')
local digest = task:get_digest()
- table.insert(main_rows, {
+ local row = {
today(timestamp),
timestamp,
from_domain,
rcpt_user,
rcpt_domain,
list_id,
- task:get_digest()
- })
-
---[[ TODO: has been broken
- if settings['from_map'] and dkim == 'allow' then
- -- Use dkim
- local das = task:get_symbol(settings['dkim_allow_symbols'][1])
- if ((das or E)[1] or E).options then
- for _,dkim_domain in ipairs(das[1]['options']) do
- local specific = settings.from_map:get_key(dkim_domain)
- if specific then
- specific_rows[specific] = {}
- table.insert(specific_rows[specific], elt)
- end
- end
- end
-
- end
---]]
+ digest
+ }
-- Attachments step
local attachments_fnames = {}
end
if #attachments_fnames > 0 then
- table.insert(attachment_rows, {
- today(timestamp),
- digest,
- attachments_fnames,
- attachments_ctypes,
- attachments_lengths,
- attachments_digests,
- })
+ table.insert(row, attachments_fnames)
+ table.insert(row, attachments_ctypes)
+ table.insert(row, attachments_lengths)
+ table.insert(row, attachments_digests)
+ else
+ table.insert(row, {})
+ table.insert(row, {})
+ table.insert(row, {})
+ table.insert(row, {})
end
-- Urls step
end
if #urls_tlds > 0 then
- table.insert(urls_rows, {
- today(timestamp),
- digest,
- urls_tlds,
- urls_urls
- })
+ table.insert(row, urls_tlds)
+ table.insert(row, urls_urls)
+ else
+ table.insert(row, {})
+ table.insert(row, {})
end
-- Emails step
- local emails = {}
if task:has_urls(true) then
- for _,u in ipairs(task:get_emails()) do
- table.insert(emails,
- string.format('%s@%s', u:get_user(), u:get_host()))
- end
- end
-
- if #emails > 0 then
- table.insert(emails_rows, {
- today(timestamp),
- digest,
- emails,
- })
+ table.insert(row, fun.totable(fun.map(function(u)
+ return string.format('%s@%s', u:get_user(), u:get_host())
+ end, task:get_emails())))
+ else
+ table.insert(row, {})
end
-- ASN information
- if settings['asn_table'] then
- local asn, country, ipnet = '--', '--', '--'
- local pool = task:get_mempool()
- ret = pool:get_variable("asn")
- if ret then
- asn = ret
- end
- ret = pool:get_variable("country")
- if ret then
- country = ret:sub(1, 2)
- end
- ret = pool:get_variable("ipnet")
- if ret then
- ipnet = ret
- end
- table.insert(asn_rows, {
- today(timestamp),
- digest,
- asn,
- country,
- ipnet
- })
+ local asn, country, ipnet = '--', '--', '--'
+ local pool = task:get_mempool()
+ ret = pool:get_variable("asn")
+ if ret then
+ asn = ret
end
+ ret = pool:get_variable("country")
+ if ret then
+ country = ret:sub(1, 2)
+ end
+ ret = pool:get_variable("ipnet")
+ if ret then
+ ipnet = ret
+ end
+ table.insert(row, asn)
+ table.insert(row, country)
+ table.insert(row, ipnet)
-- Symbols info
- if settings.enable_symbols and settings['symbols_table'] then
+ if settings.enable_symbols then
local symbols = task:get_symbols_all()
local syms_tab = {}
local scores_tab = {}
table.insert(options_tab, '');
end
end
-
- table.insert(symbols_rows, {
- today(timestamp),
- digest,
- syms_tab,
- scores_tab,
- options_tab
- })
+ table.insert(row, syms_tab)
+ table.insert(row, scores_tab)
+ table.insert(row, options_tab)
end
-- Custom data
end
nrows = nrows + 1
+ table.insert(data_rows, row)
rspamd_logger.debugm(N, task, "add clickhouse row %s / %s", nrows, settings.limit)
if nrows > settings['limit'] then
clickhouse_send_data(task)
nrows = 0
- main_rows = {}
- attachment_rows = {}
- urls_rows = {}
- emails_rows = {}
- asn_rows = {}
- symbols_rows = {}
+ data_rows = {}
custom_rows = {}
end
end