From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Fri, 8 May 2026 08:11:52 +0000 (+0200) Subject: [Feature] elastic: log Reply-To, received IPs, URL metadata, and pre-result module... X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e3bc0888c13d39f7fe7ede2014cab48083a27e0f;p=thirdparty%2Frspamd.git [Feature] elastic: log Reply-To, received IPs, URL metadata, and pre-result module (#6018) * [Feature] elastic: log Reply-To, received IPs, URL metadata, and pre-result module - reply_to_user / reply_to_domain: parsed from Reply-To via rspamd_util.parse_mail_address, mirroring the from / mime_from split. - received_ips: list of IPs from Received headers - urls and urls_cta with the new collect_urls config block: per-URL records {url, etld, host, protocol, flags, count} plus aggregate metrics {total, unique, max_repeats, repeat_ratio}. CTA URLs are collected via text_part:get_cta_urls({original=true}) and walked via :get_redirected so url_redirector-resolved hops are captured, then either kept inline at the top of urls (sorted ahead of non-CTA so they survive max_urls truncation) or emitted into a dedicated urls_cta when separate_cta is on - action_forced: the module name from task:has_pre_result(), so logs show which prefilter short-circuited the pipeline (or 'no force'). Renames get_received_delay to get_received_info (returns delay + ips in one pass over the received chain) and replaces the local merge_settings helper with lua_util.override_defaults — the two are functionally equivalent recursive deep-merges, but override_defaults is the project-wide maintained helper. 
Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> * [Fix] elastic: reset queue counters on pop drain, which prevents the indices from accumulating monotonically over the worker's lifetime Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> * [Fix] elastic: address review feedback on PR #6018 - Drop tostring() around url:get_text() (already a Lua string) in url_to_record and url_key. - Drop tostring() around url:get_flags_num() (.. coerces numbers). - Replace tostring(url) in CTA dedup key with url:get_text() to avoid the __tostring metamethod's percent-encoding two-pass walk. - Drop `or nil` no-op after url:get_redirected(). - Cache url:get_host() once in url_to_record (was called twice). - Remove dead `if on then` guard on url:get_flags() — only set bits are inserted, so every value is true. - Cache tostring(real_ip) in get_received_info and tostring(ip_addr) / tostring(origin_ip) in get_general_metadata; refactor to one call. - In build_urls_metadata, compute url_key(u, false) once per URL and reuse for the CTA lookup; only recompute when full_urls is true. - Drop sort=true from task:get_urls() — the C-level qsort doesn't survive: results are rehashed for dedup and re-sorted by count. Also remove the misleading "deterministic order, stable dedup" comment (table.sort is unstable in standard Lua). 
Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> * [Fix] elastic: drop dead `or {}` after task:get_urls() and other functions that always provide table Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> --------- Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> --- diff --git a/conf/modules.d/elastic.conf b/conf/modules.d/elastic.conf index 804a00dd7a..d713ff550f 100644 --- a/conf/modules.d/elastic.conf +++ b/conf/modules.d/elastic.conf @@ -49,6 +49,7 @@ elastic { headers_count_ignore_above = 5; # Record only the first N same-named headers, add "ignored above..." if the limit is reached; set 0 to disable the limit headers_text_ignore_above = 2048; # Strip specific header value and add "..." to the end; set 0 to disable the limit symbols_nested = false; + urls_nested = false; empty_value = "unknown"; # Empty numbers, IPs and IP nets are not customizable; they will always be 0, :: and ::/128 respectively }; index_policy = { @@ -97,6 +98,21 @@ elastic { # "Precedence"; # "List-Id"; extra_collect_headers = []; + collect_urls = { + enabled = true; + max_urls = 0; # 0 = no limit; otherwise truncate (CTAs and most-repeated URLs are placed first so they survive truncation) + separate_cta = true; # If true, also emit urls_cta with just CTA URLs from HTML parts + full_urls = false; # Log full URL text vs just host + full_cta_urls = false; # Log full URL text vs just host (for CTA URLs) + # Params passed verbatim to task:get_urls(); see https://docs.rspamd.com/lua/rspamd_task/#m1860f + get_url_params = { + content = true; # Include URLs found inside text/html content + images = false; # Include URLs from attributes (noisy) + emails = false; # Include mailto: URLs (rarely useful for phishing analysis) + sort = true; # Deterministic order, stable dedup, reproducible doc layout + protocols = ["http", "https", "ftp"]; # Protocols to collect + }; + }; geoip = { enabled = true; managed = true; diff 
--git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index f1de8e4b1f..c63d33042a 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -97,6 +97,7 @@ local settings = { headers_count_ignore_above = 5, -- record only N first same named headers, add 'ignored above...' if reached, set 0 to disable limit headers_text_ignore_above = 2048, -- strip specific header value and add '...' to the end, set 0 to disable limit symbols_nested = false, + urls_nested = false, empty_value = 'unknown', -- empty numbers, ips and ipnets are not customizable they will be always 0, :: and ::/128 respectively }, index_policy = { @@ -145,6 +146,20 @@ local settings = { -- 'List-Id', -- 'X-Mailer', }, + collect_urls = { + enabled = true, + max_urls = 0, -- 0 = no limit; otherwise truncate (CTAs and most-repeated URLs are placed first so they survive truncation) + separate_cta = true, -- if true, also emit urls_cta with just CTA URLs from HTML parts + full_urls = false, -- log full URL text vs just host + full_cta_urls = false, -- log full URL text vs just host (for CTA URLs) + -- params passed verbatim to task:get_urls(); see https://docs.rspamd.com/lua/rspamd_task/#m1860f + get_url_params = { + content = true, -- include URLs found inside text/html content + images = false, -- include URLs from attributes (noisy) + emails = false, -- include mailto: URLs (rarely useful for phishing analysis) + protocols = { 'http', 'https', 'ftp' }, -- protocols to collect + }, + }, geoip = { enabled = true, managed = true, @@ -197,12 +212,18 @@ function Queue:get_all() end function Queue:pop() + -- queue already empty if self.first > self.last then return nil end local value = self.data[self.first] self.data[self.first] = nil self.first = self.first + 1 + -- reset indices on drain so they don't grow without bound over the worker's lifetime + if self.first > self.last then + self.first = 1 + self.last = 0 + end return value end @@ -296,15 +317,28 @@ local function 
handle_error(action, component, limit) return true end -local function get_received_delay(received_headers) +local function get_received_info(received_headers) local now = math.floor(rspamd_util.get_time()) local timestamp = 0 local delay = 0 + local ips = {} + local seen_ips = {} for i, received_header in ipairs(received_headers) do -- skip first received_header as it's own relay - if i > 1 and received_header['timestamp'] and received_header['timestamp'] > 0 then - timestamp = received_header['timestamp'] - break + if i > 1 then + if timestamp == 0 and received_header['timestamp'] and received_header['timestamp'] > 0 then + timestamp = received_header['timestamp'] + end + -- '...' is rspamd's "valid but unknown" placeholder; elastic's t_ip mapping rejects it, + -- so guard against it here even though :is_valid() returns true for that case. + local real_ip = received_header['real_ip'] + if real_ip and real_ip:is_valid() then + local s = tostring(real_ip) + if s ~= '...' and not seen_ips[s] then + seen_ips[s] = true + table.insert(ips, s) + end + end end end if timestamp > 0 then @@ -313,7 +347,149 @@ local function get_received_delay(received_headers) delay = 0 end end - return delay + return delay, ips +end + +local function url_to_record(u, full_urls, count) + local host = u:get_host() + local rec = { + etld = u:get_tld() or host, + host = host, + protocol = u:get_protocol(), + count = count, + } + if full_urls then + rec.url = u:get_text() + end + local flag_list = {} + -- url:get_flags() only inserts keys for set bits; every value is true + for name in pairs(u:get_flags()) do + table.insert(flag_list, name) + end + if #flag_list > 0 then + rec.flags = flag_list + end + return rec +end + +-- Build dedup key for a url. full_urls = true uses full text (rspamd already deduped at C level), +-- false collapses by host+protocol+flags so e.g. https://example.com/a and https://example.com/b +-- merge into one entry with summed count. 
+local function url_key(u, full_urls) + if full_urls then + return u:get_text() + end + return (u:get_host() or '') .. '|' .. + (u:get_protocol() or '') .. '|' .. + u:get_flags_num() +end + +-- Collect, dedup, count, and sort URLs. +-- Each url's u:get_count() gives rspamd's internal occurrence count (it tracks repetitions in C). +-- cta_keys (optional) = set of keys (built from url_key with full_urls=false) identifying CTA URLs. +-- cta_mode controls what happens with CTA-matching URLs: +-- 'mark' (default) - keep them in the list, mark is_cta, sort to the top so they survive max_urls truncation +-- 'skip' - exclude them entirely from this list (used when emitting them in a separate urls_cta field) +-- Returns metadata object: { total, unique, max_repeats, repeat_ratio, list } +local function build_urls_metadata(urls, max_urls, full_urls, cta_keys, cta_mode) + cta_mode = cta_mode or 'mark' + local by_key = {} + local order = {} + local total = 0 + for _, u in ipairs(urls) do + local nfkey = url_key(u, false) + local is_cta = cta_keys and cta_keys[nfkey] or false + if not (is_cta and cta_mode == 'skip') then + local key = full_urls and url_key(u, true) or nfkey + local n = u:get_count() or 1 + total = total + n + local entry = by_key[key] + if entry then + entry.count = entry.count + n + else + entry = { + url = u, + count = n, + is_cta = is_cta, + } + by_key[key] = entry + table.insert(order, entry) + end + end + end + + local max_repeats = 0 + for _, entry in ipairs(order) do + if entry.count > max_repeats then + max_repeats = entry.count + end + end + + -- Sort: CTAs first (only relevant in 'mark' mode; in 'skip' mode no CTAs remain), then by count desc + table.sort(order, function(a, b) + if a.is_cta ~= b.is_cta then + return a.is_cta + end + return a.count > b.count + end) + + local list = {} + for i, entry in ipairs(order) do + if max_urls > 0 and i > max_urls then + break + end + table.insert(list, url_to_record(entry.url, full_urls, entry.count)) + end + + 
local repeat_ratio = 0 + if #order > 0 then + repeat_ratio = lua_util.round(total / #order, 3) + end + + return { + total = total, + unique = #order, + max_repeats = max_repeats, + repeat_ratio = repeat_ratio, + list = list, + } +end + +local function collect_cta_urls(task) + local cta = {} + local seen = {} + + -- Append a URL to the CTA list (deduped), then walk its redirect chain + -- via url:get_redirected() so the full CTA-triggered path is captured. + -- url_redirector links each hop via set_redirected, so a CTA pointing at + -- a shortlink propagates to the resolved target via the linked_url + -- pointer; without walking, only the original (or first hop, depending + -- on get_cta_urls' return_original flag) would be marked CTA. + local function add_with_chain(u) + while u do + local key = u:get_text() + if seen[key] then + break + end + seen[key] = true + table.insert(cta, u) + u = u:get_redirected() + end + end + + local parts = task:get_text_parts() + for _, part in ipairs(parts) do + if part:is_html() then + -- original=true returns the source HTML CTA, not its linked_url + -- one-hop-forward (the default), so the chain walk starts at the + -- actual entry point of the CTA. + local part_cta = part:get_cta_urls({ original = true }) + for _, u in ipairs(part_cta) do + add_with_chain(u) + end + end + end + return cta end local function create_bulk_json(es_index, logs_to_send) @@ -339,10 +515,14 @@ local function elastic_send_data(flush_all, task, cfg, ev_base) local push_url local bulk_json local logs_to_send + -- captured so pop_first lands on the same Queue we read from, even if the + -- 10x-overflow guard later replaces buffer['logs'] with Queue:new() + -- between request start and callback firing. 
+ local task_buffer = buffer['logs'] if flush_all then - logs_to_send = buffer['logs']:get_all() + logs_to_send = task_buffer:get_all() else - logs_to_send = buffer['logs']:get_first(settings['limits']['max_rows']) + logs_to_send = task_buffer:get_first(settings['limits']['max_rows']) end nlogs_to_send = #logs_to_send -- actual size can be lower then max_rows if nlogs_to_send > 0 then @@ -401,7 +581,7 @@ local function elastic_send_data(flush_all, task, cfg, ev_base) end -- proccess results if push_done then - buffer['logs']:pop_first(nlogs_to_send) + task_buffer:pop_first(nlogs_to_send) buffer['errors'] = 0 upstream:ok() else @@ -410,7 +590,7 @@ local function elastic_send_data(flush_all, task, cfg, ev_base) rspamd_logger.errx(log_object, 'failed to send %s log lines, failed attempts: %s/%s, removing failed logs from bugger', nlogs_to_send, buffer['errors'], settings['limits']['max_fail']) - buffer['logs']:pop_first(nlogs_to_send) + task_buffer:pop_first(nlogs_to_send) buffer['errors'] = 0 else buffer['errors'] = buffer['errors'] + 1 @@ -456,6 +636,9 @@ local function get_general_metadata(task) r.rspamd_server = rspamd_hostname or empty r.digest = task:get_digest() or empty + local has_pre_result, _, _, pre_result_module = task:has_pre_result() + r.action_forced = has_pre_result and (pre_result_module or 'unknown module') or 'no force' + r.action = task:get_metric_action() or empty r.score = task:get_metric_score()[1] or 0 r.symbols = task:get_symbols_all() @@ -481,9 +664,12 @@ local function get_general_metadata(task) r.ip = '::' r.is_local = false local ip_addr = task:get_ip() - if ip_addr and ip_addr:is_valid() and tostring(ip_addr) ~= '...' then - r.is_local = ip_addr:is_local() - r.ip = tostring(ip_addr) + if ip_addr and ip_addr:is_valid() then + local s = tostring(ip_addr) + if s ~= '...' 
then + r.is_local = ip_addr:is_local() + r.ip = s + end end r.sender_ip = '::' @@ -492,8 +678,11 @@ local function get_general_metadata(task) origin = origin:gsub('^%[', ''):gsub('%]:[0-9]+$', ''):gsub('%]$', '') local rspamd_ip = require "rspamd_ip" local origin_ip = rspamd_ip.from_string(origin) - if origin_ip and origin_ip:is_valid() and tostring(origin_ip) ~= '...' then - r.sender_ip = tostring(origin_ip) + if origin_ip and origin_ip:is_valid() then + local s = tostring(origin_ip) + if s ~= '...' then + r.sender_ip = s + end end end @@ -547,6 +736,20 @@ local function get_general_metadata(task) end end + r.reply_to_user = empty + r.reply_to_domain = empty + local reply_to_hdr = task:get_header_full('Reply-To') + if reply_to_hdr and reply_to_hdr[1] and reply_to_hdr[1].decoded then + local reply_to = rspamd_util.parse_mail_address(reply_to_hdr[1].decoded, task:get_mempool()) + if reply_to and reply_to[1] and + reply_to[1].user and #reply_to[1].user > 0 and + reply_to[1].domain and #reply_to[1].domain > 0 + then + r.reply_to_user = reply_to[1].user + r.reply_to_domain = reply_to[1].domain:lower() + end + end + local settings_id = task:get_settings_id() if settings_id then -- Convert to string @@ -654,7 +857,39 @@ local function get_general_metadata(task) local fuzzy_hashes = task:get_mempool():get_variable('fuzzy_hashes', 'fstrings') r.fuzzy_hashes = fuzzy_hashes or empty - r.received_delay = get_received_delay(task:get_received_headers()) + local received_delay, received_ips = get_received_info(task:get_received_headers()) + r.received_delay = received_delay + if #received_ips > 0 then + r.received_ips = received_ips + else + r.received_ips = '::' + end + + if settings['collect_urls']['enabled'] then + local task_urls = task:get_urls(settings['collect_urls']['get_url_params']) + + local cta_urls = collect_cta_urls(task) + local cta_keys = {} + for _, u in ipairs(cta_urls) do + cta_keys[url_key(u, false)] = true + end + + -- when separate_cta is on, exclude CTAs 
from urls (they live in urls_cta); + -- otherwise mark them and sort to the top so they survive max_urls truncation + local cta_mode = settings['collect_urls']['separate_cta'] and 'skip' or 'mark' + + r.urls = build_urls_metadata(task_urls, + settings['collect_urls']['max_urls'], + settings['collect_urls']['full_urls'], + cta_keys, cta_mode) + + if settings['collect_urls']['separate_cta'] and #cta_urls > 0 then + r.urls_cta = build_urls_metadata(cta_urls, + settings['collect_urls']['max_urls'], + settings['collect_urls']['full_cta_urls'], + nil, nil) + end + end return r end @@ -940,7 +1175,7 @@ local function configure_index_policy(cfg, ev_base) -- ucl.to_format(obj, 'json') can't manage empty {} objects, it will be treat them as [] in json as result, -- so we write {} as '{emty_object}', which allows us to replace '"{emty_object}"' string after convertion to json to '{}' - local index_policy_json = '' + local index_policy_json -- elastic lifecycle policy with hot state if detected_distro['name'] == 'elastic' then @@ -1170,7 +1405,7 @@ local function configure_index_policy(cfg, ev_base) -- opensearch state delete if settings['index_policy']['delete']['enabled'] then local prev_state_id = state_id - state_id = state_id + 1 + -- state_id = state_id + 1 -- delete is the last state, no further actions appended; bump kept commented for symmetry with warm/cold index_policy['policy']['states'][prev_state_id]['transitions'] = { { state_name = 'delete', @@ -1268,6 +1503,33 @@ local function configure_index_template(cfg, ev_base) symbols_obj['type'] = 'nested' end + local urls_list_obj = { + dynamic = false, + type = 'object', + properties = { + url = t_text_with_keyword, + etld = t_keyword, + host = t_keyword, + protocol = t_keyword, + flags = t_keyword, + count = t_long, + }, + } + if settings['index_template']['urls_nested'] then + urls_list_obj['type'] = 'nested' + end + local urls_obj = { + dynamic = false, + type = 'object', + properties = { + total = t_long, + unique 
= t_long, + max_repeats = t_long, + repeat_ratio = t_float, + list = urls_list_obj, + }, + } + -- dynamic templates local dynamic_templates_obj = {} local dynamic_strings = { @@ -1309,6 +1571,7 @@ local function configure_index_template(cfg, ev_base) properties = { rspamd_server = t_keyword, digest = t_keyword, + action_forced = t_keyword, action = t_keyword, score = t_double, symbols = symbols_obj, @@ -1326,6 +1589,8 @@ local function configure_index_template(cfg, ev_base) from_domain = t_keyword, mime_from_user = t_keyword, mime_from_domain = t_keyword, + reply_to_user = t_keyword, + reply_to_domain = t_keyword, settings_id = t_keyword, asn = asn_obj, scan_time = t_float, @@ -1333,6 +1598,9 @@ local function configure_index_template(cfg, ev_base) non_en = t_boolean_nil_true, fuzzy_hashes = t_text, received_delay = t_long, + received_ips = t_ip, + urls = urls_obj, + urls_cta = urls_obj, }, }, }, @@ -1552,24 +1820,8 @@ end local opts = rspamd_config:get_all_opt('elastic') -local function merge_settings(src, dst) - for k, v in pairs(src) do - if type(v) == 'table' and type(dst[k]) == 'table' then - merge_settings(v, dst[k]) - else - dst[k] = v - end - end -end - if opts then - for k, v in pairs(opts) do - if type(v) == 'table' and settings[k] and type(settings[k]) == 'table' then - merge_settings(v, settings[k]) - else - settings[k] = v - end - end + settings = lua_util.override_defaults(settings, opts) if not settings['enabled'] then rspamd_logger.infox(rspamd_config, 'module disabled in config')