From: Vsevolod Stakhov Date: Sat, 25 Apr 2026 18:42:27 +0000 (+0100) Subject: [Feature] upstream: defer DNS resolution for unreachable hosts X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=904fd62183f264dc97862ec0219aceda71c4f2b2;p=thirdparty%2Frspamd.git [Feature] upstream: defer DNS resolution for unreachable hosts Previously rspamd_upstreams_add_upstream() returned FALSE whenever rspamd_parse_host_port_priority() could not resolve a hostname, so a single DNS hiccup at config time would drop the upstream list and cascade into module init failures (issue #6000 was one symptom). Introduce RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE: when DNS fails for a hostname-style input we now keep the parsed host and port on the upstream, mark it pending, and let the existing async lazy-resolve machinery retry. Pending upstreams are deliberately kept out of the `alive` list so existing selectors (round-robin, hashed, master/slave) and the ring/heap rotators do not need to learn a new state - they continue to see only usable upstreams. The probe-mode fallback that walks `ups` directly skips pending entries explicitly. set_active() schedules a fast initial resolve (1s with jitter) for pending upstreams; lazy_resolve_cb() backs off exponentially up to 60s while the upstream stays pending. update_addrs() handles the empty-initial case by reading the port from the stashed `deferred_port` field, and on success rspamd_upstream_promote_pending clears the flag and inserts the upstream into `alive` (initialising token-bucket state if needed) and fires the WATCH_ONLINE event. This changes the failure mode for every consumer of upstreams (Redis, ClamAV, ICAP, ClickHouse, ...): a misconfigured or briefly-down DNS no longer prevents the daemon from starting, and recovers automa- tically without a restart. Callers that pick from an alive-empty list already had to handle nil from rspamd_upstream_get; later commits audit and tighten the Lua callers that did not. --- diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index a26e86e025..3b2a62f079 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -95,6 +95,12 @@ struct upstream { struct upstream_inet_addr_entry *new_addrs; gpointer data; char uid[8]; + /* + * Port to apply to addresses returned by the first DNS resolution when + * the upstream was created in PENDING_RESOLVE state (no initial addrs + * to copy the port from). Zero otherwise. + */ + uint16_t deferred_port; ref_entry_t ref; /* Token bucket fields for weighted load balancing */ @@ -216,6 +222,13 @@ static const double default_probe_jitter = DEFAULT_PROBE_JITTER; #define DEFAULT_TOKEN_BUCKET_MIN 1 #define DEFAULT_TOKEN_BUCKET_BASE_COST 10 +/* + * Initial delay before retrying DNS for a PENDING_RESOLVE upstream, and the + * cap for the exponential back-off used while the upstream stays pending. + */ +#define UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY 1.0 +#define UPSTREAM_PENDING_RESOLVE_MAX_DELAY 60.0 + static const struct upstream_limits default_limits = { .revive_time = DEFAULT_REVIVE_TIME, .revive_jitter = DEFAULT_REVIVE_JITTER, @@ -299,6 +312,10 @@ void rspamd_upstreams_library_config(struct rspamd_config *cfg, /* Resolve them immediately ! */ when = 0.0; } + else if (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) { + when = rspamd_time_jitter(UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY, + UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY * .25); + } else { when = rspamd_time_jitter(upstream->ls->limits->lazy_resolve_time, upstream->ls->limits->lazy_resolve_time * .1); @@ -402,31 +419,42 @@ rspamd_upstream_addr_sort_func(gconstpointer a, gconstpointer b) static void rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream) { + gboolean is_pending; + RSPAMD_UPSTREAM_LOCK(ls); - g_ptr_array_add(ls->alive, upstream); - upstream->active_idx = ls->alive->len - 1; - /* Invalidate ring hash */ - ls->ring_dirty = TRUE; + is_pending = (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) != 0; - /* Initialize token bucket state */ - upstream->heap_idx = UINT_MAX; - if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET) { - upstream->max_tokens = ls->limits->token_bucket_max; - upstream->available_tokens = upstream->max_tokens; - upstream->inflight_tokens = 0; + if (!is_pending) { + g_ptr_array_add(ls->alive, upstream); + upstream->active_idx = ls->alive->len - 1; - /* Add to token heap if already initialized */ - if (ls->token_bucket_initialized) { - struct upstream_token_heap_entry entry; - entry.pri = 0; - entry.idx = 0; - entry.up = upstream; - rspamd_heap_push_safe(upstream_token_heap, &ls->token_heap, &entry, skip_heap); - upstream->heap_idx = rspamd_heap_size(upstream_token_heap, &ls->token_heap) - 1; - skip_heap:; + /* Invalidate ring hash */ + ls->ring_dirty = TRUE; + + /* Initialize token bucket state */ + upstream->heap_idx = UINT_MAX; + if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET) { + upstream->max_tokens = ls->limits->token_bucket_max; + upstream->available_tokens = upstream->max_tokens; + upstream->inflight_tokens = 0; + + /* Add to token heap if already initialized */ + if (ls->token_bucket_initialized) { + struct upstream_token_heap_entry entry; + entry.pri = 0; + entry.idx = 0; + entry.up = upstream; + rspamd_heap_push_safe(upstream_token_heap, &ls->token_heap, &entry, skip_heap); + upstream->heap_idx = rspamd_heap_size(upstream_token_heap, &ls->token_heap) - 1; + skip_heap:; + } } } + else { + upstream->active_idx = -1; + upstream->heap_idx = UINT_MAX; + } if (upstream->ctx && upstream->ctx->configured && !((upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE) || @@ -443,6 +471,11 @@ rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream) /* Resolve them immediately ! */ when = 0.0; } + else if (is_pending) { + /* Retry quickly while we have no addresses */ + when = rspamd_time_jitter(UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY, + UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY * .25); + } else { when = rspamd_time_jitter(upstream->ls->limits->lazy_resolve_time, upstream->ls->limits->lazy_resolve_time * .1); @@ -450,7 +483,8 @@ rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream) ev_timer_init(&upstream->ev, rspamd_upstream_lazy_resolve_cb, when, 0); upstream->ev.data = upstream; - msg_debug_upstream("start lazy resolving for %s in %.0f seconds", + msg_debug_upstream("start %s resolving for %s in %.0f seconds", + is_pending ? "deferred" : "lazy", upstream->name, when); ev_timer_start(upstream->ctx->event_loop, &upstream->ev); } @@ -469,25 +503,41 @@ rspamd_upstream_addr_elt_dtor(gpointer a) } } +/* Forward decl: defined a few lines below */ +static void rspamd_upstream_promote_pending(struct upstream *upstream); + static void rspamd_upstream_update_addrs(struct upstream *upstream) { unsigned int addr_cnt, i, port; - gboolean seen_addr, reset_errors = FALSE; + gboolean seen_addr, reset_errors = FALSE, was_pending = FALSE; struct upstream_inet_addr_entry *cur, *tmp; GPtrArray *new_addrs; struct upstream_addr_elt *addr_elt, *naddr; /* * We need first of all get the saved port, since DNS gives us no - * idea about what port has been used previously + * idea about what port has been used previously. For PENDING_RESOLVE + * upstreams there is no prior address: use the port stashed at parse + * time on `upstream->deferred_port`. */ RSPAMD_UPSTREAM_LOCK(upstream); - if (upstream->addrs.addr->len > 0 && upstream->new_addrs) { + if (upstream->new_addrs && + (upstream->addrs.addr == NULL || upstream->addrs.addr->len == 0)) { + was_pending = (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) != 0; + port = upstream->deferred_port; + } + else if (upstream->addrs.addr && upstream->addrs.addr->len > 0 && + upstream->new_addrs) { addr_elt = g_ptr_array_index(upstream->addrs.addr, 0); port = rspamd_inet_address_get_port(addr_elt->addr); + } + else { + port = 0; + } + if (upstream->new_addrs) { /* Now calculate new addrs count */ addr_cnt = 0; LL_FOREACH(upstream->new_addrs, cur) @@ -512,15 +562,17 @@ rspamd_upstream_update_addrs(struct upstream *upstream) /* Ports are problematic, set to compare in the next block */ rspamd_inet_address_set_port(cur->addr, port); - PTR_ARRAY_FOREACH(upstream->addrs.addr, i, addr_elt) - { - if (rspamd_inet_address_compare(addr_elt->addr, cur->addr, FALSE) == 0) { - naddr = g_malloc0(sizeof(*naddr)); - naddr->addr = cur->addr; - naddr->errors = reset_errors ? 0 : addr_elt->errors; - seen_addr = TRUE; + if (upstream->addrs.addr) { + PTR_ARRAY_FOREACH(upstream->addrs.addr, i, addr_elt) + { + if (rspamd_inet_address_compare(addr_elt->addr, cur->addr, FALSE) == 0) { + naddr = g_malloc0(sizeof(*naddr)); + naddr->addr = cur->addr; + naddr->errors = reset_errors ? 0 : addr_elt->errors; + seen_addr = TRUE; - break; + break; + } } } @@ -542,7 +594,9 @@ rspamd_upstream_update_addrs(struct upstream *upstream) } /* Free old addresses */ - g_ptr_array_free(upstream->addrs.addr, TRUE); + if (upstream->addrs.addr) { + g_ptr_array_free(upstream->addrs.addr, TRUE); + } upstream->addrs.cur = 0; upstream->addrs.addr = new_addrs; @@ -557,6 +611,70 @@ rspamd_upstream_update_addrs(struct upstream *upstream) upstream->new_addrs = NULL; RSPAMD_UPSTREAM_UNLOCK(upstream); + + if (was_pending && upstream->addrs.addr && upstream->addrs.addr->len > 0) { + rspamd_upstream_promote_pending(upstream); + } +} + +/* + * Move a previously PENDING_RESOLVE upstream into the alive list now that + * addresses have been resolved. Mirrors the alive-side bookkeeping of + * rspamd_upstream_set_active without re-entering the resolution-scheduling + * branch (the lazy-resolve timer is already running). + */ +static void +rspamd_upstream_promote_pending(struct upstream *upstream) +{ + struct upstream_list *ls = upstream->ls; + struct upstream_list_watcher *w; + + if (ls == NULL) { + return; + } + + RSPAMD_UPSTREAM_LOCK(ls); + + if (!(upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE)) { + RSPAMD_UPSTREAM_UNLOCK(ls); + return; + } + + upstream->flags &= ~RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE; + + g_ptr_array_add(ls->alive, upstream); + upstream->active_idx = ls->alive->len - 1; + ls->ring_dirty = TRUE; + + upstream->heap_idx = UINT_MAX; + if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET) { + upstream->max_tokens = ls->limits->token_bucket_max; + upstream->available_tokens = upstream->max_tokens; + upstream->inflight_tokens = 0; + + if (ls->token_bucket_initialized) { + struct upstream_token_heap_entry entry; + entry.pri = 0; + entry.idx = 0; + entry.up = upstream; + rspamd_heap_push_safe(upstream_token_heap, &ls->token_heap, &entry, skip_heap); + upstream->heap_idx = rspamd_heap_size(upstream_token_heap, &ls->token_heap) - 1; + skip_heap:; + } + } + + msg_info_upstream("resolved deferred upstream %s; promoted to alive", + upstream->name); + + DL_FOREACH(ls->watchers, w) + { + if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) { + w->func(upstream, RSPAMD_UPSTREAM_WATCH_ONLINE, upstream->errors, + w->ud); + } + } + + RSPAMD_UPSTREAM_UNLOCK(ls); } static void @@ -908,7 +1026,22 @@ rspamd_upstream_lazy_resolve_cb(struct ev_loop *loop, ev_timer *w, int revents) if (up->ls) { rspamd_upstream_resolve_addrs(up->ls, up); - if (up->ttl == 0 || up->ttl > up->ls->limits->lazy_resolve_time) { + if (up->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) { + /* + * Still no addresses — back off exponentially but cap so we keep + * trying every minute or so. Once update_addrs runs successfully + * the flag is cleared and the next branch takes over. + */ + double next = w->repeat * 2.0; + if (next < UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY) { + next = UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY; + } + if (next > UPSTREAM_PENDING_RESOLVE_MAX_DELAY) { + next = UPSTREAM_PENDING_RESOLVE_MAX_DELAY; + } + w->repeat = rspamd_time_jitter(next, next * .25); + } + else if (up->ttl == 0 || up->ttl > up->ls->limits->lazy_resolve_time) { w->repeat = rspamd_time_jitter(up->ls->limits->lazy_resolve_time, up->ls->limits->lazy_resolve_time * .1); } @@ -1353,6 +1486,62 @@ int rspamd_upstream_port(struct upstream *up) return rspamd_inet_address_get_port(elt->addr); } +/* + * Fallback parser used when DNS resolution fails for a hostname-style upstream. + * Extracts host and port from "host[:port[:priority]]" so the upstream can be + * created in PENDING_RESOLVE state and resolved later. + * + * Returns TRUE on a syntactically valid hostname; FALSE otherwise (in which + * case the upstream cannot be deferred and creation must fail). + */ +static gboolean +rspamd_upstream_parse_pending_host(const char *str, char **out_host, + uint16_t *out_port, uint16_t def_port, + rspamd_mempool_t *pool) +{ + const char *colon, *host_end; + size_t hlen; + uint16_t port = def_port; + + if (str == NULL || str[0] == '\0' || str[0] == '[' || str[0] == '/' || + str[0] == '.' || str[0] == '*' || str[0] == ':') { + return FALSE; + } + + colon = strchr(str, ':'); + + if (colon != NULL) { + host_end = colon; + if (colon[1] != '\0') { + char *endptr = NULL; + unsigned long pn = strtoul(colon + 1, &endptr, 10); + if (pn == 0 || pn > UINT16_MAX) { + return FALSE; + } + port = (uint16_t) pn; + } + } + else { + host_end = str + strlen(str); + } + + hlen = host_end - str; + if (hlen == 0 || hlen > 253) { + return FALSE; + } + + if (pool) { + *out_host = rspamd_mempool_alloc(pool, hlen + 1); + } + else { + *out_host = g_malloc(hlen + 1); + } + rspamd_strlcpy(*out_host, str, hlen + 1); + *out_port = port; + + return TRUE; +} + gboolean rspamd_upstreams_add_upstream(struct upstream_list *ups, const char *str, uint16_t def_port, enum rspamd_upstream_parse_type parse_type, @@ -1419,6 +1608,38 @@ rspamd_upstreams_add_upstream(struct upstream_list *ups, const char *str, &upstream->name, def_port, FALSE, ups->ctx ? ups->ctx->pool : NULL); + + if (ret == RSPAMD_PARSE_ADDR_FAIL) { + char *pending_host = NULL; + uint16_t pending_port = 0; + + if (rspamd_upstream_parse_pending_host(str, &pending_host, + &pending_port, def_port, + ups->ctx ? ups->ctx->pool : NULL)) { + /* + * DNS failed but the input looks like a hostname. + * Create the upstream in PENDING_RESOLVE state so the + * lazy resolver can populate addresses later. The upstream + * stays out of the alive list until resolution succeeds. + */ + upstream->name = pending_host; + upstream->deferred_port = pending_port; + upstream->flags |= RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE; + addrs = g_ptr_array_sized_new(0); + if (ups->ctx) { + rspamd_mempool_add_destructor(ups->ctx->pool, + (rspamd_mempool_destruct_t) rspamd_ptr_array_free_hard, + addrs); + } + ret = RSPAMD_PARSE_ADDR_RESOLVED; + rspamd_default_log_function(G_LOG_LEVEL_WARNING, + "upstream", NULL, G_STRFUNC, + "address resolution for %s " + "failed at config time; " + "deferring (will retry asynchronously)", + pending_host); + } + } } break; case RSPAMD_UPSTREAM_PARSE_NAMESERVER: @@ -1561,8 +1782,10 @@ rspamd_upstreams_add_upstream(struct upstream_list *ups, const char *str, upstream->uid, sizeof(upstream->uid) - 1, RSPAMD_BASE32_DEFAULT); msg_debug_upstream("added upstream %s (%s)", upstream->name, - upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE ? "numeric ip" : "DNS name"); - g_ptr_array_sort(upstream->addrs.addr, rspamd_upstream_addr_sort_func); + upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE ? "numeric ip" : (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE ? "DNS name (deferred)" : "DNS name")); + if (upstream->addrs.addr) { + g_ptr_array_sort(upstream->addrs.addr, rspamd_upstream_addr_sort_func); + } rspamd_upstream_set_active(ups, upstream); return TRUE; @@ -2069,6 +2292,14 @@ rspamd_upstream_get_common(struct upstream_list *ups, if (cur->active_idx >= 0 || (except && cur == except)) { continue; } + /* + * Pending-resolve upstreams have no addresses yet — they can't + * be probed. The lazy resolver will move them into `alive` once + * DNS comes back. + */ + if (cur->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) { + continue; + } if (cur->next_probe_at == 0) { /* Initialize probe schedule based on revive_time */ diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h index cbebbb59a4..5af3a1b632 100644 --- a/src/libutil/upstream.h +++ b/src/libutil/upstream.h @@ -43,6 +43,13 @@ enum rspamd_upstream_flag { RSPAMD_UPSTREAM_FLAG_NORESOLVE = (1 << 0), RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE = (1 << 1), RSPAMD_UPSTREAM_FLAG_DNS = (1 << 2), + /* + * Upstream was created with a hostname that could not be resolved at + * config-parse time. It is kept out of the `alive` list until a later + * lazy/forced resolution succeeds, at which point the flag is cleared + * and the upstream becomes selectable. + */ + RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE = (1 << 3), }; struct rspamd_config;