]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Add structured formatter to metadata_exporter
authorVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 14 Feb 2026 15:35:59 +0000 (15:35 +0000)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 14 Feb 2026 18:09:36 +0000 (18:09 +0000)
Add a 'structured' output format that emits rich msgpack metadata
including UUID correlation, extracted text, base64-encoded attachment
and image content, URLs, and reply detection. Uses task:get_uuid()
for cross-system correlation instead of generating UUIDs in Lua.

src/plugins/lua/metadata_exporter.lua

index 9b086a8ff80045a84479ccf8dabfae67b547eb24..65356677d250b68ddcd2dd0d5bad8e3ef08772af 100644 (file)
@@ -28,6 +28,7 @@ local rspamd_util = require "rspamd_util"
 local rspamd_logger = require "rspamd_logger"
 local rspamd_tcp = require "rspamd_tcp"
 local lua_redis = require "lua_redis"
+local lua_mime = require "lua_mime"
 local ucl = require "ucl"
 local E = {}
 local N = 'metadata_exporter'
@@ -270,6 +271,81 @@ local formatters = {
     end
     return lua_util.table_to_multipart_body(parts, boundary),
            { multipart_boundary = boundary }
+  end,
+  structured = function(task)
+    local meta = get_general_metadata(task, false, false)
+    -- Correlation identifier
+    local uuid = task:get_uuid()
+    meta.uuid = uuid
+    -- Inject X-Rspamd-UUID header for IMAP/external correlation
+    lua_mime.modify_headers(task, {
+      add = { ['X-Rspamd-UUID'] = { value = uuid, order = 0 } }
+    })
+    -- Extracted text (cleaned, reply-trimmed)
+    local text_result = lua_mime.extract_text_limited(task, {
+      max_bytes = 32768,
+      smart_trim = true,
+    })
+    if text_result and text_result.text and #text_result.text > 0 then
+      meta.text = text_result.text
+      meta.text_truncated = text_result.truncated or false
+    end
+    -- Attachments and images
+    local attachments = {}
+    local images = {}
+    for _, part in ipairs(task:get_parts()) do
+      local img = part:get_image()
+      if img then
+        local content = part:get_content()
+        table.insert(images, {
+          filename = img:get_filename() or '',
+          content_type = img:get_type() or '',
+          width = img:get_width(),
+          height = img:get_height(),
+          size = part:get_length(),
+          content = content or '',
+        })
+      elseif part:is_attachment() then
+        local mime_type, mime_subtype = part:get_type()
+        local content = part:get_content()
+        table.insert(attachments, {
+          filename = part:get_filename() or '',
+          content_type = string.format('%s/%s', mime_type or '', mime_subtype or ''),
+          size = part:get_length(),
+          digest = string.sub(part:get_digest(), 1, 16),
+          content = content or '',
+        })
+      end
+    end
+    if #attachments > 0 then
+      meta.attachments = attachments
+    end
+    if #images > 0 then
+      meta.images = images
+    end
+    -- URLs
+    local urls = lua_util.extract_specific_urls({
+      task = task,
+      limit = 100,
+      esld_limit = 10,
+      need_emails = false,
+      need_images = false,
+    })
+    if urls and #urls > 0 then
+      local url_list = {}
+      for _, u in ipairs(urls) do
+        table.insert(url_list, {
+          url = u:get_text(),
+          host = u:get_host(),
+          tld = u:get_tld(),
+        })
+      end
+      meta.urls = url_list
+    end
+    -- Reply detection
+    local dominated_by = task:get_header('In-Reply-To')
+    meta.is_reply = (dominated_by ~= nil)
+    return ucl.to_format(meta, 'msgpack')
   end
 }
 
@@ -432,6 +508,54 @@ local pushers = {
       read = false,
     })
   end,
+  redis_stream = function(task, formatted, rule)
+    local function do_xadd(stream_key)
+      local _, ret, upstream
+      local function redis_xadd_cb(err)
+        if err then
+          rspamd_logger.errx(task, 'got error %s when publishing to stream on server %s',
+            err, upstream:get_addr())
+          return maybe_defer(task, rule)
+        end
+        return true
+      end
+      local args = { stream_key }
+      if rule.max_len then
+        table.insert(args, 'MAXLEN')
+        table.insert(args, '~')
+        table.insert(args, tostring(rule.max_len))
+      end
+      table.insert(args, '*')
+      table.insert(args, 'data')
+      table.insert(args, formatted)
+      ret, _, upstream = lua_redis.redis_make_request(task,
+        redis_params,
+        nil,
+        true,
+        redis_xadd_cb,
+        'XADD',
+        args
+      )
+      if not ret then
+        rspamd_logger.errx(task, 'error connecting to redis')
+        maybe_defer(task, rule)
+      end
+    end
+    if rule.per_recipient then
+      local rcpt = task:get_recipients('smtp')
+      if rcpt then
+        for _, a in ipairs(rcpt) do
+          if a.addr and #a.addr > 0 then
+            do_xadd(rule.stream_key .. ':' .. a.addr)
+          end
+        end
+      else
+        do_xadd(rule.stream_key)
+      end
+    else
+      do_xadd(rule.stream_key)
+    end
+  end,
 }
 
 local opts = rspamd_config:get_all_opt(N)
@@ -650,6 +774,9 @@ local backend_required_elements = {
     'host',
     'port',
   },
+  redis_stream = {
+    'stream_key',
+  },
 }
 local check_element = {
   selector = function(k, v)
@@ -709,6 +836,18 @@ backend_check.redis_pubsub = function(k, rule)
     rule.timeout = redis_params.timeout
   end
 end
+backend_check.redis_stream = function(k, rule)
+  if not redis_params then
+    redis_params = rspamd_parse_redis_server(N)
+  end
+  if not redis_params then
+    rspamd_logger.errx(rspamd_config, 'No redis servers are specified')
+    settings.rules[k] = nil
+  else
+    backend_check.default(k, rule)
+    rule.timeout = redis_params.timeout
+  end
+end
 setmetatable(backend_check, {
   __index = function()
     return backend_check.default