git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] upstream: defer DNS resolution for unreachable hosts
author Vsevolod Stakhov <vsevolod@rspamd.com>
Sat, 25 Apr 2026 18:42:27 +0000 (19:42 +0100)
committer Vsevolod Stakhov <vsevolod@rspamd.com>
Sat, 25 Apr 2026 18:42:27 +0000 (19:42 +0100)
Previously rspamd_upstreams_add_upstream() returned FALSE whenever
rspamd_parse_host_port_priority() could not resolve a hostname, so a
single DNS hiccup at config time would drop the upstream list and
cascade into module init failures (issue #6000 was one symptom).

Introduce RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE: when DNS fails for a
hostname-style input we now keep the parsed host and port on the
upstream, mark it pending, and let the existing async lazy-resolve
machinery retry. Pending upstreams are deliberately kept out of the
`alive` list so existing selectors (round-robin, hashed, master/slave)
and the ring/heap rotators do not need to learn a new state - they
continue to see only usable upstreams. The probe-mode fallback that
walks `ups` directly skips pending entries explicitly.

set_active() schedules a fast initial resolve (1s with jitter) for
pending upstreams; lazy_resolve_cb() backs off exponentially up to
60s while the upstream stays pending. update_addrs() handles the
empty-initial case by reading the port from the stashed
`deferred_port` field, and on success rspamd_upstream_promote_pending
clears the flag and inserts the upstream into `alive` (initialising
token-bucket state if needed) and fires the WATCH_ONLINE event.

This changes the failure mode for every consumer of upstreams (Redis,
ClamAV, ICAP, ClickHouse, ...): a misconfigured or briefly-down DNS
no longer prevents the daemon from starting, and the affected upstream
recovers automatically without a restart. Callers that pick from an
alive-empty list already had to handle nil from rspamd_upstream_get;
later commits audit and tighten the Lua callers that did not.

src/libutil/upstream.c
src/libutil/upstream.h

index a26e86e0257d7fe13d48f235d079a1d029a1b8af..3b2a62f0798d83fae6597f80dc70f75477462af3 100644 (file)
@@ -95,6 +95,12 @@ struct upstream {
        struct upstream_inet_addr_entry *new_addrs;
        gpointer data;
        char uid[8];
+       /*
+        * Port to apply to addresses returned by the first DNS resolution when
+        * the upstream was created in PENDING_RESOLVE state (no initial addrs
+        * to copy the port from). Zero otherwise.
+        */
+       uint16_t deferred_port;
        ref_entry_t ref;
 
        /* Token bucket fields for weighted load balancing */
@@ -216,6 +222,13 @@ static const double default_probe_jitter = DEFAULT_PROBE_JITTER;
 #define DEFAULT_TOKEN_BUCKET_MIN 1
 #define DEFAULT_TOKEN_BUCKET_BASE_COST 10
 
+/*
+ * Initial delay before retrying DNS for a PENDING_RESOLVE upstream, and the
+ * cap for the exponential back-off used while the upstream stays pending.
+ */
+#define UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY 1.0
+#define UPSTREAM_PENDING_RESOLVE_MAX_DELAY 60.0
+
 static const struct upstream_limits default_limits = {
        .revive_time = DEFAULT_REVIVE_TIME,
        .revive_jitter = DEFAULT_REVIVE_JITTER,
@@ -299,6 +312,10 @@ void rspamd_upstreams_library_config(struct rspamd_config *cfg,
                                        /* Resolve them immediately ! */
                                        when = 0.0;
                                }
+                               else if (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) {
+                                       when = rspamd_time_jitter(UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY,
+                                                                                         UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY * .25);
+                               }
                                else {
                                        when = rspamd_time_jitter(upstream->ls->limits->lazy_resolve_time,
                                                                                          upstream->ls->limits->lazy_resolve_time * .1);
@@ -402,31 +419,42 @@ rspamd_upstream_addr_sort_func(gconstpointer a, gconstpointer b)
 static void
 rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream)
 {
+       gboolean is_pending;
+
        RSPAMD_UPSTREAM_LOCK(ls);
-       g_ptr_array_add(ls->alive, upstream);
-       upstream->active_idx = ls->alive->len - 1;
 
-       /* Invalidate ring hash */
-       ls->ring_dirty = TRUE;
+       is_pending = (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) != 0;
 
-       /* Initialize token bucket state */
-       upstream->heap_idx = UINT_MAX;
-       if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET) {
-               upstream->max_tokens = ls->limits->token_bucket_max;
-               upstream->available_tokens = upstream->max_tokens;
-               upstream->inflight_tokens = 0;
+       if (!is_pending) {
+               g_ptr_array_add(ls->alive, upstream);
+               upstream->active_idx = ls->alive->len - 1;
 
-               /* Add to token heap if already initialized */
-               if (ls->token_bucket_initialized) {
-                       struct upstream_token_heap_entry entry;
-                       entry.pri = 0;
-                       entry.idx = 0;
-                       entry.up = upstream;
-                       rspamd_heap_push_safe(upstream_token_heap, &ls->token_heap, &entry, skip_heap);
-                       upstream->heap_idx = rspamd_heap_size(upstream_token_heap, &ls->token_heap) - 1;
-               skip_heap:;
+               /* Invalidate ring hash */
+               ls->ring_dirty = TRUE;
+
+               /* Initialize token bucket state */
+               upstream->heap_idx = UINT_MAX;
+               if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET) {
+                       upstream->max_tokens = ls->limits->token_bucket_max;
+                       upstream->available_tokens = upstream->max_tokens;
+                       upstream->inflight_tokens = 0;
+
+                       /* Add to token heap if already initialized */
+                       if (ls->token_bucket_initialized) {
+                               struct upstream_token_heap_entry entry;
+                               entry.pri = 0;
+                               entry.idx = 0;
+                               entry.up = upstream;
+                               rspamd_heap_push_safe(upstream_token_heap, &ls->token_heap, &entry, skip_heap);
+                               upstream->heap_idx = rspamd_heap_size(upstream_token_heap, &ls->token_heap) - 1;
+                       skip_heap:;
+                       }
                }
        }
+       else {
+               upstream->active_idx = -1;
+               upstream->heap_idx = UINT_MAX;
+       }
 
        if (upstream->ctx && upstream->ctx->configured &&
                !((upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE) ||
@@ -443,6 +471,11 @@ rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream)
                        /* Resolve them immediately ! */
                        when = 0.0;
                }
+               else if (is_pending) {
+                       /* Retry quickly while we have no addresses */
+                       when = rspamd_time_jitter(UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY,
+                                                                         UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY * .25);
+               }
                else {
                        when = rspamd_time_jitter(upstream->ls->limits->lazy_resolve_time,
                                                                          upstream->ls->limits->lazy_resolve_time * .1);
@@ -450,7 +483,8 @@ rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream)
                ev_timer_init(&upstream->ev, rspamd_upstream_lazy_resolve_cb,
                                          when, 0);
                upstream->ev.data = upstream;
-               msg_debug_upstream("start lazy resolving for %s in %.0f seconds",
+               msg_debug_upstream("start %s resolving for %s in %.0f seconds",
+                                                  is_pending ? "deferred" : "lazy",
                                                   upstream->name, when);
                ev_timer_start(upstream->ctx->event_loop, &upstream->ev);
        }
@@ -469,25 +503,41 @@ rspamd_upstream_addr_elt_dtor(gpointer a)
        }
 }
 
+/* Forward decl: defined a few lines below */
+static void rspamd_upstream_promote_pending(struct upstream *upstream);
+
 static void
 rspamd_upstream_update_addrs(struct upstream *upstream)
 {
        unsigned int addr_cnt, i, port;
-       gboolean seen_addr, reset_errors = FALSE;
+       gboolean seen_addr, reset_errors = FALSE, was_pending = FALSE;
        struct upstream_inet_addr_entry *cur, *tmp;
        GPtrArray *new_addrs;
        struct upstream_addr_elt *addr_elt, *naddr;
 
        /*
         * We need first of all get the saved port, since DNS gives us no
-        * idea about what port has been used previously
+        * idea about what port has been used previously. For PENDING_RESOLVE
+        * upstreams there is no prior address: use the port stashed at parse
+        * time on `upstream->deferred_port`.
         */
        RSPAMD_UPSTREAM_LOCK(upstream);
 
-       if (upstream->addrs.addr->len > 0 && upstream->new_addrs) {
+       if (upstream->new_addrs &&
+               (upstream->addrs.addr == NULL || upstream->addrs.addr->len == 0)) {
+               was_pending = (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) != 0;
+               port = upstream->deferred_port;
+       }
+       else if (upstream->addrs.addr && upstream->addrs.addr->len > 0 &&
+                        upstream->new_addrs) {
                addr_elt = g_ptr_array_index(upstream->addrs.addr, 0);
                port = rspamd_inet_address_get_port(addr_elt->addr);
+       }
+       else {
+               port = 0;
+       }
 
+       if (upstream->new_addrs) {
                /* Now calculate new addrs count */
                addr_cnt = 0;
                LL_FOREACH(upstream->new_addrs, cur)
@@ -512,15 +562,17 @@ rspamd_upstream_update_addrs(struct upstream *upstream)
                        /* Ports are problematic, set to compare in the next block */
                        rspamd_inet_address_set_port(cur->addr, port);
 
-                       PTR_ARRAY_FOREACH(upstream->addrs.addr, i, addr_elt)
-                       {
-                               if (rspamd_inet_address_compare(addr_elt->addr, cur->addr, FALSE) == 0) {
-                                       naddr = g_malloc0(sizeof(*naddr));
-                                       naddr->addr = cur->addr;
-                                       naddr->errors = reset_errors ? 0 : addr_elt->errors;
-                                       seen_addr = TRUE;
+                       if (upstream->addrs.addr) {
+                               PTR_ARRAY_FOREACH(upstream->addrs.addr, i, addr_elt)
+                               {
+                                       if (rspamd_inet_address_compare(addr_elt->addr, cur->addr, FALSE) == 0) {
+                                               naddr = g_malloc0(sizeof(*naddr));
+                                               naddr->addr = cur->addr;
+                                               naddr->errors = reset_errors ? 0 : addr_elt->errors;
+                                               seen_addr = TRUE;
 
-                                       break;
+                                               break;
+                                       }
                                }
                        }
 
@@ -542,7 +594,9 @@ rspamd_upstream_update_addrs(struct upstream *upstream)
                }
 
                /* Free old addresses */
-               g_ptr_array_free(upstream->addrs.addr, TRUE);
+               if (upstream->addrs.addr) {
+                       g_ptr_array_free(upstream->addrs.addr, TRUE);
+               }
 
                upstream->addrs.cur = 0;
                upstream->addrs.addr = new_addrs;
@@ -557,6 +611,70 @@ rspamd_upstream_update_addrs(struct upstream *upstream)
 
        upstream->new_addrs = NULL;
        RSPAMD_UPSTREAM_UNLOCK(upstream);
+
+       if (was_pending && upstream->addrs.addr && upstream->addrs.addr->len > 0) {
+               rspamd_upstream_promote_pending(upstream);
+       }
+}
+
+/*
+ * Promote a PENDING_RESOLVE upstream: clear the flag, append it to `ls->alive`,
+ * mark the ring dirty, set up token-bucket state if that rotation is used and
+ * notify RSPAMD_UPSTREAM_WATCH_ONLINE watchers. No-op when the upstream has no
+ * list or is not pending. The resolve timer is not touched here.
+ */
+static void
+rspamd_upstream_promote_pending(struct upstream *upstream)
+{
+       struct upstream_list *ls = upstream->ls;
+       struct upstream_list_watcher *w;
+
+       if (ls == NULL) { /* not attached to a list: nowhere to promote into */
+               return;
+       }
+
+       RSPAMD_UPSTREAM_LOCK(ls);
+
+       if (!(upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE)) { /* flag already cleared */
+               RSPAMD_UPSTREAM_UNLOCK(ls);
+               return;
+       }
+
+       upstream->flags &= ~RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE;
+
+       g_ptr_array_add(ls->alive, upstream); /* same bookkeeping as the non-pending path of rspamd_upstream_set_active */
+       upstream->active_idx = ls->alive->len - 1;
+       ls->ring_dirty = TRUE;
+
+       upstream->heap_idx = UINT_MAX; /* stays UINT_MAX unless the heap push below succeeds */
+       if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET) {
+               upstream->max_tokens = ls->limits->token_bucket_max;
+               upstream->available_tokens = upstream->max_tokens;
+               upstream->inflight_tokens = 0;
+
+               if (ls->token_bucket_initialized) {
+                       struct upstream_token_heap_entry entry;
+                       entry.pri = 0;
+                       entry.idx = 0;
+                       entry.up = upstream;
+                       rspamd_heap_push_safe(upstream_token_heap, &ls->token_heap, &entry, skip_heap); /* NOTE(review): presumably jumps to skip_heap on failure - confirm */
+                       upstream->heap_idx = rspamd_heap_size(upstream_token_heap, &ls->token_heap) - 1;
+               skip_heap:;
+               }
+       }
+
+       msg_info_upstream("resolved deferred upstream %s; promoted to alive",
+                                         upstream->name);
+
+       DL_FOREACH(ls->watchers, w) /* watchers are called before the list lock is released */
+       {
+               if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) {
+                       w->func(upstream, RSPAMD_UPSTREAM_WATCH_ONLINE, upstream->errors,
+                                       w->ud);
+               }
+       }
+
+       RSPAMD_UPSTREAM_UNLOCK(ls);
 }
 
 static void
@@ -908,7 +1026,22 @@ rspamd_upstream_lazy_resolve_cb(struct ev_loop *loop, ev_timer *w, int revents)
        if (up->ls) {
                rspamd_upstream_resolve_addrs(up->ls, up);
 
-               if (up->ttl == 0 || up->ttl > up->ls->limits->lazy_resolve_time) {
+               if (up->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) {
+                       /*
+                        * Still no addresses — back off exponentially but cap so we keep
+                        * trying every minute or so. Once update_addrs runs successfully
+                        * the flag is cleared and the next branch takes over.
+                        */
+                       double next = w->repeat * 2.0;
+                       if (next < UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY) {
+                               next = UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY;
+                       }
+                       if (next > UPSTREAM_PENDING_RESOLVE_MAX_DELAY) {
+                               next = UPSTREAM_PENDING_RESOLVE_MAX_DELAY;
+                       }
+                       w->repeat = rspamd_time_jitter(next, next * .25);
+               }
+               else if (up->ttl == 0 || up->ttl > up->ls->limits->lazy_resolve_time) {
                        w->repeat = rspamd_time_jitter(up->ls->limits->lazy_resolve_time,
                                                                                   up->ls->limits->lazy_resolve_time * .1);
                }
@@ -1353,6 +1486,62 @@ int rspamd_upstream_port(struct upstream *up)
        return rspamd_inet_address_get_port(elt->addr);
 }
 
+/*
+ * Fallback parser used when DNS resolution fails for a hostname-style upstream.
+ * Splits "host[:port[:priority]]" at the first ':' (priority is not parsed) so
+ * the upstream can be created in PENDING_RESOLVE state. On success *out_host is
+ * a copy of the host (from `pool` if given, else g_malloc) and *out_port is the
+ * parsed port or `def_port`. Returns FALSE for empty input, a leading '[', '/',
+ * '.', '*' or ':', a host over 253 bytes, or a port that is not 1..65535.
+ */
+static gboolean
+rspamd_upstream_parse_pending_host(const char *str, char **out_host,
+                                                                  uint16_t *out_port, uint16_t def_port,
+                                                                  rspamd_mempool_t *pool)
+{
+       const char *colon, *host_end;
+       size_t hlen;
+       uint16_t port = def_port;
+
+       if (str == NULL || str[0] == '\0' || str[0] == '[' || str[0] == '/' ||
+               str[0] == '.' || str[0] == '*' || str[0] == ':') {
+               return FALSE;
+       }
+
+       colon = strchr(str, ':');
+
+       if (colon != NULL) {
+               host_end = colon; /* host is everything before the first ':' */
+               if (colon[1] != '\0') { /* a bare trailing ':' keeps def_port */
+                       char *endptr = NULL;
+                       unsigned long pn = strtoul(colon + 1, &endptr, 10);
+                       if (pn == 0 || pn > UINT16_MAX || (*endptr != '\0' && *endptr != ':')) {
+                               return FALSE; /* port must be 1..65535 and end at '\0' or the ':' before priority */
+                       }
+                       port = (uint16_t) pn;
+               }
+       }
+       else {
+               host_end = str + strlen(str);
+       }
+
+       hlen = host_end - str;
+       if (hlen == 0 || hlen > 253) { /* 253: presumably the maximum textual DNS name length */
+               return FALSE;
+       }
+
+       if (pool) {
+               *out_host = rspamd_mempool_alloc(pool, hlen + 1);
+       }
+       else {
+               *out_host = g_malloc(hlen + 1);
+       }
+       rspamd_strlcpy(*out_host, str, hlen + 1);
+       *out_port = port;
+
+       return TRUE;
+}
+
 gboolean
 rspamd_upstreams_add_upstream(struct upstream_list *ups, const char *str,
                                                          uint16_t def_port, enum rspamd_upstream_parse_type parse_type,
@@ -1419,6 +1608,38 @@ rspamd_upstreams_add_upstream(struct upstream_list *ups, const char *str,
                                                                                                  &upstream->name, def_port,
                                                                                                  FALSE,
                                                                                                  ups->ctx ? ups->ctx->pool : NULL);
+
+                       if (ret == RSPAMD_PARSE_ADDR_FAIL) {
+                               char *pending_host = NULL;
+                               uint16_t pending_port = 0;
+
+                               if (rspamd_upstream_parse_pending_host(str, &pending_host,
+                                                                                                          &pending_port, def_port,
+                                                                                                          ups->ctx ? ups->ctx->pool : NULL)) {
+                                       /*
+                                        * DNS failed but the input looks like a hostname.
+                                        * Create the upstream in PENDING_RESOLVE state so the
+                                        * lazy resolver can populate addresses later. The upstream
+                                        * stays out of the alive list until resolution succeeds.
+                                        */
+                                       upstream->name = pending_host;
+                                       upstream->deferred_port = pending_port;
+                                       upstream->flags |= RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE;
+                                       addrs = g_ptr_array_sized_new(0);
+                                       if (ups->ctx) {
+                                               rspamd_mempool_add_destructor(ups->ctx->pool,
+                                                                                                         (rspamd_mempool_destruct_t) rspamd_ptr_array_free_hard,
+                                                                                                         addrs);
+                                       }
+                                       ret = RSPAMD_PARSE_ADDR_RESOLVED;
+                                       rspamd_default_log_function(G_LOG_LEVEL_WARNING,
+                                                                                               "upstream", NULL, G_STRFUNC,
+                                                                                               "address resolution for %s "
+                                                                                               "failed at config time; "
+                                                                                               "deferring (will retry asynchronously)",
+                                                                                               pending_host);
+                               }
+                       }
                }
                break;
        case RSPAMD_UPSTREAM_PARSE_NAMESERVER:
@@ -1561,8 +1782,10 @@ rspamd_upstreams_add_upstream(struct upstream_list *ups, const char *str,
                                                         upstream->uid, sizeof(upstream->uid) - 1, RSPAMD_BASE32_DEFAULT);
 
        msg_debug_upstream("added upstream %s (%s)", upstream->name,
-                                          upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE ? "numeric ip" : "DNS name");
-       g_ptr_array_sort(upstream->addrs.addr, rspamd_upstream_addr_sort_func);
+                                          upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE ? "numeric ip" : (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE ? "DNS name (deferred)" : "DNS name"));
+       if (upstream->addrs.addr) {
+               g_ptr_array_sort(upstream->addrs.addr, rspamd_upstream_addr_sort_func);
+       }
        rspamd_upstream_set_active(ups, upstream);
 
        return TRUE;
@@ -2069,6 +2292,14 @@ rspamd_upstream_get_common(struct upstream_list *ups,
                        if (cur->active_idx >= 0 || (except && cur == except)) {
                                continue;
                        }
+                       /*
+                        * Pending-resolve upstreams have no addresses yet — they can't
+                        * be probed. The lazy resolver will move them into `alive` once
+                        * DNS comes back.
+                        */
+                       if (cur->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) {
+                               continue;
+                       }
 
                        if (cur->next_probe_at == 0) {
                                /* Initialize probe schedule based on revive_time */
index cbebbb59a431cd6b43020b3c42ac906263b348ff..5af3a1b6326b2c93b73e2952c4fa7831010607a7 100644 (file)
@@ -43,6 +43,13 @@ enum rspamd_upstream_flag {
        RSPAMD_UPSTREAM_FLAG_NORESOLVE = (1 << 0),
        RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE = (1 << 1),
        RSPAMD_UPSTREAM_FLAG_DNS = (1 << 2),
+       /*
+        * Upstream was created with a hostname that could not be resolved at
+        * config-parse time. It is kept out of the `alive` list until a later
+        * lazy/forced resolution succeeds, at which point the flag is cleared
+        * and the upstream becomes selectable.
+        */
+       RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE = (1 << 3),
 };
 
 struct rspamd_config;