From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
Date: Fri, 8 May 2026 08:11:52 +0000 (+0200)
Subject: [Feature] elastic: log Reply-To, received IPs, URL metadata, and pre-result module...
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e3bc0888c13d39f7fe7ede2014cab48083a27e0f;p=thirdparty%2Frspamd.git
[Feature] elastic: log Reply-To, received IPs, URL metadata, and pre-result module (#6018)
* [Feature] elastic: log Reply-To, received IPs, URL metadata, and pre-result module
- reply_to_user / reply_to_domain: parsed from Reply-To via
rspamd_util.parse_mail_address, mirroring the from / mime_from split.
- received_ips: list of IPs from Received headers
- urls and urls_cta with the new collect_urls config block: per-URL
records {url, etld, host, protocol, flags, count} plus aggregate
metrics {total, unique, max_repeats, repeat_ratio}. CTA URLs are
collected via text_part:get_cta_urls({original=true}) and walked via
:get_redirected so url_redirector-resolved hops are captured, then
either kept inline at the top of urls (sorted ahead of non-CTA so
they survive max_urls truncation) or emitted into a dedicated
urls_cta when separate_cta is on
- action_forced: the module name from task:has_pre_result(), so logs
show which prefilter short-circuited the pipeline (or 'no force').
Renames get_received_delay to get_received_info (returns delay + ips
in one pass over the received chain) and replaces the local
merge_settings helper with lua_util.override_defaults — the two are
functionally equivalent recursive deep-merges, but override_defaults
is the project-wide maintained helper.
Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
* [Fix] elastic: reset queue counters on pop drain, which prevents the indices from growing monotonically over the worker's lifetime
Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
* [Fix] elastic: address review feedback on PR #6018
- Drop tostring() around url:get_text() (already a Lua string) in
url_to_record and url_key.
- Drop tostring() around url:get_flags_num() (.. coerces numbers).
- Replace tostring(url) in CTA dedup key with url:get_text() to avoid
the __tostring metamethod's percent-encoding two-pass walk.
- Drop `or nil` no-op after url:get_redirected().
- Cache url:get_host() once in url_to_record (was called twice).
- Remove dead `if on then` guard on url:get_flags() — only set bits
are inserted, so every value is true.
- Cache tostring(real_ip) in get_received_info and tostring(ip_addr) /
tostring(origin_ip) in get_general_metadata; refactor to one call.
- In build_urls_metadata, compute url_key(u, false) once per URL and
reuse for the CTA lookup; only recompute when full_urls is true.
- Drop sort=true from task:get_urls() — the C-level qsort doesn't
survive: results are rehashed for dedup and re-sorted by count.
Also remove the misleading "deterministic order, stable dedup"
comment (table.sort is unstable in standard Lua).
Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
* [Fix] elastic: drop dead `or {}` after task:get_urls() and other functions that always provide table
Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
---------
Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
---
diff --git a/conf/modules.d/elastic.conf b/conf/modules.d/elastic.conf
index 804a00dd7a..d713ff550f 100644
--- a/conf/modules.d/elastic.conf
+++ b/conf/modules.d/elastic.conf
@@ -49,6 +49,7 @@ elastic {
headers_count_ignore_above = 5; # Record only the first N same-named headers, add "ignored above..." if the limit is reached; set 0 to disable the limit
headers_text_ignore_above = 2048; # Strip specific header value and add "..." to the end; set 0 to disable the limit
symbols_nested = false;
+ urls_nested = false;
empty_value = "unknown"; # Empty numbers, IPs and IP nets are not customizable; they will always be 0, :: and ::/128 respectively
};
index_policy = {
@@ -97,6 +98,21 @@ elastic {
# "Precedence";
# "List-Id";
extra_collect_headers = [];
+ collect_urls = {
+ enabled = true;
+ max_urls = 0; # 0 = no limit; otherwise truncate (CTAs and most-repeated URLs are placed first so they survive truncation)
+ separate_cta = true; # If true, also emit urls_cta with just CTA URLs from HTML parts
+ full_urls = false; # Log full URL text vs just host
+ full_cta_urls = false; # Log full URL text vs just host (for CTA URLs)
+ # Params passed verbatim to task:get_urls(); see https://docs.rspamd.com/lua/rspamd_task/#m1860f
+ get_url_params = {
+ content = true; # Include URLs found inside text/html content
+        images = false; # Include URLs from <img> tag attributes (noisy)
+ emails = false; # Include mailto: URLs (rarely useful for phishing analysis)
+ sort = true; # Deterministic order, stable dedup, reproducible doc layout
+ protocols = ["http", "https", "ftp"]; # Protocols to collect
+ };
+ };
geoip = {
enabled = true;
managed = true;
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua
index f1de8e4b1f..c63d33042a 100644
--- a/src/plugins/lua/elastic.lua
+++ b/src/plugins/lua/elastic.lua
@@ -97,6 +97,7 @@ local settings = {
headers_count_ignore_above = 5, -- record only N first same named headers, add 'ignored above...' if reached, set 0 to disable limit
headers_text_ignore_above = 2048, -- strip specific header value and add '...' to the end, set 0 to disable limit
symbols_nested = false,
+ urls_nested = false,
empty_value = 'unknown', -- empty numbers, ips and ipnets are not customizable they will be always 0, :: and ::/128 respectively
},
index_policy = {
@@ -145,6 +146,20 @@ local settings = {
-- 'List-Id',
-- 'X-Mailer',
},
+ collect_urls = {
+ enabled = true,
+ max_urls = 0, -- 0 = no limit; otherwise truncate (CTAs and most-repeated URLs are placed first so they survive truncation)
+ separate_cta = true, -- if true, also emit urls_cta with just CTA URLs from HTML parts
+ full_urls = false, -- log full URL text vs just host
+ full_cta_urls = false, -- log full URL text vs just host (for CTA URLs)
+ -- params passed verbatim to task:get_urls(); see https://docs.rspamd.com/lua/rspamd_task/#m1860f
+ get_url_params = {
+ content = true, -- include URLs found inside text/html content
+        images = false, -- include URLs from <img> tag attributes (noisy)
+ emails = false, -- include mailto: URLs (rarely useful for phishing analysis)
+ protocols = { 'http', 'https', 'ftp' }, -- protocols to collect
+ },
+ },
geoip = {
enabled = true,
managed = true,
@@ -197,12 +212,18 @@ function Queue:get_all()
end
function Queue:pop()
+ -- queue already empty
if self.first > self.last then
return nil
end
local value = self.data[self.first]
self.data[self.first] = nil
self.first = self.first + 1
+ -- reset indices on drain so they don't grow without bound over the worker's lifetime
+ if self.first > self.last then
+ self.first = 1
+ self.last = 0
+ end
return value
end
@@ -296,15 +317,28 @@ local function handle_error(action, component, limit)
return true
end
-local function get_received_delay(received_headers)
+local function get_received_info(received_headers)
local now = math.floor(rspamd_util.get_time())
local timestamp = 0
local delay = 0
+ local ips = {}
+ local seen_ips = {}
for i, received_header in ipairs(received_headers) do
-- skip first received_header as it's own relay
- if i > 1 and received_header['timestamp'] and received_header['timestamp'] > 0 then
- timestamp = received_header['timestamp']
- break
+ if i > 1 then
+ if timestamp == 0 and received_header['timestamp'] and received_header['timestamp'] > 0 then
+ timestamp = received_header['timestamp']
+ end
+ -- '...' is rspamd's "valid but unknown" placeholder; elastic's t_ip mapping rejects it,
+ -- so guard against it here even though :is_valid() returns true for that case.
+ local real_ip = received_header['real_ip']
+ if real_ip and real_ip:is_valid() then
+ local s = tostring(real_ip)
+ if s ~= '...' and not seen_ips[s] then
+ seen_ips[s] = true
+ table.insert(ips, s)
+ end
+ end
end
end
if timestamp > 0 then
@@ -313,7 +347,149 @@ local function get_received_delay(received_headers)
delay = 0
end
end
- return delay
+ return delay, ips
+end
+
+local function url_to_record(u, full_urls, count)
+ local host = u:get_host()
+ local rec = {
+ etld = u:get_tld() or host,
+ host = host,
+ protocol = u:get_protocol(),
+ count = count,
+ }
+ if full_urls then
+ rec.url = u:get_text()
+ end
+ local flag_list = {}
+ -- url:get_flags() only inserts keys for set bits; every value is true
+ for name in pairs(u:get_flags()) do
+ table.insert(flag_list, name)
+ end
+ if #flag_list > 0 then
+ rec.flags = flag_list
+ end
+ return rec
+end
+
+-- Build dedup key for a url. full_urls = true uses full text (rspamd already deduped at C level),
+-- false collapses by host+protocol+flags so e.g. https://example.com/a and https://example.com/b
+-- merge into one entry with summed count.
+local function url_key(u, full_urls)
+ if full_urls then
+ return u:get_text()
+ end
+ return (u:get_host() or '') .. '|' ..
+ (u:get_protocol() or '') .. '|' ..
+ u:get_flags_num()
+end
+
+-- Collect, dedup, count, and sort URLs.
+-- Each url's u:get_count() gives rspamd's internal occurrence count (it tracks repetitions in C).
+-- cta_keys (optional) = set of keys (built from url_key with full_urls=false) identifying CTA URLs.
+-- cta_mode controls what happens with CTA-matching URLs:
+-- 'mark' (default) - keep them in the list, mark is_cta, sort to the top so they survive max_urls truncation
+-- 'skip' - exclude them entirely from this list (used when emitting them in a separate urls_cta field)
+-- Returns metadata object: { total, unique, max_repeats, repeat_ratio, list }
+local function build_urls_metadata(urls, max_urls, full_urls, cta_keys, cta_mode)
+ cta_mode = cta_mode or 'mark'
+ local by_key = {}
+ local order = {}
+ local total = 0
+ for _, u in ipairs(urls) do
+ local nfkey = url_key(u, false)
+ local is_cta = cta_keys and cta_keys[nfkey] or false
+ if not (is_cta and cta_mode == 'skip') then
+ local key = full_urls and url_key(u, true) or nfkey
+ local n = u:get_count() or 1
+ total = total + n
+ local entry = by_key[key]
+ if entry then
+ entry.count = entry.count + n
+ else
+ entry = {
+ url = u,
+ count = n,
+ is_cta = is_cta,
+ }
+ by_key[key] = entry
+ table.insert(order, entry)
+ end
+ end
+ end
+
+ local max_repeats = 0
+ for _, entry in ipairs(order) do
+ if entry.count > max_repeats then
+ max_repeats = entry.count
+ end
+ end
+
+ -- Sort: CTAs first (only relevant in 'mark' mode; in 'skip' mode no CTAs remain), then by count desc
+ table.sort(order, function(a, b)
+ if a.is_cta ~= b.is_cta then
+ return a.is_cta
+ end
+ return a.count > b.count
+ end)
+
+ local list = {}
+ for i, entry in ipairs(order) do
+ if max_urls > 0 and i > max_urls then
+ break
+ end
+ table.insert(list, url_to_record(entry.url, full_urls, entry.count))
+ end
+
+ local repeat_ratio = 0
+ if #order > 0 then
+ repeat_ratio = lua_util.round(total / #order, 3)
+ end
+
+ return {
+ total = total,
+ unique = #order,
+ max_repeats = max_repeats,
+ repeat_ratio = repeat_ratio,
+ list = list,
+ }
+end
+
+local function collect_cta_urls(task)
+ local cta = {}
+ local seen = {}
+
+ -- Append a URL to the CTA list (deduped), then walk its redirect chain
+ -- via url:get_redirected() so the full CTA-triggered path is captured.
+ -- url_redirector links each hop via set_redirected, so a CTA pointing at
+ -- a shortlink propagates to the resolved target via the linked_url
+ -- pointer; without walking, only the original (or first hop, depending
+ -- on get_cta_urls' return_original flag) would be marked CTA.
+ local function add_with_chain(u)
+ while u do
+ local key = u:get_text()
+ if seen[key] then
+ break
+ end
+ seen[key] = true
+ table.insert(cta, u)
+ u = u:get_redirected()
+ end
+ end
+
+ local parts = task:get_text_parts()
+ for _, part in ipairs(parts) do
+ if part:is_html() then
+ -- original=true returns the source HTML CTA, not its linked_url
+ -- one-hop-forward (the default), so the chain walk starts at the
+ -- actual entry point of the CTA.
+ local part_cta = part:get_cta_urls({ original = true })
+ for _, u in ipairs(part_cta) do
+ add_with_chain(u)
+ end
+ end
+ end
+ return cta
end
local function create_bulk_json(es_index, logs_to_send)
@@ -339,10 +515,14 @@ local function elastic_send_data(flush_all, task, cfg, ev_base)
local push_url
local bulk_json
local logs_to_send
+ -- captured so pop_first lands on the same Queue we read from, even if the
+ -- 10x-overflow guard later replaces buffer['logs'] with Queue:new()
+ -- between request start and callback firing.
+ local task_buffer = buffer['logs']
if flush_all then
- logs_to_send = buffer['logs']:get_all()
+ logs_to_send = task_buffer:get_all()
else
- logs_to_send = buffer['logs']:get_first(settings['limits']['max_rows'])
+ logs_to_send = task_buffer:get_first(settings['limits']['max_rows'])
end
nlogs_to_send = #logs_to_send -- actual size can be lower then max_rows
if nlogs_to_send > 0 then
@@ -401,7 +581,7 @@ local function elastic_send_data(flush_all, task, cfg, ev_base)
end
-- proccess results
if push_done then
- buffer['logs']:pop_first(nlogs_to_send)
+ task_buffer:pop_first(nlogs_to_send)
buffer['errors'] = 0
upstream:ok()
else
@@ -410,7 +590,7 @@ local function elastic_send_data(flush_all, task, cfg, ev_base)
rspamd_logger.errx(log_object,
'failed to send %s log lines, failed attempts: %s/%s, removing failed logs from bugger',
nlogs_to_send, buffer['errors'], settings['limits']['max_fail'])
- buffer['logs']:pop_first(nlogs_to_send)
+ task_buffer:pop_first(nlogs_to_send)
buffer['errors'] = 0
else
buffer['errors'] = buffer['errors'] + 1
@@ -456,6 +636,9 @@ local function get_general_metadata(task)
r.rspamd_server = rspamd_hostname or empty
r.digest = task:get_digest() or empty
+ local has_pre_result, _, _, pre_result_module = task:has_pre_result()
+ r.action_forced = has_pre_result and (pre_result_module or 'unknown module') or 'no force'
+
r.action = task:get_metric_action() or empty
r.score = task:get_metric_score()[1] or 0
r.symbols = task:get_symbols_all()
@@ -481,9 +664,12 @@ local function get_general_metadata(task)
r.ip = '::'
r.is_local = false
local ip_addr = task:get_ip()
- if ip_addr and ip_addr:is_valid() and tostring(ip_addr) ~= '...' then
- r.is_local = ip_addr:is_local()
- r.ip = tostring(ip_addr)
+ if ip_addr and ip_addr:is_valid() then
+ local s = tostring(ip_addr)
+ if s ~= '...' then
+ r.is_local = ip_addr:is_local()
+ r.ip = s
+ end
end
r.sender_ip = '::'
@@ -492,8 +678,11 @@ local function get_general_metadata(task)
origin = origin:gsub('^%[', ''):gsub('%]:[0-9]+$', ''):gsub('%]$', '')
local rspamd_ip = require "rspamd_ip"
local origin_ip = rspamd_ip.from_string(origin)
- if origin_ip and origin_ip:is_valid() and tostring(origin_ip) ~= '...' then
- r.sender_ip = tostring(origin_ip)
+ if origin_ip and origin_ip:is_valid() then
+ local s = tostring(origin_ip)
+ if s ~= '...' then
+ r.sender_ip = s
+ end
end
end
@@ -547,6 +736,20 @@ local function get_general_metadata(task)
end
end
+ r.reply_to_user = empty
+ r.reply_to_domain = empty
+ local reply_to_hdr = task:get_header_full('Reply-To')
+ if reply_to_hdr and reply_to_hdr[1] and reply_to_hdr[1].decoded then
+ local reply_to = rspamd_util.parse_mail_address(reply_to_hdr[1].decoded, task:get_mempool())
+ if reply_to and reply_to[1] and
+ reply_to[1].user and #reply_to[1].user > 0 and
+ reply_to[1].domain and #reply_to[1].domain > 0
+ then
+ r.reply_to_user = reply_to[1].user
+ r.reply_to_domain = reply_to[1].domain:lower()
+ end
+ end
+
local settings_id = task:get_settings_id()
if settings_id then
-- Convert to string
@@ -654,7 +857,39 @@ local function get_general_metadata(task)
local fuzzy_hashes = task:get_mempool():get_variable('fuzzy_hashes', 'fstrings')
r.fuzzy_hashes = fuzzy_hashes or empty
- r.received_delay = get_received_delay(task:get_received_headers())
+ local received_delay, received_ips = get_received_info(task:get_received_headers())
+ r.received_delay = received_delay
+ if #received_ips > 0 then
+ r.received_ips = received_ips
+ else
+ r.received_ips = '::'
+ end
+
+ if settings['collect_urls']['enabled'] then
+ local task_urls = task:get_urls(settings['collect_urls']['get_url_params'])
+
+ local cta_urls = collect_cta_urls(task)
+ local cta_keys = {}
+ for _, u in ipairs(cta_urls) do
+ cta_keys[url_key(u, false)] = true
+ end
+
+ -- when separate_cta is on, exclude CTAs from urls (they live in urls_cta);
+ -- otherwise mark them and sort to the top so they survive max_urls truncation
+ local cta_mode = settings['collect_urls']['separate_cta'] and 'skip' or 'mark'
+
+ r.urls = build_urls_metadata(task_urls,
+ settings['collect_urls']['max_urls'],
+ settings['collect_urls']['full_urls'],
+ cta_keys, cta_mode)
+
+ if settings['collect_urls']['separate_cta'] and #cta_urls > 0 then
+ r.urls_cta = build_urls_metadata(cta_urls,
+ settings['collect_urls']['max_urls'],
+ settings['collect_urls']['full_cta_urls'],
+ nil, nil)
+ end
+ end
return r
end
@@ -940,7 +1175,7 @@ local function configure_index_policy(cfg, ev_base)
-- ucl.to_format(obj, 'json') can't manage empty {} objects, it will be treat them as [] in json as result,
-- so we write {} as '{emty_object}', which allows us to replace '"{emty_object}"' string after convertion to json to '{}'
- local index_policy_json = ''
+ local index_policy_json
-- elastic lifecycle policy with hot state
if detected_distro['name'] == 'elastic' then
@@ -1170,7 +1405,7 @@ local function configure_index_policy(cfg, ev_base)
-- opensearch state delete
if settings['index_policy']['delete']['enabled'] then
local prev_state_id = state_id
- state_id = state_id + 1
+ -- state_id = state_id + 1 -- delete is the last state, no further actions appended; bump kept commented for symmetry with warm/cold
index_policy['policy']['states'][prev_state_id]['transitions'] = {
{
state_name = 'delete',
@@ -1268,6 +1503,33 @@ local function configure_index_template(cfg, ev_base)
symbols_obj['type'] = 'nested'
end
+ local urls_list_obj = {
+ dynamic = false,
+ type = 'object',
+ properties = {
+ url = t_text_with_keyword,
+ etld = t_keyword,
+ host = t_keyword,
+ protocol = t_keyword,
+ flags = t_keyword,
+ count = t_long,
+ },
+ }
+ if settings['index_template']['urls_nested'] then
+ urls_list_obj['type'] = 'nested'
+ end
+ local urls_obj = {
+ dynamic = false,
+ type = 'object',
+ properties = {
+ total = t_long,
+ unique = t_long,
+ max_repeats = t_long,
+ repeat_ratio = t_float,
+ list = urls_list_obj,
+ },
+ }
+
-- dynamic templates
local dynamic_templates_obj = {}
local dynamic_strings = {
@@ -1309,6 +1571,7 @@ local function configure_index_template(cfg, ev_base)
properties = {
rspamd_server = t_keyword,
digest = t_keyword,
+ action_forced = t_keyword,
action = t_keyword,
score = t_double,
symbols = symbols_obj,
@@ -1326,6 +1589,8 @@ local function configure_index_template(cfg, ev_base)
from_domain = t_keyword,
mime_from_user = t_keyword,
mime_from_domain = t_keyword,
+ reply_to_user = t_keyword,
+ reply_to_domain = t_keyword,
settings_id = t_keyword,
asn = asn_obj,
scan_time = t_float,
@@ -1333,6 +1598,9 @@ local function configure_index_template(cfg, ev_base)
non_en = t_boolean_nil_true,
fuzzy_hashes = t_text,
received_delay = t_long,
+ received_ips = t_ip,
+ urls = urls_obj,
+ urls_cta = urls_obj,
},
},
},
@@ -1552,24 +1820,8 @@ end
local opts = rspamd_config:get_all_opt('elastic')
-local function merge_settings(src, dst)
- for k, v in pairs(src) do
- if type(v) == 'table' and type(dst[k]) == 'table' then
- merge_settings(v, dst[k])
- else
- dst[k] = v
- end
- end
-end
-
if opts then
- for k, v in pairs(opts) do
- if type(v) == 'table' and settings[k] and type(settings[k]) == 'table' then
- merge_settings(v, settings[k])
- else
- settings[k] = v
- end
- end
+ settings = lua_util.override_defaults(settings, opts)
if not settings['enabled'] then
rspamd_logger.infox(rspamd_config, 'module disabled in config')