]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] mx_check: three-layer cache rewrite (#6055)
authorDmytro Alieksieiev <1865999+dragoangel@users.noreply.github.com>
Fri, 29 May 2026 10:30:33 +0000 (12:30 +0200)
committerGitHub <noreply@github.com>
Fri, 29 May 2026 10:30:33 +0000 (11:30 +0100)
* [Feature] mx_check: three-layer cache rewrite

This is the comprehensive implementation behind issue #6032. The single-
layer cache from previous shape is replaced by a three-layer Redis design
(d:<domain> / m:<mxhost> / i:<ip>) under <key_prefix>:. Short-code wire
formats minimise Redis footprint; per-layer validators
(is_valid_cache_value) treat unrecognised entries as a cache miss;
the resolve / probe path that follows then issues a fresh cache_set at
the same key, overwriting the bad entry in place.

Probe coordination

- SET NX EX claims the i:<ip> probe lock; a post-claim GET disambiguates
  held lock, already-published verdict, and corrupted-value-needing-heal
  cases. A separate force_claim_probe_lock path overwrites corrupted
  values to break the SET NX loop without leaking refcounts.
- Redis errors during the lock claim surface as MX_REDIS_ERROR; lock held
  by another worker surfaces as MX_INFLIGHT and skips duplicated TCP
  connections which under high-load would result in DoS like activity
  from the target side and most likely will negatively impact Rspamd's
  user IP/ASN/Org reputation.

DNS / probe model

- Dual-stack via probe_ipv4 / probe_ipv6 / prefer_ipv6 with family-tagged
  cache values (v4: / v6: / v64:) and coverage checks so flipping the
  probe-family set re-resolves only as needed.
- Real DNS path failures (SERVFAIL / REFUSED / timeout) are distinguished
  from authoritative NXDOMAIN / NOREC via is_dns_real_failure; the former
  surface as MX_DNS_FAIL (cached as 'df') so a recovered resolver path
  can be re-tried promptly. NXDOMAIN/NOREC collapse into MX_NONE.
- step3 partitions resolved IPs into PUBLIC / LOCAL (RFC1918 / CGNAT /
  ULA) / BOGON (loopback, TEST-NET, multicast, link-local, etc.). Only
  PUBLIC IPs reach the TCP probe. MX_LOCAL_ONLY / MX_LOCAL_MIX /
  MX_BOGON_ONLY / MX_BOGON_MIX fire with the offending IPs as options.
  test_mode lifts loopback out of the bogon set so the probe path can be
  exercised against 127.0.0.1.

Symbol surface

- Multi-source: check_from / check_mime_from / check_reply_to with
  envelope > reply-to > mime-from priority dedup if same domain is hitting
  MX checks from different sources. Per-source prefixes
  (symbol_prefix_from / symbol_prefix_mime_from / symbol_prefix_reply_to)
  fan every MX_* symbol across the three sources at registration time.
- A-fallback path (no MX RR, A used as implicit MX per RFC 5321 §5.1)
  has its own MX_A_* symbol family so operators can score it
  independently of the MX-RR path.
- Per-outcome greylist and reject gates (greylist_invalid /
  greylist_none / greylist_broken / ..., reject_null_mx with
  reject_authorized / reject_local kill switches); null-MX domains can
  now trigger a real set_pre_result. reject_nxdomain_mx removed
  as bad option to serve, practically nxdomain reject would be good only
  on eTLD+1.
- Probe-outcome symbols (MX_GOOD / MX_TIMEOUT_* / MX_REFUSED /
  MX_INVALID / MX_ERROR / MX_INFLIGHT) populate the option field with
  the MX hostname; IP-class symbols still carry IPs since that's where
  IP information is the point. MX_REDIS_ERROR has no option (it's a
  module-internal signal).
- New punishment maps: bad_mxs (glob on MX hostnames) and bad_ips
  (radix on resolved IPs). Any hit short-circuits with MX_BAD /
  MX_IP_BAD before any TCP probe runs which allows to punish
  domains which shares same MX infra.

Scoring

- set_metric_all_sources ships sensible defaults for every symbol.
  Operators can tune any weight through the new "mx" group in
  conf/groups.conf via local.d/mx_group.conf or override.d/
  mx_group.conf without touching the module.

Functional tests

- 167_mx_check.robot refreshed for the new symbol set; MX_NONE replaces
  MX_NXDOMAIN/MX_MISSING, MX_A_REFUSED covers the closed-port
  A-fallback case, and MX_BAD / MX_IP_BAD have dedicated assertions.
- 168_mx_check_greeting.robot covers verify_greeting=true /
  send_quit=false: silent listener -> MX_TIMEOUT_READ; continuation
  220- with no follow-up held past read_timeout -> MX_GOOD (a
  regression that re-queued reads under send_quit=false would surface
  as MX_TIMEOUT_READ); 5xx greeting -> MX_ERROR; non-SMTP line ->
  MX_INVALID.
- 169_mx_check_greeting_quit.robot covers verify_greeting=true /
  send_quit=true: proper multi-line timing -> MX_GOOD plus dummy
  status file QUIT_AFTER_FINAL (catches a regression where QUIT is
  sent before the final 220 line, which rspamd's verdict alone cannot
  detect); slow second line -> MX_TIMEOUT_READ.
- util/dummy_smtp.py mock with silent / error / messy / greeting_single
  / greeting_multi modes and a --status-file argument for out-of-band
  timing verification.

Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
* [Feature] mx_check: optional per-entry weight multiplier for bad_mxs / bad_ips

Both bad_mxs (glob) and bad_ips (radix) entries can now carry an optional numeric second token that is read as a weight multiplier on top of the MX_BAD / MX_IP_BAD group score. Examples: `trapmx.example.com 3` triples the weight; `1.2.3.4 0.5` halves it. Default multiplier is 1.0 (no value or non-numeric value). Lets operators tier confidence within a single map without maintaining several.

Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
* [Fix] Use static parent callback in mx_check module

Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
* [Fix] Add missing executable flag on dummy_smtp python script

Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
* [Chore] Add group to parent mx_check symbol

Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
* [Fix] change rspamd_config:add_map to lua_maps so inline maps works too, adjust autotests so they survive parallelism

Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
---------

Signed-off-by: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
Co-authored-by: Vsevolod Stakhov <vsevolod@rspamd.com>
conf/groups.conf
conf/modules.d/mx_check.conf
src/plugins/lua/mx_check.lua
test/functional/cases/167_mx_check.robot
test/functional/cases/168_mx_check_greeting.robot [new file with mode: 0644]
test/functional/cases/169_mx_check_greeting_quit.robot [new file with mode: 0644]
test/functional/configs/mx_check-dns.conf
test/functional/configs/mx_check-greeting-quit.conf [new file with mode: 0644]
test/functional/configs/mx_check-greeting.conf [new file with mode: 0644]
test/functional/configs/mx_check.conf
test/functional/util/dummy_smtp.py [new file with mode: 0755]

index ba769ee7ca0d37f4fd774a99991647504a23be8c..df1d1734e94974b19c76e08f42e87bd223ca2eef 100644 (file)
@@ -65,6 +65,12 @@ group "policies" {
     .include(try=true; priority=10) "$LOCAL_CONFDIR/override.d/policies_group.conf"
 }
 
+# MX_*, REPLYTO_MX_*, MIME_FROM_MX_*
+group "mx" {
+    .include(try=true; priority=1; duplicate=merge) "$LOCAL_CONFDIR/local.d/mx_group.conf"
+    .include(try=true; priority=10) "$LOCAL_CONFDIR/override.d/mx_group.conf"
+}
+
 group "whitelist" {
     .include "$CONFDIR/scores.d/whitelist_group.conf"
     .include(try=true; priority=1; duplicate=merge) "$LOCAL_CONFDIR/local.d/whitelist_group.conf"
index 62af12cfe0c8095ec20562bcc51a59733b66a660..c9483d6e3f9bfac6c2d34057c150ac5d3ab4ca98 100644 (file)
 #
 # Module documentation can be found at  https://rspamd.com/doc/modules/mx_check.html
 
-# This module is *DISABLED* by default
-# If you need to enable it, then define the following line in
-# local.d/mx_check.conf:
-#
-# enabled = true;
-#
-# You also need to define redis servers for this module
+# Disabled by default. Set `enabled = true` in local.d/mx_check.conf and
+# define redis servers.
 
 mx_check {
-  # Per-phase TCP timeouts (seconds). The effective symbol budget is
-  # connect_timeout + read_timeout + dns.timeout (see options.inc);
-  # task_timeout must be higher than that sum to avoid forced termination.
-  #
-  # NOTE: `connect_timeout` and `verify_greeting` are intentionally NOT set
-  # in this shipped file. They carry legacy aliases (`timeout` ->
-  # connect_timeout, `wait_for_greeting` -> verify_greeting) that the module
-  # maps only when the new key is absent from the merged config. Setting them
-  # here would make every config always carry them, so a legacy alias placed
-  # in local.d/mx_check.conf would be silently ignored. Their defaults live
-  # in the module itself; the values below are shown for documentation only.
-  #
-  # connect_timeout = 1.0;   # default; legacy alias: `timeout`
-  # read_timeout is only used when verify_greeting = true.
+  enabled = false;
+
+  # TCP probe timeouts. read_timeout only used with verify_greeting = true.
+  connect_timeout = 2.0;
   read_timeout = 5.0;
 
-  # SMTP greeting validation. When verify_greeting = true the probe reads
-  # the SMTP banner, validates the reply code (220 -> good, 4xx/5xx ->
-  # MX_ERROR, other -> MX_INVALID) and handles multi-line banners
-  # correctly. send_quit, when also true, issues `QUIT` after the final
-  # banner line for a clean shutdown.
-  # verify_greeting = false;   # default; legacy alias: `wait_for_greeting`
+  port = 25;
+
+  # When true, probe reads SMTP banner and validates 3-digit reply code.
+  # send_quit issues QUIT after the banner on success.
+  verify_greeting = false;
   send_quit = false;
 
-  # Cache TTLs (seconds).
-  expire = 86400;          # successful outcomes
-  expire_novalid = 7200;   # hard failures (refused, invalid, nxdomain, null, broken, error)
-  expire_timeout = 1800;   # transient timeouts (so recovery surfaces quickly)
+  # Cache TTLs (seconds). expire_dns = 0 disables d:/m: caching.
+  # Read timeout (tr) rides `expire`: TCP connected, listener alive --
+  # almost always a long greeting delay (postscreen / tarpit), not dead.
+  expire = 86400;          # i: good verdict + error:<code> + read timeout (1d)
+  expire_dns = 1800;       # d:/m: DNS results (30m)
+  expire_novalid = 14400;  # i: hard failures (4h)
+  expire_timeout = 7200;   # i: connect timeout (2h)
 
-  # Force-reject behaviour. Defaults are off; flip these only after
-  # auditing your operator scores and DNS reliability.
-  reject_nxdomain = false;
   reject_null_mx = false;
+  reject_null_mx_message = "Domain published RFC 7505 Null MX";
 
-  # Run scope. By default mx_check only inspects untrusted inbound mail:
-  # authenticated and local-network senders are skipped as our own traffic.
-  # ESP / outbound deployments can opt into checking that traffic too.
+  # Never force-reject authenticated / locally-originated traffic.
+  reject_authorized = false;
+  reject_local = false;
+
+  # Greylist advice for recoverable failures.
+  greylist_invalid = true;
+  greylist_none = true;
+  greylist_broken = true;
+  greylist_refused = true;
+  greylist_null = true;
+  greylist_timeout_connect = true;
+  # Postfix postscreen delays the banner ~6s; raise read_timeout to 7-10s
+  # before disabling this.
+  greylist_timeout_read = true;
+
+  # Never greylist authenticated / locally-originated traffic.
+  greylist_authorized = false;
+  greylist_local = false;
+
+  # Opt back into checking authenticated / local traffic (ESP use case).
   check_authorized = false;
   check_local = false;
 
-  # Per-layer trust/skip maps (optional; no default). Each accepts a file
-  # path, URL, or inline list.
-  #   exclude_domains - sender domains to skip entirely (emit MX_WHITE).
-  #   exclude_mxs     - trusted MX hostnames; a hit short-circuits the whole
-  #                     check with MX_WHITE. Glob map: a bare hostname matches
-  #                     exactly, an explicit `*.foo` matches subdomains.
-  #   exclude_ips     - IP/CIDR ranges dropped from the probe set. If the set
-  #                     empties as a result, MX_SKIP fires.
-  # exclude_domains = "$LOCAL_CONFDIR/local.d/mx_exclude_domains.map";
-  # exclude_mxs = "$LOCAL_CONFDIR/local.d/mx_exclude_mxs.map";
-  # exclude_ips = "$LOCAL_CONFDIR/local.d/mx_exclude_ips.map";
-
-  # MX targets resolving into private (RFC1918/CGNAT/ULA) or non-routable
-  # (loopback/TEST-NET/multicast/...) address space are never probed and are
-  # flagged with MX_LOCAL_* / MX_BOGON_*. These ranges are a fixed correctness
-  # invariant and are not operator-tunable. `test_mode` is FOR TESTING ONLY:
-  # it makes loopback probeable so the probe path can be exercised against a
-  # local listener. Never enable it in production.
-  # test_mode = false;
-
-  # Symbol names (primary surface; today's behaviour preserved).
+  # Source domains. One probe + one symbol per unique domain, prefixed
+  # per symbol_prefix_* (highest-priority source wins: from > reply_to > mime_from).
+  check_from = true;
+  check_mime_from = true;
+  check_reply_to = true;
+
+  # Address-family controls. probe_ipv6 stays false until outbound v6:25
+  # is verified end-to-end. prefer_ipv6 only takes effect with both on.
+  probe_ipv4 = true;
+  probe_ipv6 = false;
+  prefer_ipv6 = true;
+
+  # Cap on MX targets (step 2) and A/AAAA fan-out (step 3). Smaller =
+  # fewer Redis lookups on cache miss. Probe still only dials ips[1].
+  max_mx_a_records = 3;
+
+  # Per-source symbol prefixes. Envelope-from is unprefixed.
+  symbol_prefix_from = "";
+  symbol_prefix_mime_from = "MIME_FROM_";
+  symbol_prefix_reply_to = "REPLYTO_";
+
+  # MX_INVALID fires only when TCP accepts but the listener doesn't speak
+  # SMTP. Every other failure has its own symbol below.
   symbol_bad_mx = "MX_INVALID";
-  symbol_no_mx = "MX_MISSING";
   symbol_good_mx = "MX_GOOD";
   symbol_white_mx = "MX_WHITE";
 
-  # Finer outcome symbols (Phase A/C: score 0, informational).
+  # MX-RR path outcomes.
   symbol_mx_refused = "MX_REFUSED";
   symbol_mx_timeout_connect = "MX_TIMEOUT_CONNECT";
   symbol_mx_timeout_read = "MX_TIMEOUT_READ";
   symbol_mx_error = "MX_ERROR";
-  symbol_mx_nxdomain = "MX_NXDOMAIN";
+  symbol_mx_none = "MX_NONE";
   symbol_mx_null = "MX_NULL";
   symbol_mx_broken = "MX_BROKEN";
-  symbol_mx_local_only = "MX_LOCAL_ONLY";
+
+  # A-fallback path (RFC 5321 §5.1: no MX, A used as implicit MX).
+  # Stronger evidence than MX-RR equivalents -- parked-domain shape.
+  symbol_mx_a_good = "MX_A_GOOD";
+  symbol_mx_a_refused = "MX_A_REFUSED";
+  symbol_mx_a_timeout_connect = "MX_A_TIMEOUT_CONNECT";
+  symbol_mx_a_timeout_read = "MX_A_TIMEOUT_READ";
+  symbol_mx_a_error = "MX_A_ERROR";
+  symbol_mx_a_invalid = "MX_A_INVALID";
+
+  # Our-side DNS path failure (SERVFAIL/REFUSED/timeout). Sender not at fault.
+  symbol_mx_dns_fail = "MX_DNS_FAIL";
+
+  # IP-class symbols. _ONLY = every IP in class, no probe run.
+  # _MIX = some in class, public subset probed.
+  symbol_mx_local_only = "MX_LOCAL_ONLY";    # RFC1918 / CGNAT / ULA
   symbol_mx_local_mix = "MX_LOCAL_MIX";
-  symbol_mx_bogon_only = "MX_BOGON_ONLY";
+  symbol_mx_bogon_only = "MX_BOGON_ONLY";    # loopback, TEST-NET, multicast, etc.
   symbol_mx_bogon_mix = "MX_BOGON_MIX";
-  symbol_mx_skip = "MX_SKIP";
 
-  # Redis key prefix. The three cache layers live under <key_prefix>:d:<domain>,
-  # <key_prefix>:m:<mx_host>, and <key_prefix>:i:<ip>. Old single-layer keys
-  # of the form <key_prefix><domain> from previous releases become orphans
-  # and age out under their own TTL.
-  key_prefix = "rmx";
+  symbol_mx_skip = "MX_SKIP";                  # exclude_ips dropped every routable IP
+  symbol_mx_bad = "MX_BAD";                    # bad_mxs hit: punishment short-circuit
+  symbol_mx_ip_bad = "MX_IP_BAD";              # bad_ips hit: punishment short-circuit
+  symbol_mx_inflight = "MX_INFLIGHT";          # another worker holds the i-layer lock
+  symbol_mx_redis_error = "MX_REDIS_ERROR";    # Redis failed during lock claim; probe skipped
 
-  # !!! Disabled by default !!!
-  enabled = false;
+  # Per-layer trust/skip maps. exclude_domains/exclude_mxs are trust hits
+  # (-> MX_WHITE). exclude_ips drops IPs from the probe set; full match
+  # fires MX_SKIP.
+  # exclude_domains = "/etc/rspamd/local.d/maps.d/mx_check_exclude_domains.inc";
+  # exclude_mxs = "/etc/rspamd/local.d/maps.d/mx_check_mx_whitelist.inc";
+  # exclude_ips = "/etc/rspamd/local.d/maps.d/mx_check_ip_skip.inc";
+
+  # Punishment maps. Hit -> short-circuit with MX_BAD / MX_IP_BAD. bad_mxs
+  # is glob on MX hostnames; bad_ips is radix on resolved MX IPs. No probing
+  # or further symbol emission past the match.
+  # bad_mxs = "/etc/rspamd/local.d/maps.d/mx_check_bad_mxs.inc";
+  # bad_ips = "/etc/rspamd/local.d/maps.d/mx_check_bad_ips.inc";
+
+  # Cache layers live under <key_prefix>:d:<domain>, :m:<mx>, :i:<ip>.
+  key_prefix = "rmx";
 
   .include(try=true,priority=5) "${DBDIR}/dynamic/mx_check.conf"
   .include(try=true,priority=1,duplicate=merge) "$LOCAL_CONFDIR/local.d/mx_check.conf"
index c071a3d4eaef526aa0109c994a9e0810f797401e..08d219a6d1819463b5885f601bf5381e1d1211c2 100644 (file)
@@ -18,18 +18,20 @@ if confighelp then
   return
 end
 
--- MX check plugin — Phase A rewrite (issue #6032 step 2)
+-- MX check plugin.
 --
--- Three-layer Redis cache (d:/m:/i:) under <key_prefix>:; probe shapes split
--- via lua_tcp's on_error + phased timeouts (PR #6034); multi-line SMTP banner
--- parsing under verify_greeting/send_quit (wait_for_greeting deprecated).
--- Finer outcome symbols (MX_REFUSED, MX_TIMEOUT_*, MX_ERROR, MX_NULL,
--- MX_BROKEN, MX_NXDOMAIN) emit at score 0 for tuning data; primary symbols
--- (MX_GOOD/MX_INVALID/MX_MISSING/MX_WHITE) preserve today's behaviour.
+-- Three-layer Redis cache (d:<domain> / m:<mxhost> / i:<ip>) under
+-- <key_prefix>:. Two TCP probe shapes: plain connect-only, or connect +
+-- multi-line SMTP banner validation (verify_greeting / send_quit). Resolved
+-- IPs are classified into PUBLIC / LOCAL (RFC1918, CGNAT, ULA) / BOGON
+-- (loopback, TEST-NET, multicast, link-local, etc.) before any probe runs.
+-- Optional trust/skip maps at each cache layer (exclude_domains, exclude_mxs,
+-- exclude_ips). Symbols at any cache layer can short-circuit further work.
 
 local rspamd_logger = require "rspamd_logger"
 local rspamd_tcp = require "rspamd_tcp"
 local rspamd_util = require "rspamd_util"
+local rspamd_ip = require "rspamd_ip"
 local lua_util = require "lua_util"
 local lua_redis = require "lua_redis"
 local lua_maps = require "lua_maps"
@@ -42,28 +44,60 @@ local E = {}
 local DNS_ERR_NXDOMAIN = 'no records with this name'
 local DNS_ERR_NOREC = 'requested record is not found'
 
+-- Source-dedup priority (lower wins)
+local SOURCE_PRIORITY = { from = 1, reply_to = 2, mime_from = 3 }
+
+-- Anything that isn't NXDOMAIN/NOREC is a real DNS path problem (SERVFAIL,
+-- timeout, unreachable resolver) -- don't blame the sender.
+local function is_dns_real_failure(err)
+  return err and err ~= DNS_ERR_NXDOMAIN and err ~= DNS_ERR_NOREC
+end
+
+-- Lowercase DNS names so byte-exact Redis keys don't miss on case variance
+-- (RFC 1035 §2.3.3). Returns nil for non-string/empty input.
+local function norm_name(name)
+  if type(name) ~= 'string' or #name == 0 then return nil end
+  return string.lower(name)
+end
+
+
 local settings = {
-  -- timeouts (phased; legacy `timeout` parsed at config load as a fallback)
-  connect_timeout = 1.0,
+  -- Per-phase TCP timeouts. read_timeout is only used with verify_greeting.
+  connect_timeout = 2.0,
   read_timeout = 5.0,
 
-  -- greeting controls
+  -- SMTP banner validation. verify_greeting reads and code-checks the banner;
+  -- send_quit issues QUIT after the final banner line on success.
   verify_greeting = false,
   send_quit = false,
 
-  -- cache TTLs
-  expire = 86400, -- successful outcomes
-  expire_novalid = 7200, -- hard failures
-  expire_timeout = 1800, -- transient timeouts (so recovery surfaces quickly)
+  -- Cache TTLs. expire_dns = 0 disables d:/m: caching entirely.
+  expire = 86400,         -- i: good probe verdict + SMTP-error code + read timeout (1d)
+  expire_dns = 1800,      -- d:/m: DNS results (30m; 0 = disable)
+  expire_novalid = 14400, -- i: hard failures (refused / invalid) (4h)
+  expire_timeout = 7200,  -- i: connect timeout (2h)
 
-  -- behaviour
-  reject_nxdomain = false,
   reject_null_mx = false,
-  greylist_invalid = true,
+  reject_null_mx_message = 'Domain published RFC 7505 Null MX',
 
-  -- run scope: by default mx_check skips authenticated and local-network
-  -- senders (they are our own traffic). ESP/outbound deployments can flip
-  -- these on to check outgoing mail too.
+  -- Never force-reject authenticated / locally-originated traffic.
+  reject_authorized = false,
+  reject_local = false,
+
+  -- Greylist advice on recoverable failures.
+  greylist_invalid = true,
+  greylist_none = true,
+  greylist_broken = true,
+  greylist_refused = true,
+  greylist_null = true,
+  greylist_timeout_connect = true,
+  greylist_timeout_read = true,
+
+  -- Never greylist authenticated / locally-originated traffic.
+  greylist_authorized = false,
+  greylist_local = false,
+
+  -- Opt back into checking authenticated / locally-originated traffic.
   check_authorized = false,
   check_local = false,
 
@@ -72,98 +106,212 @@ local settings = {
   -- against a local listener. NEVER enable this in production.
   test_mode = false,
 
-  -- maps / cache
+  -- Source domains. One probe + one symbol per unique domain;
+  -- envelope > reply-to > mime-from picks the symbol prefix.
+  check_from = true,
+  check_mime_from = true,
+  check_reply_to = true,
+
+  -- Address-family controls. Both off disables the module at config-load.
+  probe_ipv4 = true,
+  probe_ipv6 = false,
+  prefer_ipv6 = true,
+
   key_prefix = 'rmx',
-  max_mx_a_records = 5,
+  -- Cap MX list (step 2) and A/AAAA fan-out (step 3).
+  max_mx_a_records = 3,
 
-  -- SMTP port (configurable so the module is testable on unprivileged ports;
-  -- production should leave this at 25).
   port = 25,
 
-  -- primary symbols (today's surface)
+  -- Per-source symbol prefixes. Envelope-from is unprefixed.
+  symbol_prefix_from = '',
+  symbol_prefix_mime_from = 'MIME_FROM_',
+  symbol_prefix_reply_to = 'REPLYTO_',
+
+  -- Primary symbols.
   symbol_bad_mx = 'MX_INVALID',
-  symbol_no_mx = 'MX_MISSING',
   symbol_good_mx = 'MX_GOOD',
   symbol_white_mx = 'MX_WHITE',
 
-  -- finer symbols (Phase A: score 0, informational)
+  -- Finer outcome symbols (MX-RR path).
   symbol_mx_refused = 'MX_REFUSED',
   symbol_mx_timeout_connect = 'MX_TIMEOUT_CONNECT',
   symbol_mx_timeout_read = 'MX_TIMEOUT_READ',
   symbol_mx_error = 'MX_ERROR',
-  symbol_mx_nxdomain = 'MX_NXDOMAIN',
+  symbol_mx_none = 'MX_NONE',
   symbol_mx_null = 'MX_NULL',
   symbol_mx_broken = 'MX_BROKEN',
+  symbol_mx_dns_fail = 'MX_DNS_FAIL',
+
+  -- A-fallback path symbols (RFC 5321 §5.1: no MX RR, A used as implicit MX).
+  symbol_mx_a_good = 'MX_A_GOOD',
+  symbol_mx_a_refused = 'MX_A_REFUSED',
+  symbol_mx_a_timeout_connect = 'MX_A_TIMEOUT_CONNECT',
+  symbol_mx_a_timeout_read = 'MX_A_TIMEOUT_READ',
+  symbol_mx_a_error = 'MX_A_ERROR',
+  symbol_mx_a_invalid = 'MX_A_INVALID',
 
-  -- IP-class symbols (Phase C: score 0, informational)
+  -- IP-class symbols.
   symbol_mx_local_only = 'MX_LOCAL_ONLY',
   symbol_mx_local_mix = 'MX_LOCAL_MIX',
   symbol_mx_bogon_only = 'MX_BOGON_ONLY',
   symbol_mx_bogon_mix = 'MX_BOGON_MIX',
+
+  -- Per-layer trust/skip maps. exclude_domains and exclude_mxs are trust
+  -- statements (hit -> MX_WHITE, short-circuit). exclude_ips is a probe-set
+  -- filter (hit -> drop IP; full match -> MX_SKIP).
   symbol_mx_skip = 'MX_SKIP',
+
+  -- Punishment maps (mirror exclude_mxs / exclude_ips). bad_mxs is glob on
+  -- MX hostnames; bad_ips is radix on resolved IPs. Any hit short-circuits
+  -- the lookup with the corresponding symbol; no further probing happens.
+  symbol_mx_bad = 'MX_BAD',
+  symbol_mx_ip_bad = 'MX_IP_BAD',
+
+  -- Another worker holds the i-layer probe lock.
+  symbol_mx_inflight = 'MX_INFLIGHT',
+
+  -- Redis error during lock claim; probe skipped.
+  symbol_mx_redis_error = 'MX_REDIS_ERROR',
 }
 
--- RFC-defined address ranges, classified for MX-target probing. An MX RR
--- resolving into LOCAL space is unprobeable from our vantage point (we would
--- reach our own LAN, not the publisher's); one resolving into BOGON space has
--- no legitimate meaning at all and is a packet-injection footgun. Both sets
--- are a fixed correctness invariant and are deliberately not operator-tunable.
-local LOCAL_RANGES = {
-  '10.0.0.0/8',      -- RFC 1918 private
-  '172.16.0.0/12',   -- RFC 1918 private
-  '192.168.0.0/16',  -- RFC 1918 private
-  '100.64.0.0/10',   -- RFC 6598 CGNAT
-  'fc00::/7',        -- RFC 4193 IPv6 unique-local
+-- Static IP-class ranges; module-private radix maps built at config-load.
+local LOCAL_CIDRS = {
+  -- IPv4 RFC 1918
+  '10.0.0.0/8',
+  '172.16.0.0/12',
+  '192.168.0.0/16',
+  -- IPv4 CGNAT (RFC 6598)
+  '100.64.0.0/10',
+  -- IPv6 unique-local (RFC 4193)
+  'fc00::/7',
 }
-local BOGON_RANGES = {
-  '0.0.0.0/8',       -- RFC 1122 "this network"
-  '127.0.0.0/8',     -- loopback (dropped from the set under test_mode)
-  '169.254.0.0/16',  -- RFC 3927 link-local (APIPA)
-  '192.0.0.0/24',    -- RFC 6890 IETF protocol assignments
-  '192.0.2.0/24',    -- RFC 5737 TEST-NET-1
-  '192.88.99.0/24',  -- RFC 7526 6to4 relay anycast (deprecated)
-  '198.18.0.0/15',   -- RFC 2544 benchmarking
-  '198.51.100.0/24', -- RFC 5737 TEST-NET-2
-  '203.0.113.0/24',  -- RFC 5737 TEST-NET-3
-  '224.0.0.0/4',     -- RFC 5771 IPv4 multicast
-  '240.0.0.0/4',     -- RFC 1112 reserved (incl. 255.255.255.255 broadcast)
-  '::/128',          -- IPv6 unspecified
-  '::1/128',         -- IPv6 loopback (dropped from the set under test_mode)
-  '64:ff9b::/96',    -- RFC 6052 NAT64
-  -- NB: the IPv4-mapped range ::ffff:0:0/96 is deliberately NOT listed.
-  -- rspamd's radix stores every IPv4 address as its v4-mapped form, so that
-  -- prefix would match all IPv4 traffic. v4-mapped IPv6 MX targets can only
-  -- arise once AAAA probing lands and must be rejected before the radix test.
-  '100::/64',        -- RFC 6666 IPv6 discard-only
-  '2001:db8::/32',   -- RFC 3849 IPv6 documentation
-  'fe80::/10',       -- IPv6 link-local
-  'ff00::/8',        -- IPv6 multicast
+
+-- Loopback prefixes lifted out of BOGON_CIDRS at config-load when test_mode
+-- is on, so the probe path can be exercised against a local listener.
+local LOOPBACK_CIDRS = { ['127.0.0.0/8'] = true, ['::1/128'] = true }
+
+local BOGON_CIDRS = {
+  -- Loopback (dropped under test_mode)
+  '127.0.0.0/8',
+  '::1/128',
+  -- Link-local (APIPA / IPv6 link-local)
+  '169.254.0.0/16',
+  'fe80::/10',
+  -- "This network" (RFC 1122; source-only)
+  '0.0.0.0/8',
+  -- IETF protocol assignments
+  '192.0.0.0/24',
+  -- TEST-NET-1/2/3 (documentation)
+  '192.0.2.0/24',
+  '198.51.100.0/24',
+  '203.0.113.0/24',
+  -- 6to4 anycast (deprecated)
+  '192.88.99.0/24',
+  -- Benchmarking
+  '198.18.0.0/15',
+  -- IPv4 multicast
+  '224.0.0.0/4',
+  -- IPv4 reserved / "Class E" (includes 255.255.255.255 broadcast)
+  '240.0.0.0/4',
+  -- IPv6 unspecified
+  '::/128',
+  -- NAT64 (RFC 6052)
+  '64:ff9b::/96',
+  -- IPv6 discard prefix (RFC 6666)
+  '100::/64',
+  -- IPv6 documentation
+  '2001:db8::/32',
+  -- IPv6 multicast
+  'ff00::/8',
 }
--- Loopback prefixes lifted out of the BOGON set when test_mode is on.
-local LOOPBACK_RANGES = { ['127.0.0.0/8'] = true, ['::1/128'] = true }
 
 local redis_params
 local exclude_domains
 local exclude_mxs
 local exclude_ips
-local local_map
-local bogon_map
+local bad_mxs
+local bad_ips
+local local_ip_map
+local bogon_ip_map
+
+-- Drop IPs whose family is currently disabled; applied at every cache read.
+local function filter_by_family(ips)
+  if settings.probe_ipv4 and settings.probe_ipv6 then
+    return ips
+  end
+  local out = {}
+  for _, ip_str in ipairs(ips) do
+    local ip = rspamd_ip.from_string(ip_str)
+    if ip and ip:is_valid() then
+      local v = ip:get_version()
+      if (v == 6 and settings.probe_ipv6) or (v == 4 and settings.probe_ipv4) then
+        out[#out + 1] = ip_str
+      end
+    end
+  end
+  return out
+end
 
--- ---------------------------------------------------------------------------
--- Cache layer (Redis-backed; falls back gracefully if Redis is unavailable
--- mid-task — we still complete the probe, we just skip the write).
--- ---------------------------------------------------------------------------
+-- Classify into 'public' / 'local' / 'bogon'. Not rspamd_inet_addr:is_local
+-- -- that misses RFC1918 / CGNAT / ULA.
+local function classify_ip(ip_str)
+  if bogon_ip_map and bogon_ip_map:get_key(ip_str) then
+    return 'bogon'
+  end
+  if local_ip_map and local_ip_map:get_key(ip_str) then
+    return 'local'
+  end
+  return 'public'
+end
+
+local function source_prefix(src)
+  if src == 'reply_to' then return settings.symbol_prefix_reply_to end
+  if src == 'mime_from' then return settings.symbol_prefix_mime_from end
+  return settings.symbol_prefix_from
+end
+
+-- Pre-emit IP-class symbols alongside whatever the probe produces. The
+-- offending IPs are passed as symbol options (one option per IP) so
+-- operators can see exactly which addresses tripped the class without
+-- digging into the resolver logs.
+local function emit_ip_class_symbols(task, mx_domain, locals, bogons, has_public, src)
+  local p = source_prefix(src or 'from')
+  if #locals > 0 then
+    local sym = p .. (has_public and settings.symbol_mx_local_mix
+                                   or settings.symbol_mx_local_only)
+    task:insert_result(sym, 1.0, locals)
+    lua_util.debugm(N, task, '%s for %s: %s', sym, mx_domain, table.concat(locals, ','))
+  end
+  if #bogons > 0 then
+    local sym = p .. (has_public and settings.symbol_mx_bogon_mix
+                                   or settings.symbol_mx_bogon_only)
+    task:insert_result(sym, 1.0, bogons)
+    lua_util.debugm(N, task, '%s for %s: %s', sym, mx_domain, table.concat(bogons, ','))
+  end
+end
+
+-- Cache layer (Redis-backed; degrades gracefully on Redis loss).
 
 local function cache_key(layer, value)
   return string.format('%s:%s:%s', settings.key_prefix, layer, value)
 end
 
+-- d:/m: caching disabled when expire_dns = 0; reads synthesise a miss.
+local function dns_cache_disabled(layer)
+  return (layer == 'd' or layer == 'm') and settings.expire_dns == 0
+end
+
 local function cache_get(task, layer, value, cb)
+  if dns_cache_disabled(layer) then
+    cb(nil, nil, '')
+    return
+  end
   local key = cache_key(layer, value)
   local function on_reply(err, data)
     cb(err, data, key)
   end
-  local ok = rspamd_redis_make_request(task, redis_params, key, false,
+  local ok = lua_redis.rspamd_redis_make_request(task, redis_params, key, false,
       on_reply, 'GET', { key })
   if not ok then
     -- Synthesise a miss so the caller proceeds with DNS/probe.
@@ -172,39 +320,184 @@ local function cache_get(task, layer, value, cb)
 end
 
 local function cache_set(task, layer, value, payload, ttl)
+  if ttl == 0 or dns_cache_disabled(layer) then
+    return
+  end
   local key = cache_key(layer, value)
   local function on_reply(err)
     if err then
       rspamd_logger.errx(task, 'mx_check cache write %s: %s', key, err)
     end
   end
-  local ok = rspamd_redis_make_request(task, redis_params, key, true,
+  local ok = lua_redis.rspamd_redis_make_request(task, redis_params, key, true,
       on_reply, 'SETEX', { key, tostring(ttl), payload })
-  if not ok then
+  if ok then
+    lua_util.debugm(N, task, 'cache write %s ttl=%s value=%s', key, ttl, payload)
+  else
     rspamd_logger.errx(task, 'mx_check cache write failed (no redis): %s', key)
   end
-  lua_util.debugm(N, task, 'cache write %s ttl=%s value=%s', key, ttl, payload)
 end
 
--- Choose the TTL class for an i-layer verdict.
+-- Recognised cache value shapes per layer. Unknown values are treated as a
+-- miss by callers; the natural resolve / probe path then issues cache_set,
+-- which overwrites the bad entry in place (no DEL needed -- DEL would just
+-- add a write op and a blocking Redis operation for no behavioural gain).
+--   i: 'gd' | 'rf' | 'tc' | 'tr' | 'inv' | 'err:NNN'
+--      (the lock value 'l' is handled separately by callers)
+--   d: 'no' | 'bkn' | 'null' | 'df'
+--      | 'mx:<host:prio,...>'
+--      | 'a:v4:<ip,...>' | 'a:v6:<ip,...>' | 'a:v64:<ip,...>'
+--   m: 'no' | 'df'
+--      | 'v4:<ip,...>' | 'v6:<ip,...>' | 'v64:<ip,...>'
+-- Prefix match only; empty / malformed payloads degrade gracefully via the
+-- downstream decode (empty list -> re-resolve), and validating each entry
+-- would just duplicate that parse logic.
+local function is_valid_cache_value(layer, v)
+  if type(v) ~= 'string' or #v == 0 then return false end
+  if layer == 'i' then
+    if v == 'gd' or v == 'rf' or v == 'tc' or v == 'tr' or v == 'inv' then
+      return true
+    end
+    return v:match('^err:%d%d%d$') ~= nil
+  elseif layer == 'd' then
+    if v == 'no' or v == 'bkn' or v == 'null' or v == 'df' then return true end
+    if v:match('^mx:') then return true end
+    return v:match('^a:v4:') ~= nil
+        or v:match('^a:v6:') ~= nil
+        or v:match('^a:v64:') ~= nil
+  elseif layer == 'm' then
+    if v == 'no' or v == 'df' then return true end
+    return v:match('^v4:') ~= nil
+        or v:match('^v6:') ~= nil
+        or v:match('^v64:') ~= nil
+  end
+  return false
+end
+
+local function lock_ttl_seconds()
+  return settings.connect_timeout
+      + (settings.verify_greeting and settings.read_timeout or 0)
+      + 1.0
+end
+
+-- Forcefully write 'l' at i:<ip> overwriting whatever's there. Used on the
+-- recovery path when the key holds invalid data (SET NX would loop on it).
+-- Other workers reading cache_get afterwards see 'l' and defer via
+-- MX_INFLIGHT, so only this worker probes.
+local function force_claim_probe_lock(task, ip, on_ok, on_error)
+  local key = cache_key('i', ip)
+  local lock_ttl = lock_ttl_seconds()
+  local function on_reply(err)
+    if err then
+      rspamd_logger.errx(task, 'mx_check force-claim %s: redis error %s', key, err)
+      on_error()
+      return
+    end
+    lua_util.debugm(N, task, 'force-claimed probe lock %s (ttl=%ss)', key, lock_ttl)
+    on_ok()
+  end
+  local ok = lua_redis.rspamd_redis_make_request(task, redis_params, key, true,
+      on_reply, 'SET',
+      { key, 'l', 'EX', tostring(math.ceil(lock_ttl)) })
+  if not ok then
+    rspamd_logger.errx(task, 'mx_check force-claim dispatch failed: %s', key)
+    on_error()
+  end
+end
+
+-- SET NX EX "lock" at i:<ip> to coordinate parallel workers. Redis failures
+-- fail-closed (on_error -> skip probe) so a dead cache layer can't drive an
+-- uncoordinated herd. The eventual cache_set overwrites the lock value with
+-- the real verdict.
+--
+-- Callback dispatch from the post-claim GET when SET NX fails:
+--   on_won()           the key holds an unrecognised value; force-claim
+--                      first to actually hold the lock, then probe.
+--   on_lost(nil)       the key holds 'l' or is gone -- another worker owns
+--                      the probe; caller defers via MX_INFLIGHT.
+--   on_lost(verdict)   the key holds a valid verdict (a worker raced ahead
+--                      between our cache_get and SET NX); caller uses it.
+--   on_error()         Redis dispatch / I/O error.
+local function try_claim_probe_lock(task, ip, on_won, on_lost, on_error)
+  local key = cache_key('i', ip)
+  local lock_ttl = lock_ttl_seconds()
+  local function on_set_reply(err, data)
+    if err then
+      rspamd_logger.errx(task, 'mx_check probe lock %s: redis error %s', key, err)
+      on_error()
+      return
+    end
+    -- Redis SET NX returns "OK" on success and nil otherwise; rspamd_redis
+    -- surfaces nil as false. Be defensive on both shapes.
+    if data == 'OK' or data == true then
+      lua_util.debugm(N, task, 'probe lock %s: claimed (ttl=%ss)', key, lock_ttl)
+      on_won()
+      return
+    end
+    -- SET NX failed: the key existed. GET it to tell a held lock ('l') from a
+    -- published verdict (race window between our cache_get miss and this
+    -- SET NX).
+    lua_redis.rspamd_redis_make_request(task, redis_params, key, false,
+        function(get_err, get_data)
+          if get_err then
+            rspamd_logger.errx(task, 'mx_check probe lock %s: post-claim GET error %s',
+              key, get_err)
+            on_error()
+            return
+          end
+          if type(get_data) ~= 'string' or #get_data == 0 or get_data == 'l' then
+            lua_util.debugm(N, task, 'probe lock %s: held by another worker', key)
+            on_lost(nil)
+            return
+          end
+          if is_valid_cache_value('i', get_data) then
+            lua_util.debugm(N, task, 'probe lock %s: verdict already published (%s)',
+              key, get_data)
+            on_lost(get_data)
+            return
+          end
+          -- Non-'l', non-verdict value at i:<ip>. Force-claim to actually
+          -- hold the lock (plain on_won() without writing 'l' would let
+          -- every parallel worker probe in lockstep) then dispatch to on_won.
+          lua_util.debugm(N, task,
+            "probe lock %s: bad cache value '%s', force-claiming to overwrite",
+            key, get_data)
+          force_claim_probe_lock(task, ip, on_won, on_error)
+        end, 'GET', { key })
+  end
+  local ok = lua_redis.rspamd_redis_make_request(task, redis_params, key, true,
+      on_set_reply, 'SET',
+      { key, 'l', 'EX', tostring(math.ceil(lock_ttl)), 'NX' })
+  if not ok then
+    rspamd_logger.errx(task, 'mx_check probe lock dispatch failed: %s', key)
+    on_error()
+  end
+end
+
+-- TTL class for an i-layer verdict. 4xx/5xx are 'gd' (real SMTP that just
+-- declined our probe) and cache at the long expire TTL. 'tr' (read timeout)
+-- also rides the long TTL: TCP connected, listener is alive -- the read
+-- timeout is almost always a long greeting delay (Postfix postscreen,
+-- tarpit, big provider rate-limit), not a dead host.
 local function ttl_for_verdict(verdict)
-  if verdict == 'good' then
+  if verdict == 'gd' or verdict == 'tr' or string.find(verdict, '^err:') then
     return settings.expire
-  elseif verdict == 'timeout_connect' or verdict == 'timeout_read' then
+  elseif verdict == 'tc' then
     return settings.expire_timeout
   else
-    -- refused / invalid / error:<code>
+    -- 'rf' / 'inv'
     return settings.expire_novalid
   end
 end
 
--- ---------------------------------------------------------------------------
--- Encoding helpers for d-layer and m-layer values.
--- d-layer:  "mx:host1:prio1,host2:prio2,..."  | "mx_miss:ip1,ip2,..."  | "nxd" | "null"
--- m-layer:  "ip1,ip2,..."                     | "nxd"
--- i-layer:  "good" | "refused" | "timeout_connect" | "timeout_read"
---           | "invalid" | "error:<code>"
--- ---------------------------------------------------------------------------
+-- Cache value formats (short codes minimise Redis footprint):
+--   d:<domain>  "mx:host:prio,..." | "a:<v>:ip,..." | "no" | "null" |
+--               "bkn" | "df"
+--   m:<host>    "<v>:ip,..." | "no" | "df"
+--   i:<ip>      "gd" | "rf" | "tc" | "tr" |
+--               "inv" | "err:<code>" | "l" (probe in flight)
+-- <v> ∈ {v4, v6, v64} encodes which DNS families were queried at write
+-- time; readers re-resolve when current flags need a family not in <v>.
 
 local function encode_mx_list(results)
   local parts = {}
@@ -215,7 +508,7 @@ local function encode_mx_list(results)
 end
 
 local function decode_mx_list(value)
-  -- value already stripped of "mx:" prefix
+  -- value already stripped of "mx:" prefix.
   local out = {}
   for entry in string.gmatch(value, '[^,]+') do
     local host, prio = string.match(entry, '^(.-):(%-?%d+)$')
@@ -234,8 +527,41 @@ local function encode_ip_list(ips)
   return table.concat(parts, ',')
 end
 
-local function decode_ip_list(value)
-  return lua_util.str_split(value, ',')
+-- Family-tag prefix for IP-list cache values. Encodes which DNS families
+-- were queried at write time so readers can tell "cache covers current
+-- flags" (use it) from "cache was partial, current needs more" (re-resolve).
+-- Without this distinction every filter-to-empty would force a re-resolve
+-- even when the cache definitively says "no IPs in that family".
+local function family_prefix()
+  if settings.probe_ipv4 and settings.probe_ipv6 then return 'v64' end
+  if settings.probe_ipv4 then return 'v4' end
+  return 'v6'
+end
+
+-- Returns {v4, v6} booleans (which families the cache entry queried) and
+-- the IP list table. Returns nil on unrecognised / legacy formats so the
+-- caller can treat them as cache misses.
+local function decode_ip_list_with_family(value)
+  local prefix, body
+  if value:sub(1, 4) == 'v64:' then
+    prefix, body = { v4 = true, v6 = true }, value:sub(5)
+  elseif value:sub(1, 3) == 'v4:' then
+    prefix, body = { v4 = true }, value:sub(4)
+  elseif value:sub(1, 3) == 'v6:' then
+    prefix, body = { v6 = true }, value:sub(4)
+  else
+    return nil, nil
+  end
+  return prefix, lua_util.str_split(body, ',')
+end
+
+-- True iff the cached queried-families set covers every currently-enabled
+-- probe family (cache was at least as informed as we need now).
+local function family_coverage_ok(queried)
+  if not queried then return false end
+  if settings.probe_ipv4 and not queried.v4 then return false end
+  if settings.probe_ipv6 and not queried.v6 then return false end
+  return true
 end
 
 -- Detect RFC 7505 Null MX: a single MX RR with priority 0 and root target.
@@ -250,29 +576,7 @@ local function is_null_mx(results)
   return r.name == '' or r.name == '.'
 end
 
--- Classify resolved MX-target IPs into PUBLIC / LOCAL / BOGON buckets.
--- Membership is mutually exclusive; anything that matches neither map is
--- PUBLIC.  Range membership depends only on the IP, so it is recomputed per
--- lookup rather than cached.
-local function partition_ips(ips)
-  local public, locals, bogons = {}, {}, {}
-  for _, ip in ipairs(ips) do
-    if bogon_map and bogon_map:get_key(ip) then
-      bogons[#bogons + 1] = ip
-    elseif local_map and local_map:get_key(ip) then
-      locals[#locals + 1] = ip
-    else
-      public[#public + 1] = ip
-    end
-  end
-  return public, locals, bogons
-end
-
--- ---------------------------------------------------------------------------
--- SMTP banner parsing.
--- Returns {code = "220", sep = ' '|'-', rest = "<text>"} or nil for non-SMTP.
--- ---------------------------------------------------------------------------
-
+-- SMTP banner line -> {code, sep} or nil for non-SMTP.
 local function parse_greeting_line(data)
   if type(data) ~= 'string' then
     data = tostring(data or '')
@@ -284,41 +588,30 @@ local function parse_greeting_line(data)
   return { code = code, sep = sep }
 end
 
--- ---------------------------------------------------------------------------
--- Probe shapes.
---
--- probe_connect_only:  open TCP, success on connect, close.  Distinguishes
---   connect errors (refused vs timeout) via on_error.
---
--- probe_with_greeting: open TCP, read banner line-by-line, validate the
---   3-digit reply code, optionally send QUIT after the final line of a 220
---   banner, then close.  Connect-phase errors via on_error; read-phase
---   outcomes via the read callback.
---
--- Both invoke `cb(verdict, extra)` where verdict is one of:
+-- Probe shapes. Both invoke cb(verdict) where verdict is one of:
 --   good | refused | timeout_connect | timeout_read | invalid | error:<code>
--- ---------------------------------------------------------------------------
+-- probe_connect_only:  open TCP, success-on-connect, close.
+-- probe_with_greeting: open TCP, read+validate SMTP banner, optional QUIT.
 
 local function classify_connect_error(err)
   local e = tostring(err or ''):lower()
   if e:find('refused', 1, true)
       or e:find('reset', 1, true)
       or e:find('econnrefused', 1, true) then
-    return 'refused'
+    return 'rf'
   end
   if e:find('timeout', 1, true)
       or e:find('timed out', 1, true)
       or e:find('unreachable', 1, true)
       or e:find('no route', 1, true) then
-    return 'timeout_connect'
+    return 'tc'
   end
   return nil -- local-side: EPERM, EADDRNOTAVAIL, etc. — caller logs only.
 end
 
 local function probe_connect_only(task, ip, cb)
-  -- One-shot wrapper: on_error may fire synchronously (refused on localhost)
-  -- before rspamd_tcp.new returns, then the function returns false, then our
-  -- !ok fallback below would fire cb again with a worse verdict.  Guard it.
+  -- on_error may fire synchronously (refused on localhost) before rspamd_tcp.new
+  -- returns; the !ok fallback would then double-fire cb. Guard with `fired`.
   local fired = false
   local function finish(verdict)
     if fired then
@@ -330,19 +623,18 @@ local function probe_connect_only(task, ip, cb)
 
   local function on_connect(conn)
     conn:close()
-    finish('good')
+    finish('gd')
   end
   local function on_error(err)
     local v = classify_connect_error(err)
     if not v then
       rspamd_logger.infox(task, 'mx probe local error for %s: %s', ip, err)
-      v = 'timeout_connect'
+      v = 'tc'
     end
     finish(v)
   end
 
-  -- lua_tcp_request requires `callback` even with read=false; it is a no-op
-  -- in the pure connect-only shape because no read handler is queued.
+  -- lua_tcp_request requires `callback` even with read=false; no-op here.
   local function stub_cb() end
 
   local ok = rspamd_tcp.new({
@@ -357,7 +649,7 @@ local function probe_connect_only(task, ip, cb)
   })
 
   if not ok then
-    finish('timeout_connect')
+    finish('tc')
   end
 end
 
@@ -372,17 +664,16 @@ local function probe_with_greeting(task, ip, cb)
   end
 
   local function on_error(err)
-    -- Connect-phase only (lua_tcp guarantees the gate via LUA_TCP_FLAG_CONNECTED).
+    -- Connect-phase only (gated by LUA_TCP_FLAG_CONNECTED in lua_tcp).
     local v = classify_connect_error(err)
     if not v then
       rspamd_logger.infox(task, 'mx probe local error for %s: %s', ip, err)
-      v = 'timeout_connect'
+      v = 'tc'
     end
     finish(v)
   end
 
-  -- Forward declaration so the read callback can re-queue itself for
-  -- multi-line banner draining.
+  -- Forward decl: read callback re-queues itself for multi-line banners.
   local read_line
 
   local function send_quit_and_close(conn)
@@ -395,10 +686,10 @@ local function probe_with_greeting(task, ip, cb)
     if io_err then
       local e = tostring(io_err or ''):lower()
       if e:find('timeout', 1, true) then
-        finish('timeout_read')
+        finish('tr')
       else
         -- EOF before CRLF, or anything not a timeout, is non-SMTP behaviour.
-        finish('invalid')
+        finish('inv')
       end
       if conn then
         conn:close()
@@ -408,39 +699,38 @@ local function probe_with_greeting(task, ip, cb)
 
     local parsed = parse_greeting_line(data)
     if not parsed then
-      finish('invalid')
+      finish('inv')
       conn:close()
       return
     end
 
-    local family = string.sub(parsed.code, 1, 1)
-    if family == '2' then
-      -- 220 (or other 2xx) — successful greeting. If send_quit, drain any
-      -- continuation lines before issuing QUIT so we don't talk mid-banner.
+    -- 220: valid SMTP greeting. Disconnect on the first 220 unless
+    -- send_quit is on with a continuation banner (drain until sep == ' ').
+    if parsed.code == '220' then
+      if settings.send_quit and parsed.sep == '-' then
+        conn:add_read(read_line, CRLF)
+        return
+      end
+      finish('gd')
       if settings.send_quit then
-        if parsed.sep == '-' then
-          -- More banner lines to come; keep reading until the final line.
-          conn:add_read(read_line, CRLF)
-          return
-        end
-        finish('good')
         send_quit_and_close(conn)
       else
-        finish('good')
         conn:close()
       end
       return
     end
 
+    -- 4xx/5xx: real SMTP rejected our probe; drop silently (421/554 close
+    -- the channel anyway per RFC 5321 §3.5, so QUIT is wasted).
+    local family = string.sub(parsed.code, 1, 1)
     if family == '4' or family == '5' then
-      finish('error:' .. parsed.code)
+      finish('err:' .. parsed.code)
       conn:close()
       return
     end
 
-    -- 1xx, 3xx, or anything else with the right shape but the wrong class —
-    -- treat as non-SMTP.
-    finish('invalid')
+    -- 1xx/3xx/non-220 2xx: 3-digit shape but wrong class for a banner.
+    finish('inv')
     conn:close()
   end
 
@@ -457,384 +747,628 @@ local function probe_with_greeting(task, ip, cb)
 
   if not ok then
     rspamd_logger.errx(task, 'mx_check: failed to dispatch TCP probe to %s', ip)
-    finish('timeout_connect')
+    finish('tc')
   end
 end
 
--- ---------------------------------------------------------------------------
--- Outcome → symbol emission.  Phase A: emit existing primary symbols at
--- today's scores plus finer symbols at score 0 for tuning data.
--- ---------------------------------------------------------------------------
+-- Force-reject gate. Authenticated / local traffic never rejected.
+local function should_reject(task, kind)
+  if task:get_user() and not settings.reject_authorized then
+    return false
+  end
+  local ip = task:get_ip()
+  if ip and ip:is_local() and not settings.reject_local then
+    return false
+  end
+  if kind == 'null' then return settings.reject_null_mx end
+  return false
+end
 
-local function emit_outcome(task, mx_domain, outcome, info)
-  -- info table: { mx_missing = bool, host = string, code = string (for error), key = string (for white) }
+-- Greylist gate. Authenticated / local never greylisted; suppressed when
+-- the same outcome is force-rejected (pre-result reject lands first).
+local function should_greylist(task, kind)
+  if task:get_user() and not settings.greylist_authorized then
+    return false
+  end
+  local ip = task:get_ip()
+  if ip and ip:is_local() and not settings.greylist_local then
+    return false
+  end
+  if should_reject(task, kind) then return false end
+  if kind == 'inv' then return settings.greylist_invalid end
+  if kind == 'no' then return settings.greylist_none end
+  if kind == 'bkn' then return settings.greylist_broken end
+  if kind == 'rf' then return settings.greylist_refused end
+  if kind == 'null' then return settings.greylist_null end
+  if kind == 'tc' then return settings.greylist_timeout_connect end
+  if kind == 'tr' then return settings.greylist_timeout_read end
+  return false
+end
+
+local function advise_greylist(task, reason)
+  task:get_mempool():set_variable('grey_greylisted_required', '1')
+  lua_util.debugm(N, task, 'advice to greylist: %s', reason)
+end
+
+-- Map lookup verdict -> result symbols. src picks the prefix from settings
+-- (symbol_prefix_{from,mime_from,reply_to}). When info.mx_missing is true
+-- (A-fallback path), probe outcomes fire MX_A_*.
+local function emit_outcome(task, mx_domain, outcome, info, src)
   info = info or {}
+  local p = source_prefix(src or 'from')
+  local host = info.host or mx_domain
+  local function sym(mx_key, mx_a_key)
+    return p .. settings[info.mx_missing and mx_a_key or mx_key]
+  end
 
   if outcome == 'white' then
-    task:insert_result(settings.symbol_white_mx, 1.0, info.key or mx_domain)
+    task:insert_result(p .. settings.symbol_white_mx, 1.0, info.key or mx_domain)
+    return
+  end
+
+  if outcome == 'ip_class_skipped' then
+    -- IP-class symbols already fired at the classification step.
     return
   end
 
   if outcome == 'skip' then
-    -- The probe set was emptied by the exclude_ips filter; MX_SKIP has
-    -- already been emitted where the filter ran.  Neutral outcome — no
-    -- MX_INVALID, and no MX_MISSING noise for a deliberately skipped check.
+    task:insert_result(p .. settings.symbol_mx_skip, 1.0, info.key or mx_domain)
+    return
+  end
+
+  if outcome == 'bad_mx' then
+    task:insert_result(p .. settings.symbol_mx_bad,
+      info.weight_mult or 1.0, info.key or mx_domain)
+    return
+  end
+
+  if outcome == 'bad_ip' then
+    task:insert_result(p .. settings.symbol_mx_ip_bad,
+      info.weight_mult or 1.0, info.key or mx_domain)
+    return
+  end
+
+  if outcome == 'inflight' then
+    task:insert_result(p .. settings.symbol_mx_inflight, 1.0,
+      info.host or mx_domain)
     return
   end
 
-  -- A-fallback path: today's module fires MX_MISSING whenever the MX RR is
-  -- absent and we fell back to A, independent of the probe outcome.  Match
-  -- that behaviour: emit MX_MISSING first, then fall through to the regular
-  -- outcome emission below.
-  if info.mx_missing then
-    task:insert_result(settings.symbol_no_mx, 1.0, info.host or mx_domain)
+  if outcome == 'df' then
+    task:insert_result(p .. settings.symbol_mx_dns_fail, 1.0, host)
+    return
   end
 
-  if outcome == 'good' then
-    task:insert_result(settings.symbol_good_mx, 1.0, info.host or mx_domain)
+  if outcome == 'gd' then
+    task:insert_result(sym('symbol_good_mx', 'symbol_mx_a_good'), 1.0, host)
     return
   end
 
-  -- DNS-level outcomes: emit the finer symbol AND fall through to the
-  -- MX_INVALID emission path below so the primary skip-mail signal still
-  -- fires (matches today's behaviour: anything not connectable = MX_INVALID).
-  local invalid_reason
+  -- DNS-level outcomes (no MX_A_* split — these are name-level facts).
   if outcome == 'null' then
-    task:insert_result(settings.symbol_mx_null, 1.0, mx_domain)
-    invalid_reason = 'null mx'
-    if settings.reject_null_mx then
-      invalid_reason = 'null mx: rejected'
+    task:insert_result(p .. settings.symbol_mx_null, 1.0, mx_domain)
+    if should_reject(task, 'null') then
+      task:set_pre_result('reject', settings.reject_null_mx_message, N)
+    elseif should_greylist(task, 'null') then
+      advise_greylist(task, 'mx_null')
+    end
+    return
+  end
+  if outcome == 'no' then
+    task:insert_result(p .. settings.symbol_mx_none, 1.0, mx_domain)
+    if should_greylist(task, 'no') then
+      advise_greylist(task, 'mx_none')
     end
-  elseif outcome == 'nxdomain' then
-    task:insert_result(settings.symbol_mx_nxdomain, 1.0, mx_domain)
-    invalid_reason = 'nxdomain'
-    if settings.reject_nxdomain then
-      invalid_reason = 'nxdomain: rejected'
+    return
+  end
+  if outcome == 'bkn' then
+    -- MX_BROKEN is MX-RR-only by construction; no A-fallback variant.
+    task:insert_result(p .. settings.symbol_mx_broken, 1.0, mx_domain)
+    if should_greylist(task, 'bkn') then
+      advise_greylist(task, 'mx_broken')
     end
-  elseif outcome == 'broken' then
-    task:insert_result(settings.symbol_mx_broken, 1.0, mx_domain)
-    invalid_reason = 'broken mx'
-  elseif outcome == 'local_only' then
-    -- MX_LOCAL_ONLY was already emitted during IP classification.
-    invalid_reason = 'mx resolves only to private addresses'
-  elseif outcome == 'bogon_only' then
-    -- MX_BOGON_ONLY was already emitted during IP classification.
-    invalid_reason = 'mx resolves only to non-routable addresses'
-  end
-
-  if invalid_reason then
-    -- DNS failures: emit MX_INVALID with a descriptive reason.  We bypass
-    -- greylisting here because there is no transient signal — DNS results
-    -- already include their own caching/retry semantics.
-    task:insert_result(settings.symbol_bad_mx, 1.0, invalid_reason)
     return
   end
 
-  -- Finer probe outcomes.
-  local finer
-  local code_param
-  if outcome == 'refused' then
-    finer = settings.symbol_mx_refused
-  elseif outcome == 'timeout_connect' then
-    finer = settings.symbol_mx_timeout_connect
-  elseif outcome == 'timeout_read' then
-    finer = settings.symbol_mx_timeout_read
-  elseif outcome == 'invalid' then
-    finer = nil -- MX_INVALID is the primary; no finer symbol
-  else
-    -- "error:<code>"
-    local code = string.match(outcome, '^error:(%d+)$')
-    if code then
-      finer = settings.symbol_mx_error
-      code_param = code
+  -- TCP-probe finer outcomes — split by path.
+  if outcome == 'rf' then
+    task:insert_result(sym('symbol_mx_refused', 'symbol_mx_a_refused'), 1.0, host)
+    if should_greylist(task, 'rf') then
+      advise_greylist(task, 'mx_refused')
     end
+    return
   end
-
-  if finer then
-    if code_param then
-      task:insert_result(finer, 1.0, code_param)
-    else
-      task:insert_result(finer, 1.0, info.host or mx_domain)
+  if outcome == 'tc' then
+    task:insert_result(sym('symbol_mx_timeout_connect', 'symbol_mx_a_timeout_connect'), 1.0, host)
+    if should_greylist(task, 'tc') then
+      advise_greylist(task, 'mx_timeout_connect')
     end
+    return
   end
-
-  -- Special case: a 4xx/5xx greeting means the MX is a real SMTP server —
-  -- map to MX_GOOD for the primary symbol (today's behaviour too).
-  if string.find(outcome, '^error:', 1) then
-    if info.mx_missing then
-      task:insert_result(settings.symbol_no_mx, 1.0, info.host or mx_domain)
+  if outcome == 'tr' then
+    task:insert_result(sym('symbol_mx_timeout_read', 'symbol_mx_a_timeout_read'), 1.0, host)
+    if should_greylist(task, 'tr') then
+      advise_greylist(task, 'mx_timeout_read')
     end
-    task:insert_result(settings.symbol_good_mx, 1.0, info.host or mx_domain)
     return
   end
 
-  -- All remaining outcomes are MX_INVALID territory.
-  if settings.greylist_invalid then
-    task:get_mempool():set_variable('grey_greylisted_required', '1')
-    lua_util.debugm(N, task, 'advice to greylist a message')
-    task:insert_result(settings.symbol_bad_mx, 1.0, 'greylisted')
-  else
-    task:insert_result(settings.symbol_bad_mx, 1.0)
+  -- 4xx/5xx: real SMTP rejected our probe. Fire GOOD + ERROR with code.
+  local code = string.match(outcome, '^err:(%d+)$')
+  if code then
+    task:insert_result(sym('symbol_mx_error', 'symbol_mx_a_error'), 1.0, {host, code})
+    task:insert_result(sym('symbol_good_mx', 'symbol_mx_a_good'), 1.0, host)
+    return
   end
-end
 
--- ---------------------------------------------------------------------------
--- Lookup orchestrator: step1 (d:) → step2 (m:) → step3 (i:).
--- Stateful via small closure-captured tables; each cache GET is its own
--- continuation.
--- ---------------------------------------------------------------------------
+  -- MX_INVALID / MX_A_INVALID: TCP up, banner not valid SMTP.
+  if outcome == 'inv' then
+    local invalid_sym = sym('symbol_bad_mx', 'symbol_mx_a_invalid')
+    if should_greylist(task, 'inv') then
+      advise_greylist(task, 'mx_invalid')
+      task:insert_result(invalid_sym, 1.0, 'greylisted')
+    else
+      task:insert_result(invalid_sym, 1.0)
+    end
+    return
+  end
 
-local function lookup(task, mx_domain, done)
-  local ctx = { mx_domain = mx_domain, mx_missing = false }
+  -- Module-internal failure: Redis was unreachable mid-claim.
+  if outcome == 'redis_error' then
+    task:insert_result(p .. settings.symbol_mx_redis_error, 1.0)
+    return
+  end
+end
 
-  -- step 3: resolve a verdict for one MX's IP set — the first cached i:
-  -- verdict wins, else probe the first IP.  The verdict and the IP that
-  -- produced it are handed to `on_result`; the caller decides whether to stop
-  -- or, for a multi-MX domain, fall through to the next MX host.
-  local function step3(ips, on_result)
-    if not ips or #ips == 0 then
-      -- Should not happen — defensive.
-      on_result('invalid', nil)
+-- Parallel A/AAAA resolution. Callback: done(ip_strs, err_code) with
+-- err_code in {nil, 'no_records', 'df'}. dns_fail only when every
+-- queried family had a real network-level error.
+local function resolve_addresses(task, name, done)
+  local r = task:get_resolver()
+  local pending = 0
+  local v4_ips, v6_ips
+  local v4_err, v6_err
+  local cap = settings.max_mx_a_records
+
+  local function maybe_done()
+    if pending > 0 then
       return
     end
-
-    local i = 1
-    local function try_next()
-      if i > #ips then
-        -- All uncached; probe the first IP.
-        local ip = ips[1]
-        local function on_probe(verdict)
-          cache_set(task, 'i', ip, verdict, ttl_for_verdict(verdict))
-          on_result(verdict, ip)
-        end
-        if settings.verify_greeting then
-          probe_with_greeting(task, ip, on_probe)
-        else
-          probe_connect_only(task, ip, on_probe)
-        end
-        return
+    local combined = {}
+    -- Interleave 1:1 from each family so the cap doesn't starve one side.
+    -- prefer_ipv6 only picks which family lands first at each index.
+    local first, second
+    if settings.prefer_ipv6 then
+      first, second = v6_ips, v4_ips
+    else
+      first, second = v4_ips, v6_ips
+    end
+    local idx = 1
+    while not cap or #combined < cap do
+      local took = false
+      if first and first[idx] then
+        combined[#combined + 1] = first[idx]
+        took = true
+        if cap and #combined >= cap then break end
       end
+      if second and second[idx] then
+        combined[#combined + 1] = second[idx]
+        took = true
+      end
+      if not took then break end
+      idx = idx + 1
+    end
 
-      local ip = ips[i]
-      cache_get(task, 'i', ip, function(err, data)
-        if not err and type(data) == 'string' and #data > 0 then
-          ctx.from_cache = true
-          on_result(data, ip)
-          return
-        end
-        i = i + 1
-        try_next()
-      end)
+    if #combined > 0 then
+      done(combined, nil)
+      return
     end
+    -- dns_fail only if every queried family had a real failure; an
+    -- authoritative NXDOMAIN/NOREC from any family is collapsed into
+    -- 'no_records' (operationally equivalent without eTLD+1 verification).
+    local v4_real_fail = settings.probe_ipv4 and is_dns_real_failure(v4_err)
+    local v6_real_fail = settings.probe_ipv6 and is_dns_real_failure(v6_err)
+    local v4_clean = settings.probe_ipv4 and not is_dns_real_failure(v4_err)
+    local v6_clean = settings.probe_ipv6 and not is_dns_real_failure(v6_err)
+    if (v4_real_fail or v6_real_fail) and not (v4_clean or v6_clean) then
+      done({}, 'df')
+      return
+    end
+    done({}, 'no_records')
+  end
 
-    try_next()
+  if settings.probe_ipv4 then
+    pending = pending + 1
+    r:resolve('a', {
+      name = name,
+      task = task,
+      forced = true,
+      callback = function(_, _, addrs, err)
+        pending = pending - 1
+        v4_err = err
+        if addrs and #addrs > 0 then
+          local v = {}
+          for _, addr in ipairs(addrs) do
+            v[#v + 1] = addr:to_string()
+          end
+          lua_util.shuffle(v)
+          v4_ips = v
+        end
+        maybe_done()
+      end,
+    })
   end
 
-  -- Terminal step3 result handler for the single-IP-set paths (A-fallback and
-  -- the cached mx_miss: shortcut): there is no MX list to fall through to.
-  local function finish_single(verdict, host)
-    ctx.host = host
-    done(verdict, ctx)
+  if settings.probe_ipv6 then
+    pending = pending + 1
+    r:resolve('aaaa', {
+      name = name,
+      task = task,
+      forced = true,
+      callback = function(_, _, addrs, err)
+        pending = pending - 1
+        v6_err = err
+        if addrs and #addrs > 0 then
+          local v = {}
+          for _, addr in ipairs(addrs) do
+            v[#v + 1] = addr:to_string()
+          end
+          lua_util.shuffle(v)
+          v6_ips = v
+        end
+        maybe_done()
+      end,
+    })
   end
+end
 
-  -- Classify one MX target's IP set, emit the IP-class symbols, drop the
-  -- operator-excluded IPs, and probe whatever public addresses remain.
-  -- Bogon and private addresses are never probed: probing them is meaningless
-  -- from our vantage point and, for bogons, a packet-injection footgun.
-  local function probe_ip_set(ips, on_result)
-    local public, locals, bogons = partition_ips(ips)
+-- Lookup orchestrator: step1 (d:) -> step2 (m:) -> step3 (i:).
+local function lookup(task, mx_domain, src, done)
+  local ctx = { mx_domain = mx_domain, mx_missing = false }
 
-    if #public == 0 then
-      -- Nothing probeable: the MX target resolves only into private and/or
-      -- non-routable space.
-      if #locals > 0 then
-        task:insert_result(settings.symbol_mx_local_only, 1.0, locals[1])
-      end
-      if #bogons > 0 then
-        task:insert_result(settings.symbol_mx_bogon_only, 1.0, bogons[1])
-      end
-      on_result(#bogons > 0 and 'bogon_only' or 'local_only', nil)
+  -- step 3: walk IP list, take first cached verdict, else probe the first one.
+  -- mx_host is the MX RR target (or the from-domain on the A-fallback path);
+  -- it surfaces in probe-outcome symbol options so operators see a name, not
+  -- a raw IP. IP-class symbols (MX_LOCAL_*, MX_BOGON_*) still report IPs --
+  -- that's where IP information is the point.
+  local function step3(ips, mx_host)
+    if #ips == 0 then
+      -- Should not happen — defensive.
+      ctx.host = mx_host
+      done('inv', ctx)
       return
     end
 
-    -- Mixed set: keep the public addresses, flag the rest as informational.
-    if #locals > 0 then
-      task:insert_result(settings.symbol_mx_local_mix, 1.0, locals[1])
+    -- Partition into PUBLIC / LOCAL / BOGON. Only PUBLIC gets probed;
+    -- per-class symbol fires regardless so operators can score the shape.
+    local public_ips = {}
+    local local_ips = {}
+    local bogon_ips = {}
+    for _, ip in ipairs(ips) do
+      local class = classify_ip(ip)
+      if class == 'bogon' then
+        bogon_ips[#bogon_ips + 1] = ip
+      elseif class == 'local' then
+        local_ips[#local_ips + 1] = ip
+      else
+        public_ips[#public_ips + 1] = ip
+      end
+    end
+
+    local has_local = #local_ips > 0
+    local has_bogon = #bogon_ips > 0
+    local has_public = #public_ips > 0
+
+    if has_local or has_bogon then
+      emit_ip_class_symbols(task, mx_domain, local_ips, bogon_ips, has_public, src)
     end
-    if #bogons > 0 then
-      task:insert_result(settings.symbol_mx_bogon_mix, 1.0, bogons[1])
+
+    if not has_public then
+      -- LOCAL/BOGON symbols already emitted; nothing routable to probe.
+      done('ip_class_skipped', ctx)
+      return
     end
 
-    -- Operator skip filter: drop public IPs that match exclude_ips.
-    local probe_list = public
+    -- bad_ips: any public IP matching short-circuits with MX_IP_BAD. Checked
+    -- before exclude_ips so an IP in both is treated as bad (punish wins
+    -- over skip). The matched IP is reported as the symbol option; an
+    -- optional numeric token after the entry ("1.2.3.4 5", "1.2.3.0/24 0.5")
+    -- becomes a weight multiplier on top of the group score, default 1.0.
+    if bad_ips then
+      for _, ip in ipairs(public_ips) do
+        local m = bad_ips:get_key(ip)
+        if m then
+          local mult = (type(m) == 'string') and tonumber(m) or nil
+          done('bad_ip', { key = ip, weight_mult = mult })
+          return
+        end
+      end
+    end
+
+    -- exclude_ips drops matched IPs; full match -> MX_SKIP, partial silent.
     if exclude_ips then
-      probe_list = {}
-      local dropped
-      for _, ip in ipairs(public) do
-        if exclude_ips:get_key(ip) then
-          dropped = dropped or ip
+      local kept = {}
+      local matched_cidrs = {}
+      for _, ip in ipairs(public_ips) do
+        local m = exclude_ips:get_key(ip)
+        if m then
+          matched_cidrs[#matched_cidrs + 1] = (type(m) == 'string' and #m > 0) and m or ip
         else
-          probe_list[#probe_list + 1] = ip
+          kept[#kept + 1] = ip
         end
       end
-      if #probe_list == 0 then
-        -- The filter emptied the probe set — nothing left to check.
-        task:insert_result(settings.symbol_mx_skip, 1.0, dropped)
-        on_result('skip', nil)
+      if #kept == 0 then
+        done('skip', { key = table.concat(matched_cidrs, ',') })
         return
       end
+      public_ips = kept
     end
 
-    step3(probe_list, on_result)
-  end
+    -- Continue with only the public subset.
+    ips = public_ips
 
-  -- step 2: walk the MX list in priority order.  For each MX, resolve its IP
-  -- set (m: cache, else an A lookup) and probe it via step3.  A non-working
-  -- verdict moves on to the next MX so a reachable backup MX still scores the
-  -- domain as MX_GOOD — only after every selected MX fails do we emit the
-  -- failure.  If no MX target resolves to an address at all, emit MX_BROKEN.
-  local function step2(mx_list)
-    table.sort(mx_list, function(a, b)
-      return a.priority < b.priority -- RFC 5321: lowest preference first
-    end)
-    local limit = math.min(#mx_list, settings.max_mx_a_records)
-    if limit < #mx_list then
-      local trimmed = {}
-      for k = 1, limit do
-        trimmed[k] = mx_list[k]
+    local i = 1
+
+    local function do_probe(ip)
+      local function on_probe(verdict, _extra)
+        cache_set(task, 'i', ip, verdict, ttl_for_verdict(verdict))
+        ctx.host = mx_host
+        done(verdict, ctx)
+      end
+      if settings.verify_greeting then
+        probe_with_greeting(task, ip, on_probe)
+      else
+        probe_connect_only(task, ip, on_probe)
       end
-      mx_list = trimmed
     end
 
-    local idx = 0
-    -- First non-working probe verdict, set once at least one MX was reachable.
-    -- Its absence at exhaustion means no MX target resolved at all (broken).
-    local first_fail
+    -- Claim the lock for `ip` and probe; on race-loss the post-claim GET
+    -- decides between inheriting a freshly-published verdict and deferring
+    -- via MX_INFLIGHT.
+    local function probe_with_lock(ip)
+      try_claim_probe_lock(task, ip,
+        function() do_probe(ip) end,
+        function(verdict)
+          ctx.host = mx_host
+          if verdict then
+            ctx.from_cache = true
+            done(verdict, ctx)
+          else
+            done('inflight', ctx)
+          end
+        end,
+        function()
+          ctx.host = mx_host
+          done('redis_error', ctx)
+        end)
+    end
 
-    local process_mx -- forward declaration
+    -- Heal an invalid cache entry at `ip`: force-claim the lock (overwrite
+    -- the bad value with 'l' so parallel workers see the in-flight state)
+    -- then probe. Bypasses SET NX because the bad value would fail it.
+    local function heal_and_probe(ip)
+      force_claim_probe_lock(task, ip,
+        function() do_probe(ip) end,
+        function()
+          ctx.host = mx_host
+          done('redis_error', ctx)
+        end)
+    end
 
-    -- step3 result for the MX currently at `idx`.
-    local function on_mx_result(verdict, host)
-      if verdict == 'good' or lua_util.str_startswith(verdict, 'error:') then
-        -- A working MX (real SMTP, including a 4xx/5xx greeting): stop here so
-        -- a reachable backup MX still scores the domain as good.
-        ctx.host = host
-        done(verdict, ctx)
+    local function try_next()
+      if i > #ips then
+        -- Every IP missed cache; probe the highest-priority one.
+        probe_with_lock(ips[1])
         return
       end
-      -- Non-working MX: remember the first failure, then try the next MX.
-      if not first_fail then
-        first_fail = verdict
-        ctx.host = host
-      end
-      process_mx()
-    end
 
-    -- Resolve A records for `mx`, cache them under m:, then probe via step3.
-    local function resolve_and_probe(mx)
-      local r = task:get_resolver()
-      r:resolve('a', {
-        name = mx.name,
-        task = task,
-        forced = true,
-        callback = function(_, _, results, err)
-          if (err and err ~= DNS_ERR_NOREC and err ~= DNS_ERR_NXDOMAIN)
-              or not results or #results == 0 then
-            -- This MX target resolves to no address: a broken reference.
-            cache_set(task, 'm', mx.name, 'nxd', settings.expire_novalid)
-            process_mx()
+      local ip = ips[i]
+      cache_get(task, 'i', ip, function(err, data)
+        if not err and type(data) == 'string' and #data > 0 then
+          if data == 'l' then
+            -- Another worker is probing this IP; defer via MX_INFLIGHT.
+            ctx.host = mx_host
+            done('inflight', ctx)
             return
           end
-          lua_util.shuffle(results) -- match today's per-IP picking behaviour
-          local ip_strs = {}
-          for _, addr in ipairs(results) do
-            ip_strs[#ip_strs + 1] = addr:to_string()
+          if is_valid_cache_value('i', data) then
+            ctx.host = mx_host
+            ctx.from_cache = true
+            done(data, ctx)
+            return
           end
-          cache_set(task, 'm', mx.name, encode_ip_list(ip_strs), settings.expire)
-          probe_ip_set(ip_strs, on_mx_result)
-        end,
-      })
+          lua_util.debugm(N, task,
+            "unexpected i: cache value at %s: '%s', force-claiming to overwrite",
+            ip, data)
+          heal_and_probe(ip)
+          return
+        end
+        i = i + 1
+        try_next()
+      end)
     end
 
-    process_mx = function()
-      idx = idx + 1
-      if idx > #mx_list then
-        -- Every selected MX has been tried.
-        if first_fail then
-          -- At least one MX was reachable but none accepted the probe.
-          done(first_fail, ctx)
-        else
-          -- No MX target resolved to an address at all.  Not cached under d:
-          -- as 'nxd' on purpose — the domain and its MX RRs do exist, so a
-          -- later lookup must not be told the domain is NXDOMAIN.
-          done('broken', ctx)
+    try_next()
+  end
+
+  -- step 2: walk MX list for cached IPs; resolve A for the top MX otherwise.
+  local function step2(mx_list)
+    -- bad_mxs: any matching MX hostname short-circuits with MX_BAD. Checked
+    -- before exclude_mxs so a hostname listed in both is treated as bad
+    -- (punish wins over trust); operators shouldn't list the same name in
+    -- both anyway. An optional numeric token after the glob entry
+    -- ("trapmx.example.com 3", "*.bad.example 0.5") becomes a weight
+    -- multiplier on top of the group score; default 1.0.
+    if bad_mxs then
+      for _, mx in ipairs(mx_list) do
+        local m = bad_mxs:get_key(mx.name)
+        if m then
+          local mult = (type(m) == 'string') and tonumber(m) or nil
+          done('bad_mx', { key = mx.name, weight_mult = mult })
+          return
+        end
+      end
+    end
+
+    -- exclude_mxs: any matching MX hostname short-circuits with MX_WHITE.
+    if exclude_mxs then
+      for _, mx in ipairs(mx_list) do
+        if exclude_mxs:get_key(mx.name) then
+          done('white', { key = mx.name })
+          return
         end
-        return
       end
+    end
 
-      local mx = mx_list[idx]
-      if exclude_mxs and exclude_mxs:get_key(mx.name) then
-        -- Trusted MX host: short-circuit the whole domain with MX_WHITE.
-        done('white', { key = mx.name })
+    local i = 1
+    local broken_count = 0  -- targets that returned 'no' or 'df'
+    local df_count = 0      -- subset of broken_count: targets that returned 'df'
+
+    local function resolve_uncached()
+      -- Resolve A for the highest-priority MX without a cache entry.
+      local target
+      for _, mx in ipairs(mx_list) do
+        if not mx._cache_checked or mx._cache_value == nil then
+          target = mx.name
+          break
+        end
+      end
+      if not target then
+        -- Every target was already cache-broken (m-layer 'no' or 'df').
+        done(df_count > 0 and 'df' or 'bkn', ctx)
         return
       end
-      cache_get(task, 'm', mx.name, function(err, data)
-        if not err and type(data) == 'string' and #data > 0 then
-          if data == 'nxd' or data == 'none' then
-            -- Cached: this MX target does not resolve.  Try the next MX.
-            process_mx()
-            return
+
+      resolve_addresses(task, target, function(ip_strs, err_code)
+        if not ip_strs or #ip_strs == 0 then
+          local m_value, is_df
+          if err_code == 'df' then
+            -- Transient DNS path failure for THIS target; cache at m-layer
+            -- (cache_set no-ops when expire_dns = 0) and iterate.
+            m_value, is_df = 'df', true
+            cache_set(task, 'm', target, 'df', settings.expire_dns)
+          else
+            -- MX target has no usable address (NXDOMAIN/NOREC).
+            m_value, is_df = 'no', false
+            cache_set(task, 'm', target, 'no', settings.expire_dns)
           end
-          local ips = decode_ip_list(data)
-          if #ips > 0 then
-            probe_ip_set(ips, on_mx_result)
+          if is_df then df_count = df_count + 1 end
+          broken_count = broken_count + 1
+          for _, mx in ipairs(mx_list) do
+            if mx.name == target then
+              mx._cache_checked = true
+              mx._cache_value = m_value
+            end
+          end
+          if broken_count >= #mx_list then
+            done(df_count > 0 and 'df' or 'bkn', ctx)
             return
           end
+          resolve_uncached()
+          return
         end
-        -- Cache miss (or an unusable value): resolve A for this MX.
-        resolve_and_probe(mx)
+
+        cache_set(task, 'm', target,
+          family_prefix() .. ':' .. encode_ip_list(ip_strs), settings.expire_dns)
+        step3(ip_strs, target)
       end)
     end
 
-    process_mx()
+    local function step()
+      if i > #mx_list then
+        if broken_count >= #mx_list then
+          done(df_count > 0 and 'df' or 'bkn', ctx)
+          return
+        end
+        resolve_uncached()
+        return
+      end
+      local mx = mx_list[i]
+      cache_get(task, 'm', mx.name, function(err, data)
+        i = i + 1
+        if err or type(data) ~= 'string' or #data == 0 then
+          mx._cache_checked = true
+          mx._cache_value = nil
+          step()
+          return
+        end
+        if not is_valid_cache_value('m', data) then
+          lua_util.debugm(N, task,
+            "unexpected m: cache value at %s: '%s', treating as miss", mx.name, data)
+          -- resolve_uncached will cache_set a fresh value over the bad entry.
+          mx._cache_checked = true
+          mx._cache_value = nil
+          step()
+          return
+        end
+        if data == 'no' then
+          mx._cache_checked = true
+          mx._cache_value = data
+          broken_count = broken_count + 1
+          step()
+          return
+        end
+        if data == 'df' then
+          mx._cache_checked = true
+          mx._cache_value = data
+          df_count = df_count + 1
+          broken_count = broken_count + 1
+          step()
+          return
+        end
+        local queried, all_ips = decode_ip_list_with_family(data)
+        if not queried then
+          -- Unrecognised / legacy IP list; treat as miss so resolve_uncached
+          -- re-queries under current flags.
+          mx._cache_checked = true
+          mx._cache_value = nil
+          step()
+          return
+        end
+        if not family_coverage_ok(queried) then
+          -- Cache was written under a partial family set that doesn't cover
+          -- current flags; re-resolve to fill the missing family.
+          mx._cache_checked = true
+          mx._cache_value = nil
+          step()
+          return
+        end
+        local ips = filter_by_family(all_ips)
+        if #ips == 0 then
+          -- Cache covers current flags but has no IPs in them -- definitive
+          -- "broken" for THIS target (DNS authoritatively says so).
+          mx._cache_checked = true
+          mx._cache_value = data
+          broken_count = broken_count + 1
+          step()
+          return
+        end
+        step3(ips, mx.name)
+      end)
+    end
+
+    step()
   end
 
   -- step 1.5: A-fallback (no MX RR found at domain).
   local function fallback_a()
     ctx.mx_missing = true
-    local r = task:get_resolver()
-    r:resolve('a', {
-      name = mx_domain,
-      task = task,
-      forced = true,
-      callback = function(_, _, results, err)
-        if err == DNS_ERR_NXDOMAIN then
-          -- The domain itself does not exist.
-          cache_set(task, 'd', mx_domain, 'nxd', settings.expire_novalid)
-          done('nxdomain', ctx)
+    resolve_addresses(task, mx_domain, function(ip_strs, err_code)
+      if not ip_strs or #ip_strs == 0 then
+        if err_code == 'df' then
+          cache_set(task, 'd', mx_domain, 'df', settings.expire_dns)
+          done('df', ctx)
           return
         end
-        if (err and err ~= DNS_ERR_NOREC) or not results or #results == 0 then
-          -- The domain exists but publishes neither MX nor A records (NODATA,
-          -- empty answer, or a soft resolver error): there is no mail
-          -- destination.  This is NOT NXDOMAIN, so we must not poison the
-          -- d-cache with 'nxd' — that would make later lookups emit
-          -- MX_NXDOMAIN for a domain that does exist.  Emit a missing/invalid
-          -- outcome instead; ctx.mx_missing makes emit_outcome fire MX_MISSING
-          -- alongside MX_INVALID, matching today's no-resolvable-MX behaviour.
-          done('invalid', ctx)
-          return
-        end
-        lua_util.shuffle(results)
-        local ip_strs = {}
-        for _, addr in ipairs(results) do
-          ip_strs[#ip_strs + 1] = addr:to_string()
-        end
-        cache_set(task, 'd', mx_domain,
-          'mx_miss:' .. encode_ip_list(ip_strs), settings.expire)
-        probe_ip_set(ip_strs, finish_single)
-      end,
-    })
+        cache_set(task, 'd', mx_domain, 'no', settings.expire_dns)
+        done('no', ctx)
+        return
+      end
+      cache_set(task, 'd', mx_domain,
+        'a:' .. family_prefix() .. ':' .. encode_ip_list(ip_strs),
+        settings.expire_dns)
+      step3(ip_strs, mx_domain)
+    end)
   end
 
   -- step 1: d-layer cache, else MX resolution.
@@ -845,18 +1379,56 @@ local function lookup(task, mx_domain, done)
       task = task,
       forced = true,
       callback = function(_, _, results, err)
-        if results and #results > 0 then
-          if is_null_mx(results) then
-            cache_set(task, 'd', mx_domain, 'null', settings.expire_novalid)
-            done('null', ctx)
+        if not results or #results == 0 then
+          if is_dns_real_failure(err) then
+            -- DNS-path failure on MX -- don't A-fallback (could produce a
+            -- misleading verdict via a different resolver path).
+            cache_set(task, 'd', mx_domain, 'df', settings.expire_dns)
+            done('df', ctx)
             return
           end
-          cache_set(task, 'd', mx_domain, encode_mx_list(results), settings.expire)
-          step2(results)
+          -- NXDOMAIN/NOREC at MX -> legitimate "no MX", A-fallback per §5.1.
+          fallback_a()
           return
         end
-        -- No MX → A-fallback per RFC 5321 §5.1
-        fallback_a()
+
+        if is_null_mx(results) then
+          cache_set(task, 'd', mx_domain, 'null', settings.expire_dns)
+          done('null', ctx)
+          return
+        end
+
+        -- Drop MX targets with labels starting with '_' (RFC 952/1123:
+        -- invalid as hostnames; covers Domain Connect placeholders
+        -- like _dc-mx.*). Publisher published MX records, so this is
+        -- not the no-MX case -- all-malformed surfaces as 'bkn'.
+        -- This is THE canonical normalisation point: encode_mx_list,
+        -- exclude_mxs glob match, and m-layer cache keys all trust the
+        -- names to be lowercase from here on.
+        local valid = {}
+        for _, mx in ipairs(results) do
+          local name = norm_name(mx.name)
+          if name and not name:match('^_') then
+            mx.name = name
+            valid[#valid + 1] = mx
+          end
+        end
+        if #valid == 0 then
+          cache_set(task, 'd', mx_domain, 'bkn', settings.expire_dns)
+          done('bkn', ctx)
+          return
+        end
+        -- Sort by RFC 5321 preference (lowest first) and cap before caching:
+        -- canonical encoding makes Redis entries debuggable in priority order
+        -- and saves bytes on records with many MX entries.
+        table.sort(valid, function(a, b) return a.priority < b.priority end)
+        if #valid > settings.max_mx_a_records then
+          local trimmed = {}
+          for k = 1, settings.max_mx_a_records do trimmed[k] = valid[k] end
+          valid = trimmed
+        end
+        cache_set(task, 'd', mx_domain, encode_mx_list(valid), settings.expire_dns)
+        step2(valid)
       end,
     })
   end
@@ -866,14 +1438,29 @@ local function lookup(task, mx_domain, done)
       step1_resolve_mx()
       return
     end
-    if data == 'nxd' then
-      done('nxdomain', ctx)
+    if not is_valid_cache_value('d', data) then
+      lua_util.debugm(N, task,
+        "unexpected d: cache value at %s: '%s', treating as miss", mx_domain, data)
+      -- step1_resolve_mx will cache_set a fresh value over the bad entry.
+      step1_resolve_mx()
+      return
+    end
+    if data == 'no' then
+      done('no', ctx)
+      return
+    end
+    if data == 'bkn' then
+      done('bkn', ctx)
       return
     end
     if data == 'null' then
       done('null', ctx)
       return
     end
+    if data == 'df' then
+      done('df', ctx)
+      return
+    end
     if lua_util.str_startswith(data, 'mx:') then
       local mx_list = decode_mx_list(string.sub(data, 4))
       if #mx_list == 0 then
@@ -883,75 +1470,98 @@ local function lookup(task, mx_domain, done)
       step2(mx_list)
       return
     end
-    if lua_util.str_startswith(data, 'mx_miss:') then
+    if lua_util.str_startswith(data, 'a:') then
       ctx.mx_missing = true
-      local ips = decode_ip_list(string.sub(data, 9))
+      local queried, all_ips = decode_ip_list_with_family(string.sub(data, 3)) -- #'a:' == 3
+      if not queried then
+        -- Unrecognised body; re-resolve from scratch.
+        fallback_a()
+        return
+      end
+      if not family_coverage_ok(queried) then
+        -- Cache didn't query a currently-enabled family; re-resolve.
+        fallback_a()
+        return
+      end
+      local ips = filter_by_family(all_ips)
       if #ips == 0 then
-        step1_resolve_mx()
+        -- Cache covers current flags and authoritatively has no IPs -> MX_NONE.
+        done('no', ctx)
         return
       end
-      probe_ip_set(ips, finish_single)
+      step3(ips, mx_domain)
       return
     end
-    -- Unknown value; treat as miss.
     step1_resolve_mx()
   end)
 end
 
--- ---------------------------------------------------------------------------
--- Module entry.
--- ---------------------------------------------------------------------------
-
 local function mx_check(task)
-  local ip_addr = task:get_ip()
-  -- By default we only check inbound mail from untrusted senders. Authenticated
-  -- and local-network senders are our own traffic; ESP/outbound deployments
-  -- can opt into checking them via check_authorized / check_local.
-  if not settings.check_authorized and task:get_user() then
-    lua_util.debugm(N, task, 'skip mx check for an authenticated sender')
+  -- Skip authenticated / locally-originated traffic unless explicitly opted in.
+  if task:get_user() and not settings.check_authorized then
     return
   end
-  if not settings.check_local and ip_addr and ip_addr:is_local() then
-    lua_util.debugm(N, task, 'skip mx check for a local-network sender')
+  local ip_addr = task:get_ip()
+  if ip_addr and ip_addr:is_local() and not settings.check_local then
     return
   end
 
-  local from = task:get_from('smtp')
-  local mx_domain
-  if ((from or E)[1] or E).domain and not from[2] then
-    mx_domain = from[1]['domain']
-  else
-    mx_domain = task:get_helo()
-    if mx_domain then
-      mx_domain = rspamd_util.get_tld(mx_domain)
+  -- Collect candidate domains; dedup by normalised domain, keeping the
+  -- highest-priority source (envelope > reply-to > mime-from).
+  local domains = {}
+  local function record(domain, source)
+    domain = norm_name(domain)
+    if not domain then return end
+    local current = domains[domain]
+    if not current or SOURCE_PRIORITY[source] < SOURCE_PRIORITY[current] then
+      domains[domain] = source
     end
   end
 
-  if not mx_domain then
-    return
+  if settings.check_from then
+    local from = task:get_from('smtp')
+    if ((from or E)[1] or E).domain and not from[2] then
+      record(from[1].domain, 'from')
+    else
+      record(task:get_helo(), 'from')
+    end
   end
-
-  if exclude_domains then
-    if exclude_domains:get_key(mx_domain) then
-      rspamd_logger.infox(task, 'skip mx check for %s, excluded', mx_domain)
-      emit_outcome(task, mx_domain, 'white', { key = mx_domain })
-      return
+  if settings.check_mime_from then
+    local mime_from = task:get_from('mime')
+    if mime_from then
+      for _, m in ipairs(mime_from) do
+        record(m.domain, 'mime_from')
+      end
+    end
+  end
+  if settings.check_reply_to then
+    local rt_hdr = task:get_header('Reply-To')
+    if rt_hdr then
+      local addrs = rspamd_util.parse_mail_address(rt_hdr, task:get_mempool())
+      if addrs then
+        for _, a in ipairs(addrs) do
+          record(a.domain, 'reply_to')
+        end
+      end
     end
   end
 
-  lookup(task, mx_domain, function(outcome, info)
-    -- `outcome` is one of:
-    --   good | refused | timeout_connect | timeout_read | invalid
-    --   error:<code> | nxdomain | null | broken | white
-    --   local_only | bogon_only | skip
-    lua_util.debugm(N, task, 'mx_check verdict for %s: %s', mx_domain, outcome)
-    emit_outcome(task, mx_domain, outcome, info)
-  end)
+  -- One probe + one symbol per unique domain. Pipelines run concurrently
+  -- via rspamd's event loop.
+  for mx_domain, src in pairs(domains) do
+    if exclude_domains and exclude_domains:get_key(mx_domain) then
+      rspamd_logger.infox(task, 'skip mx check for %s, excluded (%s)', mx_domain, src)
+      emit_outcome(task, mx_domain, 'white', { key = mx_domain }, src)
+    else
+      lookup(task, mx_domain, src, function(outcome, info)
+        lua_util.debugm(N, task, 'verdict for %s (%s): %s', mx_domain, src, outcome)
+        emit_outcome(task, mx_domain, outcome, info, src)
+      end)
+    end
+  end
 end
 
--- ---------------------------------------------------------------------------
 -- Module setup.
--- ---------------------------------------------------------------------------
 
 local opts = rspamd_config:get_all_opt('mx_check')
 if not (opts and type(opts) == 'table') then
@@ -959,45 +1569,20 @@ if not (opts and type(opts) == 'table') then
   return
 end
 
-redis_params = lua_redis.parse_redis_server('mx_check')
-if not redis_params then
-  rspamd_logger.errx(rspamd_config, 'no redis servers are specified, disabling module')
-  lua_util.disable_module(N, "redis")
-  return
-end
-
 -- Honour deprecated keys: legacy `timeout` and `wait_for_greeting`.
---
--- This relies on the shipped modules.d/mx_check.conf NOT setting
--- `connect_timeout` / `verify_greeting`: their defaults live in the `settings`
--- table above, so `opts` carries the new keys only when an operator set them
--- explicitly. If the shipped config pre-populated them, the merged `opts`
--- would always carry them and a legacy alias in local.d would be silently
--- ignored.
 do
   local legacy_timeout = opts.timeout
   local legacy_wfg = opts.wait_for_greeting
-  if legacy_timeout ~= nil then
-    if opts.connect_timeout == nil then
-      opts.connect_timeout = legacy_timeout
-      rspamd_logger.warnx(rspamd_config,
-        'mx_check: `timeout` is deprecated; use `connect_timeout` (mapped automatically)')
-    else
-      rspamd_logger.warnx(rspamd_config,
-        'mx_check: both `timeout` (deprecated) and `connect_timeout` are set; ignoring `timeout`')
-    end
+  if legacy_timeout ~= nil and opts.connect_timeout == nil then
+    opts.connect_timeout = legacy_timeout
+    rspamd_logger.warnx(rspamd_config,
+      'mx_check: `timeout` is deprecated; use `connect_timeout` (mapped automatically)')
   end
-  if legacy_wfg ~= nil then
-    if opts.verify_greeting == nil then
-      opts.verify_greeting = legacy_wfg
-      rspamd_logger.warnx(rspamd_config,
-        'mx_check: `wait_for_greeting` is deprecated; use `verify_greeting` (mapped automatically). '
-          .. 'Note: the new flag also adds multi-line banner parsing and reply-code validation.')
-    else
-      rspamd_logger.warnx(rspamd_config,
-        'mx_check: both `wait_for_greeting` (deprecated) and `verify_greeting` are set; '
-          .. 'ignoring `wait_for_greeting`')
-    end
+  if legacy_wfg ~= nil and opts.verify_greeting == nil then
+    opts.verify_greeting = legacy_wfg
+    rspamd_logger.warnx(rspamd_config,
+      'mx_check: `wait_for_greeting` is deprecated; use `verify_greeting` (mapped automatically). '
+        .. 'Note: the new flag also adds multi-line banner parsing and reply-code validation.')
   end
   opts.timeout = nil
   opts.wait_for_greeting = nil
@@ -1005,141 +1590,244 @@ end
 
 settings = lua_util.override_defaults(settings, opts)
 
+redis_params = lua_redis.parse_redis_server('mx_check')
+if not redis_params then
+  rspamd_logger.errx(rspamd_config, 'no redis servers are specified, disabling module')
+  lua_util.disable_module(N, "redis")
+  return
+end
+
+if not settings.probe_ipv4 and not settings.probe_ipv6 then
+  rspamd_logger.errx(rspamd_config,
+    'mx_check: both probe_ipv4 and probe_ipv6 are disabled — nothing to probe; disabling module')
+  lua_util.disable_module(N, 'config')
+  return
+end
+
+if not settings.check_from and not settings.check_mime_from and not settings.check_reply_to then
+  rspamd_logger.errx(rspamd_config,
+    'mx_check: check_from / check_mime_from / check_reply_to are all disabled — no source to check; disabling module')
+  lua_util.disable_module(N, 'config')
+  return
+end
+
+-- i-layer TTLs must be positive (zero would mean every task re-probes
+-- every IP). expire_dns = 0 is allowed (disables d:/m: caching only);
+-- negative rejected (Redis would refuse a negative TTL).
+for _, k in ipairs({ 'expire', 'expire_novalid', 'expire_timeout' }) do
+  if not (settings[k] and settings[k] > 0) then
+    rspamd_logger.errx(rspamd_config,
+      'mx_check: %s must be > 0 (got %s); disabling module', k, settings[k])
+    lua_util.disable_module(N, 'config')
+    return
+  end
+end
+if not (settings.expire_dns and settings.expire_dns >= 0) then
+  rspamd_logger.errx(rspamd_config,
+    'mx_check: expire_dns must be >= 0 (got %s); disabling module', settings.expire_dns)
+  lua_util.disable_module(N, 'config')
+  return
+end
+
+-- max_mx_a_records caps both the MX list and the per-MX A/AAAA fan-out. Must
+-- be >= 1 always, and >= 2 when both probe families are on (otherwise we
+-- couldn't fit at least one A and one AAAA in the combined list).
+if not (settings.max_mx_a_records and settings.max_mx_a_records >= 1) then
+  rspamd_logger.errx(rspamd_config,
+    'mx_check: max_mx_a_records must be >= 1 (got %s); disabling module',
+    settings.max_mx_a_records)
+  lua_util.disable_module(N, 'config')
+  return
+end
+if settings.probe_ipv4 and settings.probe_ipv6 and settings.max_mx_a_records < 2 then
+  rspamd_logger.errx(rspamd_config,
+    'mx_check: max_mx_a_records must be >= 2 when both probe_ipv4 and probe_ipv6 are enabled (got %s); disabling module',
+    settings.max_mx_a_records)
+  lua_util.disable_module(N, 'config')
+  return
+end
+
 lua_redis.register_prefix(settings.key_prefix .. ':*', N,
   'MX check cache (three-layer: d:/m:/i:)', { type = 'string' })
 
--- Augmentation budget: worst case is one DNS round + connect + read.
+-- Augmentation budget: DNS + Redis + connect + read (worst case). Redis
+-- timeout sourced from parse_redis_server's resolved value (nested redis{}
+-- > global redis.conf > 1.0 default).
 local dns_to = rspamd_config:get_dns_timeout() or 0.0
-local budget = settings.connect_timeout + settings.read_timeout + dns_to
-
+local redis_to = (redis_params and redis_params.timeout) or 0.0
+local budget = settings.connect_timeout + settings.read_timeout
+    + dns_to + redis_to
+
+-- Stable callback parent. All per-source variants (envelope-from, MIME From,
+-- Reply-To) register as virtual children of MX_CHECK so symbols_enabled or
+-- disabled toggles and group / dependency declarations operate on a single
+-- name that doesn't shift with symbols renames via settings.
 local id = rspamd_config:register_symbol({
-  name = settings.symbol_bad_mx,
-  type = 'normal',
+  name = 'MX_CHECK',
+  group = 'mx',
+  type = 'callback',
   callback = mx_check,
   flags = 'empty',
   augmentations = { string.format("timeout=%f", budget) },
 })
 
-local function register_virtual(name)
-  rspamd_config:register_symbol({
-    name = name,
-    type = 'virtual',
-    parent = id,
-  })
+local function register_all_sources(base_name)
+  local prefixes = {
+    settings.symbol_prefix_from,
+    settings.symbol_prefix_mime_from,
+    settings.symbol_prefix_reply_to,
+  }
+  for _, prefix in ipairs(prefixes) do
+    rspamd_config:register_symbol({ name = prefix .. base_name, type = 'virtual', parent = id })
+  end
 end
 
-register_virtual(settings.symbol_no_mx)
-register_virtual(settings.symbol_good_mx)
-register_virtual(settings.symbol_white_mx)
-register_virtual(settings.symbol_mx_refused)
-register_virtual(settings.symbol_mx_timeout_connect)
-register_virtual(settings.symbol_mx_timeout_read)
-register_virtual(settings.symbol_mx_error)
-register_virtual(settings.symbol_mx_nxdomain)
-register_virtual(settings.symbol_mx_null)
-register_virtual(settings.symbol_mx_broken)
-register_virtual(settings.symbol_mx_local_only)
-register_virtual(settings.symbol_mx_local_mix)
-register_virtual(settings.symbol_mx_bogon_only)
-register_virtual(settings.symbol_mx_bogon_mix)
-register_virtual(settings.symbol_mx_skip)
-
--- Primary metric symbols (today's scores).
-rspamd_config:set_metric_symbol({
-  name = settings.symbol_bad_mx,
-  score = 0.5,
-  description = 'Domain has no working MX',
-  group = 'MX',
-  one_shot = true,
-  one_param = true,
-})
-rspamd_config:set_metric_symbol({
-  name = settings.symbol_good_mx,
-  score = -0.01,
-  description = 'Domain has working MX',
-  group = 'MX',
-  one_shot = true,
-  one_param = true,
-})
-rspamd_config:set_metric_symbol({
-  name = settings.symbol_white_mx,
-  score = 0.0,
-  description = 'Domain is whitelisted from MX check',
-  group = 'MX',
-  one_shot = true,
-  one_param = true,
-})
-rspamd_config:set_metric_symbol({
-  name = settings.symbol_no_mx,
-  score = 3.5,
-  description = 'Domain has no resolvable MX',
-  group = 'MX',
-  one_shot = true,
-  one_param = true,
-})
-
--- Finer symbols: registered at score 0 in Phase A.  Phase B flips them on
--- with real defaults; operators can override scores today.
-local function set_finer(name, description)
-  rspamd_config:set_metric_symbol({
-    name = name,
-    score = 0.0,
-    description = description,
-    group = 'MX',
-    one_shot = true,
-    one_param = true,
-  })
+register_all_sources(settings.symbol_bad_mx)
+register_all_sources(settings.symbol_good_mx)
+register_all_sources(settings.symbol_white_mx)
+register_all_sources(settings.symbol_mx_refused)
+register_all_sources(settings.symbol_mx_timeout_connect)
+register_all_sources(settings.symbol_mx_timeout_read)
+register_all_sources(settings.symbol_mx_error)
+register_all_sources(settings.symbol_mx_none)
+register_all_sources(settings.symbol_mx_null)
+register_all_sources(settings.symbol_mx_broken)
+register_all_sources(settings.symbol_mx_dns_fail)
+register_all_sources(settings.symbol_mx_local_only)
+register_all_sources(settings.symbol_mx_local_mix)
+register_all_sources(settings.symbol_mx_bogon_only)
+register_all_sources(settings.symbol_mx_bogon_mix)
+register_all_sources(settings.symbol_mx_skip)
+register_all_sources(settings.symbol_mx_bad)
+register_all_sources(settings.symbol_mx_ip_bad)
+register_all_sources(settings.symbol_mx_inflight)
+register_all_sources(settings.symbol_mx_redis_error)
+register_all_sources(settings.symbol_mx_a_good)
+register_all_sources(settings.symbol_mx_a_refused)
+register_all_sources(settings.symbol_mx_a_timeout_connect)
+register_all_sources(settings.symbol_mx_a_timeout_read)
+register_all_sources(settings.symbol_mx_a_error)
+register_all_sources(settings.symbol_mx_a_invalid)
+
+-- Metric defaults fan out across the 3 source prefixes with equal weight.
+local function set_metric_all_sources(base_name, score, description)
+  local prefixes = {
+    settings.symbol_prefix_from,
+    settings.symbol_prefix_mime_from,
+    settings.symbol_prefix_reply_to,
+  }
+  for _, prefix in ipairs(prefixes) do
+    rspamd_config:set_metric_symbol({
+      name = prefix .. base_name,
+      score = score,
+      description = description,
+      group = 'mx',
+      one_shot = true,
+    })
+  end
 end
 
-set_finer(settings.symbol_mx_refused, 'MX target sent TCP RST (port 25 closed)')
-set_finer(settings.symbol_mx_timeout_connect, 'MX target did not respond to connect attempt')
-set_finer(settings.symbol_mx_timeout_read, 'MX target accepted TCP but did not send greeting')
-set_finer(settings.symbol_mx_error, 'MX target greeted with 4xx/5xx (real SMTP, rejected probe)')
-set_finer(settings.symbol_mx_nxdomain, 'Domain itself does not exist (NXDOMAIN)')
-set_finer(settings.symbol_mx_null, 'Domain published RFC 7505 Null MX')
-set_finer(settings.symbol_mx_broken, 'All MX RRs point at hostnames that do not resolve')
-set_finer(settings.symbol_mx_local_only, 'MX resolves only to private (RFC1918/CGNAT/ULA) addresses')
-set_finer(settings.symbol_mx_local_mix, 'MX resolves to a mix of public and private addresses')
-set_finer(settings.symbol_mx_bogon_only, 'MX resolves only to non-routable/reserved addresses')
-set_finer(settings.symbol_mx_bogon_mix, 'MX resolves to a mix of public and non-routable addresses')
-set_finer(settings.symbol_mx_skip, 'MX probe skipped: every candidate IP matched exclude_ips')
+set_metric_all_sources(settings.symbol_bad_mx, 3.0,
+  'MX target accepted TCP but listener does not speak SMTP')
+set_metric_all_sources(settings.symbol_good_mx, -0.1,
+  'Domain has working MX')
+set_metric_all_sources(settings.symbol_white_mx, -0.1,
+  'Domain is whitelisted from MX check')
+
+-- Default symbol weights. Operators can override any per-deployment via
+-- local.d/mx_group.conf or override.d/mx_group.conf.
+set_metric_all_sources(settings.symbol_mx_refused, 3.0,
+  'MX target sent TCP RST (port 25 closed)')
+set_metric_all_sources(settings.symbol_mx_timeout_connect, 2.0,
+  'MX target did not respond to connect attempt')
+set_metric_all_sources(settings.symbol_mx_timeout_read, 0.1,
+  'MX target accepted TCP but did not send greeting')
+set_metric_all_sources(settings.symbol_mx_error, 0.0,
+  'MX target greeted with 4xx/5xx (real SMTP, rejected probe)')
+set_metric_all_sources(settings.symbol_mx_none, 4.0,
+  'From domain has no MX/A/AAAA records (covers NXDOMAIN and NOREC)')
+set_metric_all_sources(settings.symbol_mx_null, 6.0,
+  'Domain published RFC 7505 Null MX')
+set_metric_all_sources(settings.symbol_mx_broken, 4.0,
+  'All MX RRs point at hostnames that do not resolve')
+set_metric_all_sources(settings.symbol_mx_dns_fail, 0.0,
+  'Transient DNS path failure (SERVFAIL/REFUSED/timeout); sender not at fault')
+set_metric_all_sources(settings.symbol_mx_local_only, 3.0,
+  'All resolved MX IPs are in private ranges (RFC1918 / CGNAT / ULA); no probe run')
+set_metric_all_sources(settings.symbol_mx_local_mix, 3.0,
+  'Some resolved MX IPs are in private ranges; public subset probed')
+set_metric_all_sources(settings.symbol_mx_bogon_only, 8.0,
+  'All resolved MX IPs are bogon / non-routable (loopback, TEST-NET, multicast, etc.); no probe run')
+set_metric_all_sources(settings.symbol_mx_bogon_mix, 5.0,
+  'Some resolved MX IPs are bogon / non-routable; public subset probed')
+set_metric_all_sources(settings.symbol_mx_skip, 0.0,
+  'exclude_ips filtered every routable MX IP away; no probe run')
+set_metric_all_sources(settings.symbol_mx_bad, 6.0,
+  'MX hostname listed in bad_mxs (operator-defined punishment glob)')
+set_metric_all_sources(settings.symbol_mx_ip_bad, 6.0,
+  'Resolved MX IP listed in bad_ips (operator-defined punishment radix)')
+set_metric_all_sources(settings.symbol_mx_inflight, 0.0,
+  'Another rspamd worker holds the i-layer probe lock; verdict will land via that worker')
+set_metric_all_sources(settings.symbol_mx_redis_error, 0.0,
+  'Redis error during probe-lock claim; probe skipped (module cache layer degraded)')
+
+-- A-fallback path. Failure shapes are stronger than MX-RR equivalents (no
+-- published mail intent + no working A listener = textbook forgery / parked
+-- domain). MX_A_GOOD stays neutral (legitimate RFC 5321 §5.1 deployment).
+set_metric_all_sources(settings.symbol_mx_a_good, 0.0,
+  'A-fallback target accepted SMTP (no MX RR; RFC 5321 §5.1 compliant)')
+set_metric_all_sources(settings.symbol_mx_a_refused, 3.0,
+  'A-fallback target sent TCP RST (port 25 closed)')
+set_metric_all_sources(settings.symbol_mx_a_timeout_connect, 2.5,
+  'A-fallback target did not respond to connect attempt')
+set_metric_all_sources(settings.symbol_mx_a_timeout_read, 0.1,
+  'A-fallback target accepted TCP but did not send SMTP greeting')
+set_metric_all_sources(settings.symbol_mx_a_error, 0.0,
+  'A-fallback target greeted with 4xx/5xx (real SMTP, rejected probe)')
+set_metric_all_sources(settings.symbol_mx_a_invalid, 3.0,
+  'A-fallback target accepted TCP but listener does not speak SMTP')
+
+-- Static radix maps for IP-class classification. test_mode lifts loopback
+-- out of the bogon set so the probe path stays exercisable against a local
+-- listener; production must NEVER enable this.
+local bogon_cidrs = BOGON_CIDRS
+if settings.test_mode then
+  rspamd_logger.warnx(rspamd_config,
+    'mx_check: test_mode is ON, loopback is treated as probeable; '
+      .. 'do NOT use this in production')
+  bogon_cidrs = {}
+  for _, r in ipairs(BOGON_CIDRS) do
+    if not LOOPBACK_CIDRS[r] then
+      bogon_cidrs[#bogon_cidrs + 1] = r
+    end
+  end
+end
+local_ip_map = lua_maps.map_add_from_ucl(LOCAL_CIDRS, 'radix',
+  'mx_check LOCAL ranges (RFC1918, CGNAT, ULA)')
+bogon_ip_map = lua_maps.map_add_from_ucl(bogon_cidrs, 'radix',
+  'mx_check BOGON ranges (loopback, link-local, TEST-NET, multicast, etc.)')
 
 if settings.exclude_domains then
-  exclude_domains = rspamd_config:add_map {
-    type = 'set',
-    description = 'Exclude specific domains from MX checks',
-    url = settings.exclude_domains,
-  }
+  exclude_domains = lua_maps.map_add('mx_check', 'exclude_domains', 'glob',
+    'Exclude specific domains from MX checks')
 end
 
--- Per-layer trust/skip maps (Phase C). exclude_mxs is a glob map so a bare
--- hostname matches exactly while an explicit `*.foo` matches subdomains.
 if settings.exclude_mxs then
-  exclude_mxs = lua_maps.map_add_from_ucl(settings.exclude_mxs, 'glob',
-    'mx_check: trusted MX hostnames (bare = exact, *.glob = subdomains)')
+  exclude_mxs = lua_maps.map_add('mx_check', 'exclude_mxs', 'glob',
+    'Exclude specific MX hostnames from MX checks (m-layer trust)')
 end
+
 if settings.exclude_ips then
-  exclude_ips = lua_maps.map_add_from_ucl(settings.exclude_ips, 'radix',
-    'mx_check: IP/CIDR ranges to drop from the probe set')
+  exclude_ips = lua_maps.map_add('mx_check', 'exclude_ips', 'radix',
+    'Exclude specific IPs/CIDRs from MX probing (i-layer skip)')
 end
 
--- Module-private IP-class radix maps. test_mode lifts loopback out of the
--- bogon set so the probe path stays exercisable against a local listener.
-do
-  local bogon_ranges = BOGON_RANGES
-  if settings.test_mode then
-    rspamd_logger.warnx(rspamd_config,
-      'mx_check: test_mode is ON — loopback is treated as probeable; '
-        .. 'do NOT use this in production')
-    bogon_ranges = {}
-    for _, r in ipairs(BOGON_RANGES) do
-      if not LOOPBACK_RANGES[r] then
-        bogon_ranges[#bogon_ranges + 1] = r
-      end
-    end
-  end
-  local_map = lua_maps.map_add_from_ucl(LOCAL_RANGES, 'radix',
-    'mx_check: private/CGNAT/ULA MX-target ranges')
-  bogon_map = lua_maps.map_add_from_ucl(bogon_ranges, 'radix',
-    'mx_check: non-routable/reserved MX-target ranges')
+if settings.bad_mxs then
+  bad_mxs = lua_maps.map_add('mx_check', 'bad_mxs', 'glob',
+    'Punish specific MX hostnames (short-circuits with MX_BAD)')
+end
+
+if settings.bad_ips then
+  bad_ips = lua_maps.map_add('mx_check', 'bad_ips', 'radix',
+    'Punish specific IPs/CIDRs (short-circuits with MX_IP_BAD)')
 end
index 585185a9c57f74ff268a95654738304994cb4be0..4d55b77bc06ccf6081a13c2e8700b5ef57bafd83 100644 (file)
@@ -18,44 +18,34 @@ ${SETTINGS}        {symbols_enabled = [MX_INVALID]}
 Null MX domain emits MX_NULL
   Scan File  ${MESSAGE}  From=test@nullmx.test  Settings=${SETTINGS}
   Expect Symbol  MX_NULL
-  # MX_INVALID is the primary skip-mail signal for Null MX domains
-  Expect Symbol  MX_INVALID
 
-NXDOMAIN domain emits MX_NXDOMAIN
+NXDOMAIN domain emits MX_NONE
   Scan File  ${MESSAGE}  From=test@nxdmx.test  Settings=${SETTINGS}
-  Expect Symbol  MX_NXDOMAIN
-  Expect Symbol  MX_INVALID
+  Expect Symbol  MX_NONE
 
 Broken MX target emits MX_BROKEN
   Scan File  ${MESSAGE}  From=test@brokenmx.test  Settings=${SETTINGS}
   Expect Symbol  MX_BROKEN
-  Expect Symbol  MX_INVALID
 
 Refused MX target emits MX_REFUSED
   Scan File  ${MESSAGE}  From=test@refused.test  Settings=${SETTINGS}
   Expect Symbol  MX_REFUSED
-  Expect Symbol  MX_INVALID
 
-A-fallback (no MX, A points at closed port) emits MX_MISSING and MX_INVALID
+A-fallback (no MX, A points at closed port) emits MX_A_REFUSED
   Scan File  ${MESSAGE}  From=test@amxmiss.test  Settings=${SETTINGS}
-  Expect Symbol  MX_MISSING
-  Expect Symbol  MX_INVALID
+  Expect Symbol  MX_A_REFUSED
 
-Domain with no MX and no A (NODATA) is not treated as NXDOMAIN
+Domain with no MX and no A (NODATA) emits MX_NONE
   Scan File  ${MESSAGE}  From=test@noaddr.test  Settings=${SETTINGS}
-  Expect Symbol  MX_MISSING
-  Expect Symbol  MX_INVALID
-  Do Not Expect Symbol  MX_NXDOMAIN
+  Expect Symbol  MX_NONE
 
 MX resolving only to private addresses emits MX_LOCAL_ONLY
   Scan File  ${MESSAGE}  From=test@localmx.test  Settings=${SETTINGS}
   Expect Symbol  MX_LOCAL_ONLY
-  Expect Symbol  MX_INVALID
 
 MX resolving only to non-routable addresses emits MX_BOGON_ONLY
   Scan File  ${MESSAGE}  From=test@bogonmx.test  Settings=${SETTINGS}
   Expect Symbol  MX_BOGON_ONLY
-  Expect Symbol  MX_INVALID
 
 MX hostname in exclude_mxs short-circuits with MX_WHITE
   Scan File  ${MESSAGE}  From=test@trustedmx.test  Settings=${SETTINGS}
@@ -67,9 +57,19 @@ MX whose only IP is in exclude_ips emits MX_SKIP
   Expect Symbol  MX_SKIP
   Do Not Expect Symbol  MX_INVALID
 
+MX hostname in bad_mxs short-circuits with MX_BAD
+  Scan File  ${MESSAGE}  From=test@badmx.test  Settings=${SETTINGS}
+  Expect Symbol  MX_BAD
+  Do Not Expect Symbol  MX_INVALID
+
+MX whose IP is in bad_ips short-circuits with MX_IP_BAD
+  Scan File  ${MESSAGE}  From=test@badipmx.test  Settings=${SETTINGS}
+  Expect Symbol  MX_IP_BAD
+  Do Not Expect Symbol  MX_INVALID
+
 Local-network sender is skipped unless check_local is set
   Scan File  ${MESSAGE}  From=test@nxdmx.test  IP=127.0.0.1  Settings=${SETTINGS}
-  Do Not Expect Symbol  MX_NXDOMAIN
+  Do Not Expect Symbol  MX_NONE
   Do Not Expect Symbol  MX_INVALID
 
 *** Keywords ***
diff --git a/test/functional/cases/168_mx_check_greeting.robot b/test/functional/cases/168_mx_check_greeting.robot
new file mode 100644 (file)
index 0000000..6a58105
--- /dev/null
@@ -0,0 +1,67 @@
+*** Settings ***
+Suite Setup     Mx Greeting Setup
+Suite Teardown  Mx Greeting Teardown
+Library         OperatingSystem
+Library         Process
+Library         ${RSPAMD_TESTDIR}/lib/rspamd.py
+Resource        ${RSPAMD_TESTDIR}/lib/rspamd.robot
+Variables       ${RSPAMD_TESTDIR}/lib/vars.py
+
+*** Variables ***
+${CONFIG}             ${RSPAMD_TESTDIR}/configs/mx_check-greeting.conf
+${MESSAGE}            ${RSPAMD_TESTDIR}/messages/spam_message.eml
+${REDIS_SCOPE}        Suite
+${RSPAMD_SCOPE}       Suite
+${RSPAMD_URL_TLD}     ${RSPAMD_TESTDIR}/../lua/unit/test_tld.dat
+${SETTINGS}           {symbols_enabled = [MX_INVALID]}
+${SINGLE_STATUS}      /tmp/dummy_smtp_greeting_single.status
+
+*** Test Cases ***
+Silent SMTP listener triggers MX_TIMEOUT_READ
+  Scan File  ${MESSAGE}  From=test@silentsmtp.test  Settings=${SETTINGS}
+  Expect Symbol  MX_TIMEOUT_READ
+
+Continuation 220- without follow-up (send_quit=false) emits MX_GOOD
+  # The dummy emits "220-Greeting" then holds the connection longer than
+  # read_timeout without sending a second line. A regression that waits
+  # for a continuation under send_quit=false would surface here as
+  # MX_TIMEOUT_READ.
+  Scan File  ${MESSAGE}  From=test@greetingsmtp.test  Settings=${SETTINGS}
+  Expect Symbol  MX_GOOD
+  Do Not Expect Symbol  MX_TIMEOUT_READ
+  Sleep  0.5s
+  ${status} =  Get File  ${SINGLE_STATUS}
+  Should Contain  ${status}  OK_NO_QUIT
+
+5xx greeting triggers MX_ERROR
+  Scan File  ${MESSAGE}  From=test@errorsmtp.test  Settings=${SETTINGS}
+  Expect Symbol  MX_ERROR
+
+Non-SMTP line triggers MX_INVALID
+  Scan File  ${MESSAGE}  From=test@messysmtp.test  Settings=${SETTINGS}
+  Expect Symbol  MX_INVALID
+
+*** Keywords ***
+Start Plain Dummy
+  [Arguments]  ${mode}  ${host}
+  Start Process  ${RSPAMD_TESTDIR}/util/dummy_smtp.py
+  ...  --port  11125  --mode  ${mode}  --host  ${host}
+  ...  stderr=/tmp/dummy_smtp_${mode}.log
+  ...  stdout=/tmp/dummy_smtp_${mode}.log
+  Wait Until Created  /tmp/dummy_smtp_${mode}.pid  timeout=2 second
+
+Mx Greeting Setup
+  Start Plain Dummy  silent  127.0.0.1
+  Start Process  ${RSPAMD_TESTDIR}/util/dummy_smtp.py
+  ...  --port  11125  --mode  greeting_single  --host  127.0.0.2
+  ...  --status-file  ${SINGLE_STATUS}
+  ...  stderr=/tmp/dummy_smtp_greeting_single.log
+  ...  stdout=/tmp/dummy_smtp_greeting_single.log
+  Wait Until Created  /tmp/dummy_smtp_greeting_single.pid  timeout=2 second
+  Start Plain Dummy  error   127.0.0.3
+  Start Plain Dummy  messy   127.0.0.4
+  Rspamd Redis Setup
+
+Mx Greeting Teardown
+  Rspamd Redis Teardown
+  Terminate All Processes  kill=True
diff --git a/test/functional/cases/169_mx_check_greeting_quit.robot b/test/functional/cases/169_mx_check_greeting_quit.robot
new file mode 100644 (file)
index 0000000..0d2e96a
--- /dev/null
@@ -0,0 +1,53 @@
+*** Settings ***
+Suite Setup     Mx Quit Setup
+Suite Teardown  Mx Quit Teardown
+Library         OperatingSystem
+Library         Process
+Library         ${RSPAMD_TESTDIR}/lib/rspamd.py
+Resource        ${RSPAMD_TESTDIR}/lib/rspamd.robot
+Variables       ${RSPAMD_TESTDIR}/lib/vars.py
+
+*** Variables ***
+${CONFIG}             ${RSPAMD_TESTDIR}/configs/mx_check-greeting-quit.conf
+${MESSAGE}            ${RSPAMD_TESTDIR}/messages/spam_message.eml
+${REDIS_SCOPE}        Suite
+${RSPAMD_SCOPE}       Suite
+${RSPAMD_URL_TLD}     ${RSPAMD_TESTDIR}/../lua/unit/test_tld.dat
+${SETTINGS}           {symbols_enabled = [MX_INVALID]}
+${PROPER_STATUS}      /tmp/dummy_smtp_greeting_proper.status
+${SLOW_STATUS}        /tmp/dummy_smtp_greeting_slow.status
+
+*** Test Cases ***
+Multi-line greeting with send_quit=true emits MX_GOOD with QUIT after final line
+  Scan File  ${MESSAGE}  From=test@multigreetsmtp.test  Settings=${SETTINGS}
+  Expect Symbol  MX_GOOD
+  Sleep  0.5s
+  ${status} =  Get File  ${PROPER_STATUS}
+  Should Contain  ${status}  QUIT_AFTER_FINAL
+
+Slow second banner line triggers MX_TIMEOUT_READ
+  Scan File  ${MESSAGE}  From=test@slowsmtp.test  Settings=${SETTINGS}
+  Expect Symbol  MX_TIMEOUT_READ
+
+*** Keywords ***
+Start Greeting Dummy
+  [Arguments]  ${host}  ${between_wait}  ${status_file}  ${pid_suffix}
+  Start Process  ${RSPAMD_TESTDIR}/util/dummy_smtp.py
+  ...  --port  11125
+  ...  --mode  greeting_multi
+  ...  --host  ${host}
+  ...  --between-wait  ${between_wait}
+  ...  --status-file  ${status_file}
+  ...  --pid-file  /tmp/dummy_smtp_${pid_suffix}.pid
+  ...  stderr=/tmp/dummy_smtp_${pid_suffix}.log
+  ...  stdout=/tmp/dummy_smtp_${pid_suffix}.log
+  Wait Until Created  /tmp/dummy_smtp_${pid_suffix}.pid  timeout=2 second
+
+Mx Quit Setup
+  Start Greeting Dummy  127.0.0.6  0.2  ${PROPER_STATUS}  greeting_proper
+  Start Greeting Dummy  127.0.0.5  2.0  ${SLOW_STATUS}    greeting_slow
+  Rspamd Redis Setup
+
+Mx Quit Teardown
+  Rspamd Redis Teardown
+  Terminate All Processes  kill=True
index 4b5f072f0e79e41641227302f8fbb96893a8f2c3..48b5218083ebd4a5ec939849234f9d5874d84442 100644 (file)
@@ -101,6 +101,102 @@ options {
         name = "host.skipmx.test";
         type = "a";
         replies = ["127.0.0.9"];
+      },
+      {
+        # MX hostname matches bad_mxs (*.bad.test) -> MX_BAD.
+        # No A record needed: bad_mxs short-circuits before resolving.
+        name = "badmx.test";
+        type = "mx";
+        replies = ["10 trapmx.bad.test"];
+      },
+      {
+        # MX target IP matches bad_ips -> MX_IP_BAD short-circuit at step3.
+        # 1.2.3.4 is public (outside LOCAL/BOGON), so it survives IP-class
+        # classification and reaches the bad_ips check.
+        name = "badipmx.test";
+        type = "mx";
+        replies = ["10 victim.badipmx.test"];
+      },
+      {
+        name = "victim.badipmx.test";
+        type = "a";
+        replies = ["1.2.3.4"];
+      },
+      {
+        # Paired with dummy_smtp.py silent listener (127.0.0.1) in
+        # 168_mx_check_greeting.robot -> MX_TIMEOUT_READ on verify_greeting.
+        name = "silentsmtp.test";
+        type = "mx";
+        replies = ["10 silent.smtpok.test"];
+      },
+      {
+        name = "silent.smtpok.test";
+        type = "a";
+        replies = ["127.0.0.1"];
+      },
+      {
+        # 127.0.0.2 hosts a greeting listener whose exact behaviour is
+        # decided by the test suite: greeting_single under
+        # mx_check-greeting.conf (send_quit=false -> MX_GOOD plus dummy
+        # status OK_NO_QUIT), or greeting_multi under
+        # mx_check-greeting-quit.conf (send_quit=true -> MX_GOOD plus
+        # dummy status QUIT_AFTER_FINAL).
+        name = "greetingsmtp.test";
+        type = "mx";
+        replies = ["10 greeting.smtpok.test"];
+      },
+      {
+        name = "greeting.smtpok.test";
+        type = "a";
+        replies = ["127.0.0.2"];
+      },
+      {
+        # 127.0.0.5 hosts a greeting_multi listener with
+        # --between-wait > read_timeout, so the second-line read times
+        # out in rspamd (-> MX_TIMEOUT_READ on the verify_greeting path).
+        name = "slowsmtp.test";
+        type = "mx";
+        replies = ["10 slow.smtpok.test"];
+      },
+      {
+        name = "slow.smtpok.test";
+        type = "a";
+        replies = ["127.0.0.5"];
+      },
+      {
+        # 127.0.0.6 hosts a greeting_multi listener with send_quit=true.
+        name = "multigreetsmtp.test";
+        type = "mx";
+        replies = ["10 multigreet.smtpok.test"];
+      },
+      {
+        name = "multigreet.smtpok.test";
+        type = "a";
+        replies = ["127.0.0.6"];
+      },
+      {
+        # Paired with dummy_smtp.py error listener (127.0.0.3) -- sends a
+        # 5xx code instead of a 220 greeting (-> MX_ERROR).
+        name = "errorsmtp.test";
+        type = "mx";
+        replies = ["10 error.smtpok.test"];
+      },
+      {
+        name = "error.smtpok.test";
+        type = "a";
+        replies = ["127.0.0.3"];
+      },
+      {
+        # Paired with dummy_smtp.py messy listener (127.0.0.4) -- sends a
+        # non-SMTP-shaped line so banner parsing fails (-> MX_INVALID).
+        name = "messysmtp.test";
+        type = "mx";
+        replies = ["10 messy.smtpok.test"];
+      },
+      {
+        name = "messy.smtpok.test";
+        type = "a";
+        replies = ["127.0.0.4"];
       }
     ]
   }
diff --git a/test/functional/configs/mx_check-greeting-quit.conf b/test/functional/configs/mx_check-greeting-quit.conf
new file mode 100644 (file)
index 0000000..0a87d52
--- /dev/null
@@ -0,0 +1,23 @@
+.include(duplicate=merge,priority=0) "{= env.TESTDIR =}/configs/plugins.conf"
+.include(duplicate=merge,priority=1) "{= env.TESTDIR =}/configs/mx_check-dns.conf"
+
+redis {
+  servers = "{= env.REDIS_ADDR =}:{= env.REDIS_PORT =}";
+}
+
+mx_check {
+  enabled = true;
+  # verify_greeting=true, send_quit=true. Paired with two greeting_multi
+  # listeners: the "proper" one waits 0.2s between banner lines (fits
+  # within read_timeout), the "slow" one waits 2s (exceeds read_timeout to
+  # surface MX_TIMEOUT_READ on the second-line read).
+  port = 11125;
+  connect_timeout = 2;
+  read_timeout = 1;
+  greylist_invalid = false;
+  test_mode = true;
+  verify_greeting = true;
+  send_quit = true;
+  check_mime_from = false;
+  check_reply_to = false;
+}
diff --git a/test/functional/configs/mx_check-greeting.conf b/test/functional/configs/mx_check-greeting.conf
new file mode 100644 (file)
index 0000000..e25a134
--- /dev/null
@@ -0,0 +1,22 @@
+.include(duplicate=merge,priority=0) "{= env.TESTDIR =}/configs/plugins.conf"
+.include(duplicate=merge,priority=1) "{= env.TESTDIR =}/configs/mx_check-dns.conf"
+
+redis {
+  servers = "{= env.REDIS_ADDR =}:{= env.REDIS_PORT =}";
+}
+
+mx_check {
+  enabled = true;
+  # verify_greeting=true, send_quit=false. Paired with dummy_smtp.py listeners
+  # on this port (silent, error, messy, greeting_single).
+  port = 11125;
+  connect_timeout = 2;
+  read_timeout = 1;
+  greylist_invalid = false;
+  test_mode = true;
+  verify_greeting = true;
+  send_quit = false;
+  # Single-source for deterministic assertions.
+  check_mime_from = false;
+  check_reply_to = false;
+}
index d30e209088aafbbeb8fe978d78bb301495f54687..648cfba5e267db44597ca3fca1b19009e27f07f2 100644 (file)
@@ -17,6 +17,11 @@ mx_check {
   # exercised against 127.0.0.1. Other reserved ranges (RFC1918, TEST-NET)
   # still classify, so MX_LOCAL_* / MX_BOGON_* stay testable too.
   test_mode = true;
+  # Focus on envelope-from for deterministic single-symbol assertions.
+  check_mime_from = false;
+  check_reply_to = false;
   exclude_mxs = ["*.trusted.test"];
   exclude_ips = ["127.0.0.9/32"];
+  bad_mxs = ["*.bad.test"];
+  bad_ips = ["1.2.3.4/32"];
 }
diff --git a/test/functional/util/dummy_smtp.py b/test/functional/util/dummy_smtp.py
new file mode 100755 (executable)
index 0000000..afc8783
--- /dev/null
@@ -0,0 +1,185 @@
+#!/usr/bin/env python3
+# Dummy SMTP listener for mx_check verify_greeting / send_quit tests.
+#
+# Modes:
+#   silent          - accept the TCP connection and never write a banner.
+#                     Exercises the read-timeout path (-> MX_TIMEOUT_READ).
+#   error           - send a 5xx code, no banner negotiation (-> MX_ERROR).
+#   messy           - send a non-SMTP-shaped line so banner parsing fails
+#                     (-> MX_INVALID).
+#   greeting_single - send a single-line "220 hello" banner. Used to verify
+#                     the no-send_quit path: rspamd should fire MX_GOOD and
+#                     close without sending anything. The dummy waits briefly
+#                     after the banner; if the client transmits anything it's
+#                     recorded as UNEXPECTED_QUIT, otherwise OK_NO_QUIT.
+#   greeting_multi  - send a multi-line 220 banner with --between-wait
+#                     between lines. Detects whether QUIT arrives between
+#                     lines (QUIT_BEFORE_FINAL), after the final line
+#                     (QUIT_AFTER_FINAL), or not at all (NO_QUIT / EOF_BEFORE
+#                     _FINAL when the client closes early). Pair with
+#                     --between-wait < read_timeout for a healthy run and
+#                     --between-wait > read_timeout to exercise the slow-
+#                     second-line MX_TIMEOUT_READ case.
+#
+# CLI:
+#   --port PORT          (default 11125)
+#   --mode MODE          (default silent)
+#   --host HOST          (default 127.0.0.1)
+#   --pre-wait SEC       (default 0.0)  delay before sending anything
+#   --between-wait SEC   (default 0.2)  greeting_multi: wait between lines
+#                                        (also the early-QUIT detection window)
+#   --status-file PATH   (optional)     write final status here for the test
+#                                        to verify QUIT timing out-of-band
+#   --pid-file PATH      (default /tmp/dummy_smtp_<mode>.pid)
+
+import argparse
+import os
+import select
+import socket
+import socketserver
+import sys
+import time
+
+import dummy_killer
+
+
+def _write_status(path, value):
+    if not path:
+        return
+    try:
+        with open(path, "w") as f:
+            f.write(value + "\n")
+    except Exception:
+        pass
+
+
+def _make_handler(args):
+
+    class Handler(socketserver.BaseRequestHandler):
+
+        def handle(self):
+            if args.pre_wait > 0:
+                time.sleep(args.pre_wait)
+
+            if args.mode == "silent":
+                try:
+                    time.sleep(30)
+                except Exception:
+                    pass
+                return
+
+            if args.mode == "error":
+                self.request.sendall(b"550 not interested\r\n")
+                return
+
+            if args.mode == "messy":
+                self.request.sendall(
+                    b"This is a messy message to you, no smtp here\r\n")
+                return
+
+            if args.mode == "greeting_single":
+                # Send a CONTINUATION banner line on purpose (220-, not "220 ")
+                # so the test exercises mx_check's send_quit=false branch:
+                # rspamd should fire MX_GOOD off the first line without
+                # waiting for a continuation. Then hold the connection well
+                # past read_timeout without sending anything more -- a
+                # regression that re-queued the read would surface as
+                # MX_TIMEOUT_READ from rspamd's side.
+                self.request.sendall(b"220-Greeting\r\n")
+                ready, _, _ = select.select([self.request], [], [], 2.0)
+                if ready:
+                    try:
+                        data = self.request.recv(1024)
+                    except Exception:
+                        data = b""
+                    if data:
+                        _write_status(args.status_file, "UNEXPECTED_QUIT")
+                    else:
+                        _write_status(args.status_file, "OK_NO_QUIT")
+                else:
+                    _write_status(args.status_file, "NO_CLOSE")
+                return
+
+            if args.mode == "greeting_multi":
+                self.request.sendall(b"220-Greeting\r\n")
+                # Wait between lines. Any data here means QUIT was sent
+                # before the banner finished.
+                ready, _, _ = select.select([self.request], [], [],
+                                            args.between_wait)
+                if ready:
+                    try:
+                        data = self.request.recv(1024)
+                    except Exception:
+                        data = b""
+                    if data:
+                        _write_status(args.status_file, "QUIT_BEFORE_FINAL")
+                    else:
+                        # Peer closed before the final line was sent (e.g.
+                        # rspamd timed out and closed mid-banner).
+                        _write_status(args.status_file, "EOF_BEFORE_FINAL")
+                    return
+                # Final line; under correct mx_check behaviour QUIT should
+                # follow shortly.
+                try:
+                    self.request.sendall(b"220 Now you can speak\r\n")
+                except Exception:
+                    _write_status(args.status_file, "EOF_BEFORE_FINAL")
+                    return
+                self.request.settimeout(2)
+                try:
+                    data = self.request.recv(1024)
+                except Exception:
+                    data = b""
+                if data:
+                    _write_status(args.status_file, "QUIT_AFTER_FINAL")
+                    try:
+                        self.request.sendall(b"221 bye\r\n")
+                    except Exception:
+                        pass
+                else:
+                    _write_status(args.status_file, "NO_QUIT")
+                return
+
+    return Handler
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--port", type=int, default=11125)
+    parser.add_argument("--mode",
+                        choices=["silent", "error", "messy",
+                                 "greeting_single", "greeting_multi"],
+                        default="silent")
+    parser.add_argument("--host", default="127.0.0.1")
+    parser.add_argument("--pre-wait", dest="pre_wait", type=float, default=0.0)
+    parser.add_argument("--between-wait", dest="between_wait", type=float,
+                        default=0.2)
+    parser.add_argument("--status-file", dest="status_file", default=None)
+    parser.add_argument("--pid-file", dest="pid_file", default=None)
+    args = parser.parse_args()
+
+    if not args.pid_file:
+        args.pid_file = "/tmp/dummy_smtp_%s.pid" % args.mode
+
+    # Clear stale status file from a previous run so tests don't trip on it.
+    if args.status_file and os.path.exists(args.status_file):
+        try:
+            os.remove(args.status_file)
+        except Exception:
+            pass
+
+    handler = _make_handler(args)
+    server = socketserver.TCPServer((args.host, args.port), handler,
+                                    bind_and_activate=False)
+    server.allow_reuse_address = True
+    server.server_bind()
+    server.server_activate()
+
+    dummy_killer.setup_killer(server)
+    dummy_killer.write_pid(args.pid_file)
+
+    try:
+        server.serve_forever()
+    except socket.error:
+        print("Socket closed")
+    server.server_close()