]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] elastic: log Reply-To, received IPs, URL metadata, and pre-result module...
authorDmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
Fri, 8 May 2026 08:11:52 +0000 (10:11 +0200)
committerGitHub <noreply@github.com>
Fri, 8 May 2026 08:11:52 +0000 (09:11 +0100)
* [Feature] elastic: log Reply-To, received IPs, URL metadata, and pre-result module

- reply_to_user / reply_to_domain: parsed from Reply-To via
  rspamd_util.parse_mail_address, mirroring the from / mime_from split.
- received_ips: list of IPs from Received headers
- urls and urls_cta with the new collect_urls config block: per-URL
  records {url, etld, host, protocol, flags, count} plus aggregate
  metrics {total, unique, max_repeats, repeat_ratio}. CTA URLs are
  collected via text_part:get_cta_urls({original=true}) and walked via
  :get_redirected so url_redirector-resolved hops are captured, then
  either kept inline at the top of urls (sorted ahead of non-CTA so
  they survive max_urls truncation) or emitted into a dedicated
  urls_cta when separate_cta is on
- action_forced: the module name from task:has_pre_result(), so logs
  show which prefilter short-circuited the pipeline (or 'no force').

Renames get_received_delay to get_received_info (returns delay + ips
in one pass over the received chain) and replaces the local
merge_settings helper with lua_util.override_defaults — the two are
functionally equivalent recursive deep-merges, but override_defaults
is the project-wide maintained helper.

Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
* [Fix] elastic: reset queue counters on pop drain which prevents the indices from accumulating monotonically over the worker's lifetime

Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
* [Fix] elastic: address review feedback on PR #6018

- Drop tostring() around url:get_text() (already a Lua string) in
  url_to_record and url_key.
- Drop tostring() around url:get_flags_num() (.. coerces numbers).
- Replace tostring(url) in CTA dedup key with url:get_text() to avoid
  the __tostring metamethod's percent-encoding two-pass walk.
- Drop `or nil` no-op after url:get_redirected().
- Cache url:get_host() once in url_to_record (was called twice).
- Remove dead `if on then` guard on url:get_flags() — only set bits
  are inserted, so every value is true.
- Cache tostring(real_ip) in get_received_info and tostring(ip_addr) /
  tostring(origin_ip) in get_general_metadata; refactor to one call.
- In build_urls_metadata, compute url_key(u, false) once per URL and
  reuse for the CTA lookup; only recompute when full_urls is true.
- Drop sort=true from task:get_urls() — the C-level qsort doesn't
  survive: results are rehashed for dedup and re-sorted by count.
  Also remove the misleading "deterministic order, stable dedup"
  comment (table.sort is unstable in standard Lua).

Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
* [Fix] elastic: drop dead `or {}` after task:get_urls() and other functions that always provide table

Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
---------

Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
conf/modules.d/elastic.conf
src/plugins/lua/elastic.lua

index 804a00dd7a9092a94cf61daa76eb47c24be895dc..d713ff550f7af8e1682cd2427f8afa4f1b358277 100644 (file)
@@ -49,6 +49,7 @@ elastic {
     headers_count_ignore_above = 5; # Record only the first N same-named headers, add "ignored above..." if the limit is reached; set 0 to disable the limit
     headers_text_ignore_above = 2048; # Strip specific header value and add "..." to the end; set 0 to disable the limit
     symbols_nested = false;
+    urls_nested = false;
     empty_value = "unknown"; # Empty numbers, IPs and IP nets are not customizable; they will always be 0, :: and ::/128 respectively
   };
   index_policy = {
@@ -97,6 +98,21 @@ elastic {
   # "Precedence";
   # "List-Id";
   extra_collect_headers = [];
+  collect_urls = {
+    enabled = true;
+    max_urls = 0; # 0 = no limit; otherwise truncate (CTAs and most-repeated URLs are placed first so they survive truncation)
+    separate_cta = true; # If true, also emit urls_cta with just CTA URLs from HTML parts
+    full_urls = false; # Log full URL text vs just host
+    full_cta_urls = false; # Log full URL text vs just host (for CTA URLs)
+    # Params passed verbatim to task:get_urls(); see https://docs.rspamd.com/lua/rspamd_task/#m1860f
+    get_url_params = {
+      content = true; # Include URLs found inside text/html content
+      images = false; # Include URLs from <img src> attributes (noisy)
+      emails = false; # Include mailto: URLs (rarely useful for phishing analysis)
+      sort = true; # Deterministic order, stable dedup, reproducible doc layout
+      protocols = ["http", "https", "ftp"]; # Protocols to collect
+    };
+  };
   geoip = {
     enabled = true;
     managed = true;
index f1de8e4b1f683384f550dbff9e65852cfa07235f..c63d33042a91f6f377deacf3c09f6f775a284c92 100644 (file)
@@ -97,6 +97,7 @@ local settings = {
     headers_count_ignore_above = 5, -- record only N first same named headers, add 'ignored above...' if reached, set 0 to disable limit
     headers_text_ignore_above = 2048, -- strip specific header value and add '...' to the end, set 0 to disable limit
     symbols_nested = false,
+    urls_nested = false,
     empty_value = 'unknown', -- empty numbers, ips and ipnets are not customizable they will be always 0, :: and ::/128 respectively
   },
   index_policy = {
@@ -145,6 +146,20 @@ local settings = {
     -- 'List-Id',
     -- 'X-Mailer',
   },
+  collect_urls = {
+    enabled = true,
+    max_urls = 0, -- 0 = no limit; otherwise truncate (CTAs and most-repeated URLs are placed first so they survive truncation)
+    separate_cta = true, -- if true, also emit urls_cta with just CTA URLs from HTML parts
+    full_urls = false, -- log full URL text vs just host
+    full_cta_urls = false, -- log full URL text vs just host (for CTA URLs)
+    -- params passed verbatim to task:get_urls(); see https://docs.rspamd.com/lua/rspamd_task/#m1860f
+    get_url_params = {
+      content = true, -- include URLs found inside text/html content
+      images = false, -- include URLs from <img src> attributes (noisy)
+      emails = false, -- include mailto: URLs (rarely useful for phishing analysis)
+      protocols = { 'http', 'https', 'ftp' }, -- protocols to collect
+    },
+  },
   geoip = {
     enabled = true,
     managed = true,
@@ -197,12 +212,18 @@ function Queue:get_all()
 end
 
 function Queue:pop()
+  -- queue already empty
   if self.first > self.last then
     return nil
   end
   local value = self.data[self.first]
   self.data[self.first] = nil
   self.first = self.first + 1
+  -- reset indices on drain so they don't grow without bound over the worker's lifetime
+  if self.first > self.last then
+    self.first = 1
+    self.last = 0
+  end
   return value
 end
 
@@ -296,15 +317,28 @@ local function handle_error(action, component, limit)
   return true
 end
 
-local function get_received_delay(received_headers)
+local function get_received_info(received_headers)
   local now = math.floor(rspamd_util.get_time())
   local timestamp = 0
   local delay = 0
+  local ips = {}
+  local seen_ips = {}
   for i, received_header in ipairs(received_headers) do
     -- skip first received_header as it's own relay
-    if i > 1 and received_header['timestamp'] and received_header['timestamp'] > 0 then
-      timestamp = received_header['timestamp']
-      break
+    if i > 1 then
+      if timestamp == 0 and received_header['timestamp'] and received_header['timestamp'] > 0 then
+        timestamp = received_header['timestamp']
+      end
+      -- '...' is rspamd's "valid but unknown" placeholder; elastic's t_ip mapping rejects it,
+      -- so guard against it here even though :is_valid() returns true for that case.
+      local real_ip = received_header['real_ip']
+      if real_ip and real_ip:is_valid() then
+        local s = tostring(real_ip)
+        if s ~= '...' and not seen_ips[s] then
+          seen_ips[s] = true
+          table.insert(ips, s)
+        end
+      end
     end
   end
   if timestamp > 0 then
@@ -313,7 +347,149 @@ local function get_received_delay(received_headers)
       delay = 0
     end
   end
-  return delay
+  return delay, ips
+end
+
+local function url_to_record(u, full_urls, count)
+  local host = u:get_host()
+  local rec = {
+    etld = u:get_tld() or host,
+    host = host,
+    protocol = u:get_protocol(),
+    count = count,
+  }
+  if full_urls then
+    rec.url = u:get_text()
+  end
+  local flag_list = {}
+  -- url:get_flags() only inserts keys for set bits; every value is true
+  for name in pairs(u:get_flags()) do
+    table.insert(flag_list, name)
+  end
+  if #flag_list > 0 then
+    rec.flags = flag_list
+  end
+  return rec
+end
+
+-- Build dedup key for a url. full_urls = true uses full text (rspamd already deduped at C level),
+-- false collapses by host+protocol+flags so e.g. https://example.com/a and https://example.com/b
+-- merge into one entry with summed count.
+local function url_key(u, full_urls)
+  if full_urls then
+    return u:get_text()
+  end
+  return (u:get_host() or '') .. '|' ..
+      (u:get_protocol() or '') .. '|' ..
+      u:get_flags_num()
+end
+
+-- Collect, dedup, count, and sort URLs.
+-- Each url's u:get_count() gives rspamd's internal occurrence count (it tracks repetitions in C).
+-- cta_keys (optional) = set of keys (built from url_key with full_urls=false) identifying CTA URLs.
+-- cta_mode controls what happens with CTA-matching URLs:
+--   'mark' (default) - keep them in the list, mark is_cta, sort to the top so they survive max_urls truncation
+--   'skip'           - exclude them entirely from this list (used when emitting them in a separate urls_cta field)
+-- Returns metadata object: { total, unique, max_repeats, repeat_ratio, list }
+local function build_urls_metadata(urls, max_urls, full_urls, cta_keys, cta_mode)
+  cta_mode = cta_mode or 'mark'
+  local by_key = {}
+  local order = {}
+  local total = 0
+  for _, u in ipairs(urls) do
+    local nfkey = url_key(u, false)
+    local is_cta = cta_keys and cta_keys[nfkey] or false
+    if not (is_cta and cta_mode == 'skip') then
+      local key = full_urls and url_key(u, true) or nfkey
+      local n = u:get_count() or 1
+      total = total + n
+      local entry = by_key[key]
+      if entry then
+        entry.count = entry.count + n
+      else
+        entry = {
+          url = u,
+          count = n,
+          is_cta = is_cta,
+        }
+        by_key[key] = entry
+        table.insert(order, entry)
+      end
+    end
+  end
+
+  local max_repeats = 0
+  for _, entry in ipairs(order) do
+    if entry.count > max_repeats then
+      max_repeats = entry.count
+    end
+  end
+
+  -- Sort: CTAs first (only relevant in 'mark' mode; in 'skip' mode no CTAs remain), then by count desc
+  table.sort(order, function(a, b)
+    if a.is_cta ~= b.is_cta then
+      return a.is_cta
+    end
+    return a.count > b.count
+  end)
+
+  local list = {}
+  for i, entry in ipairs(order) do
+    if max_urls > 0 and i > max_urls then
+      break
+    end
+    table.insert(list, url_to_record(entry.url, full_urls, entry.count))
+  end
+
+  local repeat_ratio = 0
+  if #order > 0 then
+    repeat_ratio = lua_util.round(total / #order, 3)
+  end
+
+  return {
+    total = total,
+    unique = #order,
+    max_repeats = max_repeats,
+    repeat_ratio = repeat_ratio,
+    list = list,
+  }
+end
+
+local function collect_cta_urls(task)
+  local cta = {}
+  local seen = {}
+
+  -- Append a URL to the CTA list (deduped), then walk its redirect chain
+  -- via url:get_redirected() so the full CTA-triggered path is captured.
+  -- url_redirector links each hop via set_redirected, so a CTA pointing at
+  -- a shortlink propagates to the resolved target via the linked_url
+  -- pointer; without walking, only the original (or first hop, depending
+  -- on get_cta_urls' return_original flag) would be marked CTA.
+  local function add_with_chain(u)
+    while u do
+      local key = u:get_text()
+      if seen[key] then
+        break
+      end
+      seen[key] = true
+      table.insert(cta, u)
+      u = u:get_redirected()
+    end
+  end
+
+  local parts = task:get_text_parts()
+  for _, part in ipairs(parts) do
+    if part:is_html() then
+      -- original=true returns the source HTML CTA, not its linked_url
+      -- one-hop-forward (the default), so the chain walk starts at the
+      -- actual entry point of the CTA.
+      local part_cta = part:get_cta_urls({ original = true })
+      for _, u in ipairs(part_cta) do
+        add_with_chain(u)
+      end
+    end
+  end
+  return cta
 end
 
 local function create_bulk_json(es_index, logs_to_send)
@@ -339,10 +515,14 @@ local function elastic_send_data(flush_all, task, cfg, ev_base)
   local push_url
   local bulk_json
   local logs_to_send
+  -- captured so pop_first lands on the same Queue we read from, even if the
+  -- 10x-overflow guard later replaces buffer['logs'] with Queue:new()
+  -- between request start and callback firing.
+  local task_buffer = buffer['logs']
   if flush_all then
-    logs_to_send = buffer['logs']:get_all()
+    logs_to_send = task_buffer:get_all()
   else
-    logs_to_send = buffer['logs']:get_first(settings['limits']['max_rows'])
+    logs_to_send = task_buffer:get_first(settings['limits']['max_rows'])
   end
   nlogs_to_send = #logs_to_send -- actual size can be lower then max_rows
   if nlogs_to_send > 0 then
@@ -401,7 +581,7 @@ local function elastic_send_data(flush_all, task, cfg, ev_base)
     end
     -- proccess results
     if push_done then
-      buffer['logs']:pop_first(nlogs_to_send)
+      task_buffer:pop_first(nlogs_to_send)
       buffer['errors'] = 0
       upstream:ok()
     else
@@ -410,7 +590,7 @@ local function elastic_send_data(flush_all, task, cfg, ev_base)
         rspamd_logger.errx(log_object,
           'failed to send %s log lines, failed attempts: %s/%s, removing failed logs from bugger',
           nlogs_to_send, buffer['errors'], settings['limits']['max_fail'])
-        buffer['logs']:pop_first(nlogs_to_send)
+        task_buffer:pop_first(nlogs_to_send)
         buffer['errors'] = 0
       else
         buffer['errors'] = buffer['errors'] + 1
@@ -456,6 +636,9 @@ local function get_general_metadata(task)
   r.rspamd_server = rspamd_hostname or empty
   r.digest = task:get_digest() or empty
 
+  local has_pre_result, _, _, pre_result_module = task:has_pre_result()
+  r.action_forced = has_pre_result and (pre_result_module or 'unknown module') or 'no force'
+
   r.action = task:get_metric_action() or empty
   r.score = task:get_metric_score()[1] or 0
   r.symbols = task:get_symbols_all()
@@ -481,9 +664,12 @@ local function get_general_metadata(task)
   r.ip = '::'
   r.is_local = false
   local ip_addr = task:get_ip()
-  if ip_addr and ip_addr:is_valid() and tostring(ip_addr) ~= '...' then
-    r.is_local = ip_addr:is_local()
-    r.ip = tostring(ip_addr)
+  if ip_addr and ip_addr:is_valid() then
+    local s = tostring(ip_addr)
+    if s ~= '...' then
+      r.is_local = ip_addr:is_local()
+      r.ip = s
+    end
   end
 
   r.sender_ip = '::'
@@ -492,8 +678,11 @@ local function get_general_metadata(task)
     origin = origin:gsub('^%[', ''):gsub('%]:[0-9]+$', ''):gsub('%]$', '')
     local rspamd_ip = require "rspamd_ip"
     local origin_ip = rspamd_ip.from_string(origin)
-    if origin_ip and origin_ip:is_valid() and tostring(origin_ip) ~= '...' then
-      r.sender_ip = tostring(origin_ip)
+    if origin_ip and origin_ip:is_valid() then
+      local s = tostring(origin_ip)
+      if s ~= '...' then
+        r.sender_ip = s
+      end
     end
   end
 
@@ -547,6 +736,20 @@ local function get_general_metadata(task)
     end
   end
 
+  r.reply_to_user = empty
+  r.reply_to_domain = empty
+  local reply_to_hdr = task:get_header_full('Reply-To')
+  if reply_to_hdr and reply_to_hdr[1] and reply_to_hdr[1].decoded then
+    local reply_to = rspamd_util.parse_mail_address(reply_to_hdr[1].decoded, task:get_mempool())
+    if reply_to and reply_to[1] and
+      reply_to[1].user and #reply_to[1].user > 0 and
+      reply_to[1].domain and #reply_to[1].domain > 0
+    then
+      r.reply_to_user = reply_to[1].user
+      r.reply_to_domain = reply_to[1].domain:lower()
+    end
+  end
+
   local settings_id = task:get_settings_id()
   if settings_id then
     -- Convert to string
@@ -654,7 +857,39 @@ local function get_general_metadata(task)
   local fuzzy_hashes = task:get_mempool():get_variable('fuzzy_hashes', 'fstrings')
   r.fuzzy_hashes = fuzzy_hashes or empty
 
-  r.received_delay = get_received_delay(task:get_received_headers())
+  local received_delay, received_ips = get_received_info(task:get_received_headers())
+  r.received_delay = received_delay
+  if #received_ips > 0 then
+    r.received_ips = received_ips
+  else
+    r.received_ips = '::'
+  end
+
+  if settings['collect_urls']['enabled'] then
+    local task_urls = task:get_urls(settings['collect_urls']['get_url_params'])
+
+    local cta_urls = collect_cta_urls(task)
+    local cta_keys = {}
+    for _, u in ipairs(cta_urls) do
+      cta_keys[url_key(u, false)] = true
+    end
+
+    -- when separate_cta is on, exclude CTAs from urls (they live in urls_cta);
+    -- otherwise mark them and sort to the top so they survive max_urls truncation
+    local cta_mode = settings['collect_urls']['separate_cta'] and 'skip' or 'mark'
+
+    r.urls = build_urls_metadata(task_urls,
+        settings['collect_urls']['max_urls'],
+        settings['collect_urls']['full_urls'],
+        cta_keys, cta_mode)
+
+    if settings['collect_urls']['separate_cta'] and #cta_urls > 0 then
+      r.urls_cta = build_urls_metadata(cta_urls,
+          settings['collect_urls']['max_urls'],
+          settings['collect_urls']['full_cta_urls'],
+          nil, nil)
+    end
+  end
 
   return r
 end
@@ -940,7 +1175,7 @@ local function configure_index_policy(cfg, ev_base)
 
   -- ucl.to_format(obj, 'json') can't manage empty {} objects, it will be treat them as [] in json as result,
   -- so we write {} as '{emty_object}', which allows us to replace '"{emty_object}"' string after convertion to json to '{}'
-  local index_policy_json = ''
+  local index_policy_json
 
   -- elastic lifecycle policy with hot state
   if detected_distro['name'] == 'elastic' then
@@ -1170,7 +1405,7 @@ local function configure_index_policy(cfg, ev_base)
     -- opensearch state delete
     if settings['index_policy']['delete']['enabled'] then
       local prev_state_id = state_id
-      state_id = state_id + 1
+      -- state_id = state_id + 1 -- delete is the last state, no further actions appended; bump kept commented for symmetry with warm/cold
       index_policy['policy']['states'][prev_state_id]['transitions'] = {
         {
           state_name = 'delete',
@@ -1268,6 +1503,33 @@ local function configure_index_template(cfg, ev_base)
     symbols_obj['type'] = 'nested'
   end
 
+  local urls_list_obj = {
+    dynamic = false,
+    type = 'object',
+    properties = {
+      url = t_text_with_keyword,
+      etld = t_keyword,
+      host = t_keyword,
+      protocol = t_keyword,
+      flags = t_keyword,
+      count = t_long,
+    },
+  }
+  if settings['index_template']['urls_nested'] then
+    urls_list_obj['type'] = 'nested'
+  end
+  local urls_obj = {
+    dynamic = false,
+    type = 'object',
+    properties = {
+      total = t_long,
+      unique = t_long,
+      max_repeats = t_long,
+      repeat_ratio = t_float,
+      list = urls_list_obj,
+    },
+  }
+
   -- dynamic templates
   local dynamic_templates_obj = {}
   local dynamic_strings = {
@@ -1309,6 +1571,7 @@ local function configure_index_template(cfg, ev_base)
             properties = {
               rspamd_server = t_keyword,
               digest = t_keyword,
+              action_forced = t_keyword,
               action = t_keyword,
               score = t_double,
               symbols = symbols_obj,
@@ -1326,6 +1589,8 @@ local function configure_index_template(cfg, ev_base)
               from_domain = t_keyword,
               mime_from_user = t_keyword,
               mime_from_domain = t_keyword,
+              reply_to_user = t_keyword,
+              reply_to_domain = t_keyword,
               settings_id = t_keyword,
               asn = asn_obj,
               scan_time = t_float,
@@ -1333,6 +1598,9 @@ local function configure_index_template(cfg, ev_base)
               non_en = t_boolean_nil_true,
               fuzzy_hashes = t_text,
               received_delay = t_long,
+              received_ips = t_ip,
+              urls = urls_obj,
+              urls_cta = urls_obj,
             },
           },
         },
@@ -1552,24 +1820,8 @@ end
 
 local opts = rspamd_config:get_all_opt('elastic')
 
-local function merge_settings(src, dst)
-  for k, v in pairs(src) do
-    if type(v) == 'table' and type(dst[k]) == 'table' then
-      merge_settings(v, dst[k])
-    else
-      dst[k] = v
-    end
-  end
-end
-
 if opts then
-  for k, v in pairs(opts) do
-    if type(v) == 'table' and settings[k] and type(settings[k]) == 'table' then
-      merge_settings(v, settings[k])
-    else
-      settings[k] = v
-    end
-  end
+  settings = lua_util.override_defaults(settings, opts)
 
   if not settings['enabled'] then
     rspamd_logger.infox(rspamd_config, 'module disabled in config')