From: Vsevolod Stakhov Date: Sat, 14 Feb 2026 15:35:59 +0000 (+0000) Subject: [Feature] Add structured formatter to metadata_exporter X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d3d672f2e73c936966cb9bad31927e2247a515d8;p=thirdparty%2Frspamd.git [Feature] Add structured formatter to metadata_exporter Add a 'structured' output format that emits rich msgpack metadata including UUID correlation, extracted text, base64-encoded attachment and image content, URLs, and reply detection. Uses task:get_uuid() for cross-system correlation instead of generating UUIDs in Lua. --- diff --git a/src/plugins/lua/metadata_exporter.lua b/src/plugins/lua/metadata_exporter.lua index 9b086a8ff8..65356677d2 100644 --- a/src/plugins/lua/metadata_exporter.lua +++ b/src/plugins/lua/metadata_exporter.lua @@ -28,6 +28,7 @@ local rspamd_util = require "rspamd_util" local rspamd_logger = require "rspamd_logger" local rspamd_tcp = require "rspamd_tcp" local lua_redis = require "lua_redis" +local lua_mime = require "lua_mime" local ucl = require "ucl" local E = {} local N = 'metadata_exporter' @@ -270,6 +271,81 @@ local formatters = { end return lua_util.table_to_multipart_body(parts, boundary), { multipart_boundary = boundary } + end, + structured = function(task) + local meta = get_general_metadata(task, false, false) + -- Correlation identifier + local uuid = task:get_uuid() + meta.uuid = uuid + -- Inject X-Rspamd-UUID header for IMAP/external correlation + lua_mime.modify_headers(task, { + add = { ['X-Rspamd-UUID'] = { value = uuid, order = 0 } } + }) + -- Extracted text (cleaned, reply-trimmed) + local text_result = lua_mime.extract_text_limited(task, { + max_bytes = 32768, + smart_trim = true, + }) + if text_result and text_result.text and #text_result.text > 0 then + meta.text = text_result.text + meta.text_truncated = text_result.truncated or false + end + -- Attachments and images + local attachments = {} + local images = {} + for _, part in ipairs(task:get_parts()) do + local img = part:get_image() + if img then + local content = part:get_content() + table.insert(images, { + filename = img:get_filename() or '', + content_type = img:get_type() or '', + width = img:get_width(), + height = img:get_height(), + size = part:get_length(), + content = content or '', + }) + elseif part:is_attachment() then + local mime_type, mime_subtype = part:get_type() + local content = part:get_content() + table.insert(attachments, { + filename = part:get_filename() or '', + content_type = string.format('%s/%s', mime_type or '', mime_subtype or ''), + size = part:get_length(), + digest = string.sub(part:get_digest(), 1, 16), + content = content or '', + }) + end + end + if #attachments > 0 then + meta.attachments = attachments + end + if #images > 0 then + meta.images = images + end + -- URLs + local urls = lua_util.extract_specific_urls({ + task = task, + limit = 100, + esld_limit = 10, + need_emails = false, + need_images = false, + }) + if urls and #urls > 0 then + local url_list = {} + for _, u in ipairs(urls) do + table.insert(url_list, { + url = u:get_text(), + host = u:get_host(), + tld = u:get_tld(), + }) + end + meta.urls = url_list + end + -- Reply detection + local dominated_by = task:get_header('In-Reply-To') + meta.is_reply = (dominated_by ~= nil) + return ucl.to_format(meta, 'msgpack') end } @@ -432,6 +508,54 @@ local pushers = { read = false, }) end, + redis_stream = function(task, formatted, rule) + local function do_xadd(stream_key) + local _, ret, upstream + local function redis_xadd_cb(err) + if err then + rspamd_logger.errx(task, 'got error %s when publishing to stream on server %s', + err, upstream:get_addr()) + return maybe_defer(task, rule) + end + return true + end + local args = { stream_key } + if rule.max_len then + table.insert(args, 'MAXLEN') + table.insert(args, '~') + table.insert(args, tostring(rule.max_len)) + end + table.insert(args, '*') + table.insert(args, 'data') + table.insert(args, formatted) + ret, _, upstream = lua_redis.redis_make_request(task, + redis_params, + nil, + true, + redis_xadd_cb, + 'XADD', + args + ) + if not ret then + rspamd_logger.errx(task, 'error connecting to redis') + maybe_defer(task, rule) + end + end + if rule.per_recipient then + local rcpt = task:get_recipients('smtp') + if rcpt then + for _, a in ipairs(rcpt) do + if a.addr and #a.addr > 0 then + do_xadd(rule.stream_key .. ':' .. a.addr) + end + end + else + do_xadd(rule.stream_key) + end + else + do_xadd(rule.stream_key) + end + end, } local opts = rspamd_config:get_all_opt(N) @@ -650,6 +774,9 @@ local backend_required_elements = { 'host', 'port', }, + redis_stream = { + 'stream_key', + }, } local check_element = { selector = function(k, v) @@ -709,6 +836,18 @@ backend_check.redis_pubsub = function(k, rule) rule.timeout = redis_params.timeout end end +backend_check.redis_stream = function(k, rule) + if not redis_params then + redis_params = rspamd_parse_redis_server(N) + end + if not redis_params then + rspamd_logger.errx(rspamd_config, 'No redis servers are specified') + settings.rules[k] = nil + else + backend_check.default(k, rule) + rule.timeout = redis_params.timeout + end +end setmetatable(backend_check, { __index = function() return backend_check.default