headers_count_ignore_above = 5, -- record only N first same named headers, add 'ignored above...' if reached, set 0 to disable limit
headers_text_ignore_above = 2048, -- strip specific header value and add '...' to the end, set 0 to disable limit
symbols_nested = false,
+ urls_nested = false,
empty_value = 'unknown', -- empty numbers, ips and ipnets are not customizable they will be always 0, :: and ::/128 respectively
},
index_policy = {
-- 'List-Id',
-- 'X-Mailer',
},
+ collect_urls = {
+ enabled = true,
+ max_urls = 0, -- 0 = no limit; otherwise truncate (CTAs and most-repeated URLs are placed first so they survive truncation)
+ separate_cta = true, -- if true, also emit urls_cta with just CTA URLs from HTML parts
+ full_urls = false, -- log full URL text vs just host
+ full_cta_urls = false, -- log full URL text vs just host (for CTA URLs)
+ -- params passed verbatim to task:get_urls(); see https://docs.rspamd.com/lua/rspamd_task/
+ get_url_params = {
+ content = true, -- include URLs found inside text/html content
+ images = false, -- include URLs from <img src> attributes (noisy)
+ emails = false, -- include mailto: URLs (rarely useful for phishing analysis)
+ protocols = { 'http', 'https', 'ftp' }, -- protocols to collect
+ },
+ },
geoip = {
enabled = true,
managed = true,
end
function Queue:pop()
+ -- first > last is the empty-queue invariant; nothing to pop
if self.first > self.last then
return nil
end
local value = self.data[self.first]
self.data[self.first] = nil
self.first = self.first + 1
+ -- reset indices on drain so they don't grow without bound over the worker's lifetime
+ if self.first > self.last then
+ self.first = 1
+ self.last = 0
+ end
return value
end
return true
end
-local function get_received_delay(received_headers)
+local function get_received_info(received_headers)
local now = math.floor(rspamd_util.get_time())
local timestamp = 0
local delay = 0
+ local ips = {}
+ local seen_ips = {}
for i, received_header in ipairs(received_headers) do
-- skip first received_header as it's own relay
- if i > 1 and received_header['timestamp'] and received_header['timestamp'] > 0 then
- timestamp = received_header['timestamp']
- break
+ if i > 1 then
+ if timestamp == 0 and received_header['timestamp'] and received_header['timestamp'] > 0 then
+ timestamp = received_header['timestamp']
+ end
+ -- '...' is rspamd's "valid but unknown" placeholder; elastic's t_ip mapping rejects it,
+ -- so guard against it here even though :is_valid() returns true for that case.
+ local real_ip = received_header['real_ip']
+ if real_ip and real_ip:is_valid() then
+ local s = tostring(real_ip)
+ if s ~= '...' and not seen_ips[s] then
+ seen_ips[s] = true
+ table.insert(ips, s)
+ end
+ end
end
end
if timestamp > 0 then
delay = 0
end
end
- return delay
+ return delay, ips
+end
+
+local function url_to_record(u, full_urls, count)
+ local host = u:get_host()
+ local rec = {
+ etld = u:get_tld() or host,
+ host = host,
+ protocol = u:get_protocol(),
+ count = count,
+ }
+ if full_urls then
+ rec.url = u:get_text()
+ end
+ local flag_list = {}
+ -- url:get_flags() only inserts keys for set bits; every value is true
+ for name in pairs(u:get_flags()) do
+ table.insert(flag_list, name)
+ end
+ if #flag_list > 0 then
+ rec.flags = flag_list
+ end
+ return rec
+end
+
+-- Build dedup key for a url. full_urls = true uses full text (rspamd already deduped at C level),
+-- false collapses by host+protocol+flags so e.g. https://example.com/a and https://example.com/b
+-- merge into one entry with summed count.
+local function url_key(u, full_urls)
+ if full_urls then
+ return u:get_text()
+ end
+ return (u:get_host() or '') .. '|' ..
+ (u:get_protocol() or '') .. '|' ..
+ u:get_flags_num()
+end
+
+-- Collect, dedup, count, and sort URLs.
+-- Each url's u:get_count() gives rspamd's internal occurrence count (it tracks repetitions in C).
+-- cta_keys (optional) = set of keys (built from url_key with full_urls=false) identifying CTA URLs.
+-- cta_mode controls what happens with CTA-matching URLs:
+-- 'mark' (default) - keep them in the list, mark is_cta, sort to the top so they survive max_urls truncation
+-- 'skip' - exclude them entirely from this list (used when emitting them in a separate urls_cta field)
+-- Returns metadata object: { total, unique, max_repeats, repeat_ratio, list }
+local function build_urls_metadata(urls, max_urls, full_urls, cta_keys, cta_mode)
+ cta_mode = cta_mode or 'mark'
+ local by_key = {}
+ local order = {}
+ local total = 0
+ for _, u in ipairs(urls) do
+ local nfkey = url_key(u, false)
+ local is_cta = cta_keys and cta_keys[nfkey] or false
+ if not (is_cta and cta_mode == 'skip') then
+ local key = full_urls and url_key(u, true) or nfkey
+ local n = u:get_count() or 1
+ total = total + n
+ local entry = by_key[key]
+ if entry then
+ entry.count = entry.count + n
+ else
+ entry = {
+ url = u,
+ count = n,
+ is_cta = is_cta,
+ }
+ by_key[key] = entry
+ table.insert(order, entry)
+ end
+ end
+ end
+
+ local max_repeats = 0
+ for _, entry in ipairs(order) do
+ if entry.count > max_repeats then
+ max_repeats = entry.count
+ end
+ end
+
+ -- Sort: CTAs first (only relevant in 'mark' mode; in 'skip' mode no CTAs remain), then by count desc
+ table.sort(order, function(a, b)
+ if a.is_cta ~= b.is_cta then
+ return a.is_cta
+ end
+ return a.count > b.count
+ end)
+
+ local list = {}
+ for i, entry in ipairs(order) do
+ if max_urls > 0 and i > max_urls then
+ break
+ end
+ table.insert(list, url_to_record(entry.url, full_urls, entry.count))
+ end
+
+ local repeat_ratio = 0
+ if #order > 0 then
+ repeat_ratio = lua_util.round(total / #order, 3)
+ end
+
+ return {
+ total = total,
+ unique = #order,
+ max_repeats = max_repeats,
+ repeat_ratio = repeat_ratio,
+ list = list,
+ }
+end
+
+local function collect_cta_urls(task)
+ local cta = {}
+ local seen = {}
+
+ -- Append a URL to the CTA list (deduped), then walk its redirect chain
+ -- via url:get_redirected() so the full CTA-triggered path is captured.
+ -- url_redirector links each hop via set_redirected, so a CTA pointing at
+ -- a shortlink propagates to the resolved target via the linked_url
+ -- pointer; without walking, only the original (or first hop, depending
+ -- on get_cta_urls' 'original' flag) would be marked CTA.
+ local function add_with_chain(u)
+ while u do
+ local key = u:get_text()
+ if seen[key] then
+ break
+ end
+ seen[key] = true
+ table.insert(cta, u)
+ u = u:get_redirected()
+ end
+ end
+
+ local parts = task:get_text_parts()
+ for _, part in ipairs(parts) do
+ if part:is_html() then
+ -- original=true returns the source HTML CTA, not its linked_url
+ -- one-hop-forward (the default), so the chain walk starts at the
+ -- actual entry point of the CTA.
+ local part_cta = part:get_cta_urls({ original = true })
+ for _, u in ipairs(part_cta) do
+ add_with_chain(u)
+ end
+ end
+ end
+ return cta
end
local function create_bulk_json(es_index, logs_to_send)
local push_url
local bulk_json
local logs_to_send
+ -- captured so pop_first lands on the same Queue we read from, even if the
+ -- 10x-overflow guard later replaces buffer['logs'] with Queue:new()
+ -- between request start and callback firing.
+ local task_buffer = buffer['logs']
if flush_all then
- logs_to_send = buffer['logs']:get_all()
+ logs_to_send = task_buffer:get_all()
else
- logs_to_send = buffer['logs']:get_first(settings['limits']['max_rows'])
+ logs_to_send = task_buffer:get_first(settings['limits']['max_rows'])
end
nlogs_to_send = #logs_to_send -- actual size can be lower then max_rows
if nlogs_to_send > 0 then
end
-- proccess results
if push_done then
- buffer['logs']:pop_first(nlogs_to_send)
+ task_buffer:pop_first(nlogs_to_send)
buffer['errors'] = 0
upstream:ok()
else
rspamd_logger.errx(log_object,
'failed to send %s log lines, failed attempts: %s/%s, removing failed logs from bugger',
nlogs_to_send, buffer['errors'], settings['limits']['max_fail'])
- buffer['logs']:pop_first(nlogs_to_send)
+ task_buffer:pop_first(nlogs_to_send)
buffer['errors'] = 0
else
buffer['errors'] = buffer['errors'] + 1
r.rspamd_server = rspamd_hostname or empty
r.digest = task:get_digest() or empty
+ local has_pre_result, _, _, pre_result_module = task:has_pre_result()
+ r.action_forced = has_pre_result and (pre_result_module or 'unknown module') or 'no force'
+
r.action = task:get_metric_action() or empty
r.score = task:get_metric_score()[1] or 0
r.symbols = task:get_symbols_all()
r.ip = '::'
r.is_local = false
local ip_addr = task:get_ip()
- if ip_addr and ip_addr:is_valid() and tostring(ip_addr) ~= '...' then
- r.is_local = ip_addr:is_local()
- r.ip = tostring(ip_addr)
+ if ip_addr and ip_addr:is_valid() then
+ local s = tostring(ip_addr)
+ if s ~= '...' then
+ r.is_local = ip_addr:is_local()
+ r.ip = s
+ end
end
r.sender_ip = '::'
origin = origin:gsub('^%[', ''):gsub('%]:[0-9]+$', ''):gsub('%]$', '')
local rspamd_ip = require "rspamd_ip"
local origin_ip = rspamd_ip.from_string(origin)
- if origin_ip and origin_ip:is_valid() and tostring(origin_ip) ~= '...' then
- r.sender_ip = tostring(origin_ip)
+ if origin_ip and origin_ip:is_valid() then
+ local s = tostring(origin_ip)
+ if s ~= '...' then
+ r.sender_ip = s
+ end
end
end
end
end
+ r.reply_to_user = empty
+ r.reply_to_domain = empty
+ local reply_to_hdr = task:get_header_full('Reply-To')
+ if reply_to_hdr and reply_to_hdr[1] and reply_to_hdr[1].decoded then
+ local reply_to = rspamd_util.parse_mail_address(reply_to_hdr[1].decoded, task:get_mempool())
+ if reply_to and reply_to[1] and
+ reply_to[1].user and #reply_to[1].user > 0 and
+ reply_to[1].domain and #reply_to[1].domain > 0
+ then
+ r.reply_to_user = reply_to[1].user
+ r.reply_to_domain = reply_to[1].domain:lower()
+ end
+ end
+
local settings_id = task:get_settings_id()
if settings_id then
-- Convert to string
local fuzzy_hashes = task:get_mempool():get_variable('fuzzy_hashes', 'fstrings')
r.fuzzy_hashes = fuzzy_hashes or empty
- r.received_delay = get_received_delay(task:get_received_headers())
+ local received_delay, received_ips = get_received_info(task:get_received_headers())
+ r.received_delay = received_delay
+ if #received_ips > 0 then
+ r.received_ips = received_ips
+ else
+ r.received_ips = '::'
+ end
+
+ if settings['collect_urls']['enabled'] then
+ local task_urls = task:get_urls(settings['collect_urls']['get_url_params'])
+
+ local cta_urls = collect_cta_urls(task)
+ local cta_keys = {}
+ for _, u in ipairs(cta_urls) do
+ cta_keys[url_key(u, false)] = true
+ end
+
+ -- when separate_cta is on, exclude CTAs from urls (they live in urls_cta);
+ -- otherwise mark them and sort to the top so they survive max_urls truncation
+ local cta_mode = settings['collect_urls']['separate_cta'] and 'skip' or 'mark'
+
+ r.urls = build_urls_metadata(task_urls,
+ settings['collect_urls']['max_urls'],
+ settings['collect_urls']['full_urls'],
+ cta_keys, cta_mode)
+
+ if settings['collect_urls']['separate_cta'] and #cta_urls > 0 then
+ r.urls_cta = build_urls_metadata(cta_urls,
+ settings['collect_urls']['max_urls'],
+ settings['collect_urls']['full_cta_urls'],
+ nil, nil)
+ end
+ end
return r
end
-- ucl.to_format(obj, 'json') can't manage empty {} objects, it will be treat them as [] in json as result,
-- so we write {} as '{emty_object}', which allows us to replace '"{emty_object}"' string after convertion to json to '{}'
- local index_policy_json = ''
+ local index_policy_json
-- elastic lifecycle policy with hot state
if detected_distro['name'] == 'elastic' then
-- opensearch state delete
if settings['index_policy']['delete']['enabled'] then
local prev_state_id = state_id
- state_id = state_id + 1
+ -- state_id = state_id + 1 -- delete is the last state, no further actions appended; bump kept commented for symmetry with warm/cold
index_policy['policy']['states'][prev_state_id]['transitions'] = {
{
state_name = 'delete',
symbols_obj['type'] = 'nested'
end
+ local urls_list_obj = {
+ dynamic = false,
+ type = 'object',
+ properties = {
+ url = t_text_with_keyword,
+ etld = t_keyword,
+ host = t_keyword,
+ protocol = t_keyword,
+ flags = t_keyword,
+ count = t_long,
+ },
+ }
+ if settings['index_template']['urls_nested'] then
+ urls_list_obj['type'] = 'nested'
+ end
+ local urls_obj = {
+ dynamic = false,
+ type = 'object',
+ properties = {
+ total = t_long,
+ unique = t_long,
+ max_repeats = t_long,
+ repeat_ratio = t_float,
+ list = urls_list_obj,
+ },
+ }
+
-- dynamic templates
local dynamic_templates_obj = {}
local dynamic_strings = {
properties = {
rspamd_server = t_keyword,
digest = t_keyword,
+ action_forced = t_keyword,
action = t_keyword,
score = t_double,
symbols = symbols_obj,
from_domain = t_keyword,
mime_from_user = t_keyword,
mime_from_domain = t_keyword,
+ reply_to_user = t_keyword,
+ reply_to_domain = t_keyword,
settings_id = t_keyword,
asn = asn_obj,
scan_time = t_float,
non_en = t_boolean_nil_true,
fuzzy_hashes = t_text,
received_delay = t_long,
+ received_ips = t_ip,
+ urls = urls_obj,
+ urls_cta = urls_obj,
},
},
},
local opts = rspamd_config:get_all_opt('elastic')
-local function merge_settings(src, dst)
- for k, v in pairs(src) do
- if type(v) == 'table' and type(dst[k]) == 'table' then
- merge_settings(v, dst[k])
- else
- dst[k] = v
- end
- end
-end
-
if opts then
- for k, v in pairs(opts) do
- if type(v) == 'table' and settings[k] and type(settings[k]) == 'table' then
- merge_settings(v, settings[k])
- else
- settings[k] = v
- end
- end
+ settings = lua_util.override_defaults(settings, opts)
if not settings['enabled'] then
rspamd_logger.infox(rspamd_config, 'module disabled in config')