From: Vsevolod Stakhov Date: Wed, 23 Nov 2016 13:14:01 +0000 (+0000) Subject: [Feature] Add clickhouse plugin X-Git-Tag: 1.4.1~93 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cc0f0d2377548291584a0c3262e6b823edda93c8;p=thirdparty%2Frspamd.git [Feature] Add clickhouse plugin --- diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua new file mode 100644 index 0000000000..659c5d5db5 --- /dev/null +++ b/src/plugins/lua/clickhouse.lua @@ -0,0 +1,464 @@ +--[[ +Copyright (c) 2016, Vsevolod Stakhov + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +]]-- + +local rspamd_logger = require 'rspamd_logger' +local rspamd_http = require "rspamd_http" + +local rows = {} +local attachment_rows = {} +local urls_rows = {} +local specific_rows = {} +local nrows = 0 + +local settings = { + limit = 1000, + server = "localhost:8123", + timeout = 5.0, + bayes_spam_symbols = {'BAYES_SPAM'}, + bayes_ham_symbols = {'BAYES_HAM'}, + fann_symbols = {'FANN_SCORE'}, + fuzzy_symbols = {'FUZZY_DENIED'}, + whitelist_symbols = {'WHITELIST_DKIM', 'WHITELIST_SPF_DKIM', 'WHITELIST_DMARC'}, + dkim_allow_symbols = {'R_DKIM_ALLOW'}, + dkim_reject_symbols = {'R_DKIM_REJECT'}, + dmarc_allow_symbols = {'DMARC_POLICY_ALLOW'}, + dmarc_reject_symbols = {'DMARC_POLICY_REJECT', 'DMARC_POLICY_QUARANTINE'}, + table = 'rspamd', + attachments_table = 'rspamd_attachments', + urls_table = 'rspamd_urls', + ipmask = 19, + full_urls = false, + from_tables = nil +} + +local function clickhouse_main_row(tname) + local fields = { + 'Date', + 'TS', + 'From', + 'MimeFrom', + 'IP', + 'Score', + 'NRcpt', + 'Size', + 'IsWhitelist', + 'IsBayes', + 'IsFuzzy', + 'IsFann', + 'IsDkim', + 'IsDmarc', + 'NUrls', + 'Action', + 'FromUser', + 'MimeUser', + 'RcptUser', + 'RcptDomain', + 'ListId', + 'Digest' + } + local elt = string.format('INSERT INTO %s (%s) VALUES ', + tname, table.concat(fields, ',')) + + return elt +end + +local function clickhouse_attachments_row(tname) + local attachement_fields = { + 'Date', + 'Digest', + 'Attachments.FileName', + 'Attachments.ContentType', + 'Attachments.Length', + 'Attachments.Digest', + } + local elt = string.format('INSERT INTO %s (%s) VALUES ', + tname, table.concat(attachement_fields, ',')) + return elt +end + +local function clickhouse_urls_row(tname) + local urls_fields = { + 'Date', + 'Digest', + 'Urls.Tld', + 'Urls.Url', + } + local elt = string.format('INSERT INTO %s (%s) VALUES ', + settings['urls_table'], table.concat(urls_fields, ',')) + return elt +end + +local function clickhouse_first_row() + table.insert(rows, clickhouse_main_row(settings['table'])) + if settings['attachments_table'] then + table.insert(attachment_rows, + clickhouse_attachments_row(settings['attachments_table'])) + end + if settings['urls_table'] then + table.insert(urls_rows, + clickhouse_urls_row(settings['urls_table'])) + end +end + +local function clickhouse_check_symbol(task, symbols, need_score) + for _,s in ipairs(symbols) do + if task:has_symbol(s) then + if need_score then + local sym = task:get_symbol(s)[1] + return sym['score'] + else + return true + end + end + end + + return false +end + +local function clickhouse_send_data(task) + local function http_cb(err_message, code, body, headers) + if code ~= 200 or err_message then + rspamd_logger.errx(task, "cannot send data to clickhouse server %s: %d:%s", + settings['server'], code, err_message) + else + rspamd_logger.infox(task, "sent %s rows to clickhouse server %s", + settings['limit'], settings['server']) + end + end + + local body = table.concat(rows, ' ') + if not rspamd_http.request({ + task = task, + url = 'http://' .. settings['server'], + body = body, + callback = http_cb, + mime_type = 'text/plain', + timeout = settings['timeout'], + }) then + rspamd_logger.errx(task, "cannot send data to clickhouse server %s: cannot make request", + settings['server']) + end + + if #attachment_rows > 1 then + body = table.concat(attachment_rows, ' ') + if not rspamd_http.request({ + task = task, + url = 'http://' .. settings['server'], + body = body, + callback = http_cb, + mime_type = 'text/plain', + timeout = settings['timeout'], + }) then + rspamd_logger.errx(task, "cannot send attachments to clickhouse server %s: cannot make request", + settings['server']) + end + end + if #urls_rows > 1 then + body = table.concat(urls_rows, ' ') + if not rspamd_http.request({ + task = task, + url = 'http://' .. settings['server'], + body = body, + callback = http_cb, + mime_type = 'text/plain', + timeout = settings['timeout'], + }) then + rspamd_logger.errx(task, "cannot send urls to clickhouse server %s: cannot make request", + settings['server']) + end + end + + for k,specific in pairs(specific_rows) do + if #specific > 1 then + body = table.concat(specific, ' ') + if not rspamd_http.request({ + task = task, + url = 'http://' .. settings['server'], + body = body, + callback = http_cb, + mime_type = 'text/plain', + timeout = settings['timeout'], + }) then + rspamd_logger.errx(task, "cannot send data for domain %s to clickhouse server %s: cannot make request", + k, settings['server']) + end + end + end +end + +local function clickhouse_quote(str) + if str then + return str:gsub('[\'\\]', '\\%1'):lower() + else + return '' + end +end + +local function clickhouse_collect(task) + local from_domain = '' + local from_user = '' + if task:has_from('smtp') then + local from = task:get_from('smtp')[1] + + if from then + from_domain = from['domain'] + from_user = from['user'] + end + + if from_domain == '' then + if task:get_helo() then + from_domain = task:get_helo() + end + end + else + if task:get_helo() then + from_domain = task:get_helo() + end + end + + local mime_domain = '' + local mime_user = '' + if task:has_from('mime') then + local from = task:get_from('mime')[1] + if from then + mime_domain = from['domain'] + mime_user = from['user'] + end + end + + local ip_str = 'undefined' + local ip = task:get_from_ip() + if ip and ip:is_valid() then + local ipnet = ip:apply_mask(settings['ipmask']) + ip_str = ipnet:to_string() + end + + local rcpt_user = '' + local rcpt_domain = '' + if task:has_recipients('smtp') then + local rcpt = task:get_recipients('smtp')[1] + rcpt_user = rcpt['user'] + rcpt_domain = rcpt['domain'] + end + + local list_id = '' + local lh = task:get_header('List-Id') + if lh then + list_id = lh + end + + local score = task:get_metric_score('default')[1]; + local bayes = 'unknown'; + local fuzzy = 'unknown'; + local fann = 'unknown'; + local whitelist = 'unknown'; + local dkim = 'unknown'; + local dmarc = 'unknown'; + + local ret + + ret = clickhouse_check_symbol(task, settings['bayes_spam_symbols'], false) + if ret then + bayes = 'spam' + end + + ret = clickhouse_check_symbol(task, settings['bayes_ham_symbols'], false) + if ret then + bayes = 'ham' + end + + ret = clickhouse_check_symbol(task, settings['fann_symbols'], true) + if ret then + if ret > 0 then + fann = 'spam' + else + fann = 'ham' + end + end + + + ret = clickhouse_check_symbol(task, settings['whitelist_symbols'], true) + if ret then + if ret < 0 then + whitelist = 'whitelist' + else + whitelist = 'blacklist' + end + end + + ret = clickhouse_check_symbol(task, settings['fuzzy_symbols'], false) + if ret then + fuzzy = 'deny' + end + + ret = clickhouse_check_symbol(task, settings['dkim_allow_symbols'], false) + if ret then + dkim = 'allow' + end + + ret = clickhouse_check_symbol(task, settings['dkim_reject_symbols'], false) + if ret then + dkim = 'reject' + end + + ret = clickhouse_check_symbol(task, settings['dmarc_allow_symbols'], false) + if ret then + dmarc = 'allow' + end + + ret = clickhouse_check_symbol(task, settings['dmarc_reject_symbols'], false) + if ret then + dmarc = 'reject' + end + + local nrcpts = 0 + if task:has_recipients('smtp') then + nrcpts = #task:get_recipients('smtp') + end + + local nurls = 0 + if task:has_urls(true) then + nurls = #task:get_urls(true) + end + + local timestamp = task:get_date({ + format = 'connect', + gmt = false + }) + + local elt = string.format("(today(),%d,'%s','%s','%s',%.2f,%d,%d,'%s','%s','%s','%s','%s','%s',%d,'%s','%s','%s','%s','%s','%s','%s')", + timestamp, + clickhouse_quote(from_domain), clickhouse_quote(mime_domain), ip_str, score, + nrcpts, task:get_size(), whitelist, bayes, fuzzy, fann, + dkim, dmarc, nurls, task:get_metric_action('default'), + clickhouse_quote(from_user), clickhouse_quote(mime_user), + clickhouse_quote(rcpt_user), clickhouse_quote(rcpt_domain), + clickhouse_quote(list_id), task:get_digest()) + table.insert(rows, elt) + + if settings['from_map'] and dkim == 'allow' then + -- Use dkim + local das = task:get_symbol(settings['dkim_allow_symbols'][1]) + if das and das[1] and das[1]['options'] then + for i,dkim_domain in ipairs(das[1]['options']) do + local specific = settings.from_map:get_key(dkim_domain) + if specific then + if not specific_rows[specific] then + local first = clickhouse_main_row(specific) + specific_rows[specific] = {first} + end + table.insert(specific_rows[specific], elt) + end + end + end + + end + + -- Attachments step + local attachments_fnames = {} + local attachments_ctypes = {} + local attachments_lengths = {} + local attachments_digests = {} + for _,part in ipairs(task:get_parts()) do + local fname = part:get_filename() + + if fname then + table.insert(attachments_fnames, string.format("'%s'", clickhouse_quote(fname))) + local type, subtype = part:get_type() + table.insert(attachments_ctypes, string.format("'%s/%s'", + clickhouse_quote(type), clickhouse_quote(subtype))) + table.insert(attachments_lengths, string.format("%s", tostring(part:get_length()))) + table.insert(attachments_digests, string.format("'%s'", string.sub(part:get_digest(), 1, 16))) + end + end + + if #attachments_fnames > 0 then + elt = string.format("(today(),'%s',[%s],[%s],[%s],[%s])", + task:get_digest(), + table.concat(attachments_fnames, ','), + table.concat(attachments_ctypes, ','), + table.concat(attachments_lengths, ','), + table.concat(attachments_digests, ',')) + table.insert(attachment_rows, elt) + end + + -- Urls step + local urls_tlds = {} + local urls_urls = {} + if task:has_urls(false) then + for _,u in ipairs(task:get_urls()) do + table.insert(urls_tlds, string.format("'%s'", clickhouse_quote(u:get_tld()))) + if settings['full_urls'] then + table.insert(urls_urls, string.format("'%s'", + clickhouse_quote(u:get_text()))) + else + table.insert(urls_urls, string.format("'%s'", + clickhouse_quote(u:get_host()))) + end + end + end + + if #urls_tlds > 0 then + elt = string.format("(today(),'%s',[%s],[%s])", + task:get_digest(), + table.concat(urls_tlds, ','), + table.concat(urls_urls, ',')) + table.insert(urls_rows, elt) + end + + nrows = nrows + 1 + + if nrows > settings['limit'] then + clickhouse_send_data(task) + nrows = 0 + rows = {} + attachment_rows = {} + urls_rows = {} + specific_rows = {} + clickhouse_first_row() + end +end + +local opts = rspamd_config:get_all_opt('clickhouse') +if opts then + for k,v in pairs(opts) do + settings[k] = v + end + + if not settings['server'] then + rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module') + else + if settings['from_tables'] then + settings['from_map'] = rspamd_config:add_map({ + url = settings['from_tables'], + description = 'clickhouse specific domains', + type = 'regexp' + }) + end + clickhouse_first_row() + rspamd_config:register_symbol({ + name = 'CLICKHOUSE_COLLECT', + type = 'postfilter', + callback = clickhouse_collect, + priority = 10 + }) + rspamd_config:register_finish_script(function(task) + if nrows > 0 then + clickhouse_send_data(task) + end + end) + end +end