struct upstream_inet_addr_entry *new_addrs;
gpointer data;
char uid[8];
+ /*
+ * Port to apply to addresses returned by the first DNS resolution when
+ * the upstream was created in PENDING_RESOLVE state (no initial addrs
+ * to copy the port from). Zero otherwise.
+ */
+ uint16_t deferred_port;
ref_entry_t ref;
/* Token bucket fields for weighted load balancing */
#define DEFAULT_TOKEN_BUCKET_MIN 1
#define DEFAULT_TOKEN_BUCKET_BASE_COST 10
+/*
+ * Initial delay before retrying DNS for a PENDING_RESOLVE upstream, and the
+ * cap for the exponential back-off used while the upstream stays pending.
+ */
+#define UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY 1.0
+#define UPSTREAM_PENDING_RESOLVE_MAX_DELAY 60.0
+
static const struct upstream_limits default_limits = {
.revive_time = DEFAULT_REVIVE_TIME,
.revive_jitter = DEFAULT_REVIVE_JITTER,
/* Resolve them immediately ! */
when = 0.0;
}
+ else if (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) {
+ when = rspamd_time_jitter(UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY,
+ UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY * .25);
+ }
else {
when = rspamd_time_jitter(upstream->ls->limits->lazy_resolve_time,
upstream->ls->limits->lazy_resolve_time * .1);
static void
rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream)
{
+ gboolean is_pending;
+
RSPAMD_UPSTREAM_LOCK(ls);
- g_ptr_array_add(ls->alive, upstream);
- upstream->active_idx = ls->alive->len - 1;
- /* Invalidate ring hash */
- ls->ring_dirty = TRUE;
+ is_pending = (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) != 0;
- /* Initialize token bucket state */
- upstream->heap_idx = UINT_MAX;
- if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET) {
- upstream->max_tokens = ls->limits->token_bucket_max;
- upstream->available_tokens = upstream->max_tokens;
- upstream->inflight_tokens = 0;
+ if (!is_pending) {
+ g_ptr_array_add(ls->alive, upstream);
+ upstream->active_idx = ls->alive->len - 1;
- /* Add to token heap if already initialized */
- if (ls->token_bucket_initialized) {
- struct upstream_token_heap_entry entry;
- entry.pri = 0;
- entry.idx = 0;
- entry.up = upstream;
- rspamd_heap_push_safe(upstream_token_heap, &ls->token_heap, &entry, skip_heap);
- upstream->heap_idx = rspamd_heap_size(upstream_token_heap, &ls->token_heap) - 1;
- skip_heap:;
+ /* Invalidate ring hash */
+ ls->ring_dirty = TRUE;
+
+ /* Initialize token bucket state */
+ upstream->heap_idx = UINT_MAX;
+ if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET) {
+ upstream->max_tokens = ls->limits->token_bucket_max;
+ upstream->available_tokens = upstream->max_tokens;
+ upstream->inflight_tokens = 0;
+
+ /* Add to token heap if already initialized */
+ if (ls->token_bucket_initialized) {
+ struct upstream_token_heap_entry entry;
+ entry.pri = 0;
+ entry.idx = 0;
+ entry.up = upstream;
+ rspamd_heap_push_safe(upstream_token_heap, &ls->token_heap, &entry, skip_heap);
+ upstream->heap_idx = rspamd_heap_size(upstream_token_heap, &ls->token_heap) - 1;
+ skip_heap:;
+ }
}
}
+ else {
+ upstream->active_idx = -1;
+ upstream->heap_idx = UINT_MAX;
+ }
if (upstream->ctx && upstream->ctx->configured &&
!((upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE) ||
/* Resolve them immediately ! */
when = 0.0;
}
+ else if (is_pending) {
+ /* Retry quickly while we have no addresses */
+ when = rspamd_time_jitter(UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY,
+ UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY * .25);
+ }
else {
when = rspamd_time_jitter(upstream->ls->limits->lazy_resolve_time,
upstream->ls->limits->lazy_resolve_time * .1);
ev_timer_init(&upstream->ev, rspamd_upstream_lazy_resolve_cb,
when, 0);
upstream->ev.data = upstream;
- msg_debug_upstream("start lazy resolving for %s in %.0f seconds",
+ msg_debug_upstream("start %s resolving for %s in %.0f seconds",
+ is_pending ? "deferred" : "lazy",
upstream->name, when);
ev_timer_start(upstream->ctx->event_loop, &upstream->ev);
}
}
}
+/* Forward decl: defined a few lines below */
+static void rspamd_upstream_promote_pending(struct upstream *upstream);
+
static void
rspamd_upstream_update_addrs(struct upstream *upstream)
{
unsigned int addr_cnt, i, port;
- gboolean seen_addr, reset_errors = FALSE;
+ gboolean seen_addr, reset_errors = FALSE, was_pending = FALSE;
struct upstream_inet_addr_entry *cur, *tmp;
GPtrArray *new_addrs;
struct upstream_addr_elt *addr_elt, *naddr;
/*
* We need first of all get the saved port, since DNS gives us no
- * idea about what port has been used previously
+ * idea about what port has been used previously. For PENDING_RESOLVE
+ * upstreams there is no prior address: use the port stashed at parse
+ * time on `upstream->deferred_port`.
*/
RSPAMD_UPSTREAM_LOCK(upstream);
- if (upstream->addrs.addr->len > 0 && upstream->new_addrs) {
+ if (upstream->new_addrs &&
+ (upstream->addrs.addr == NULL || upstream->addrs.addr->len == 0)) {
+ was_pending = (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) != 0;
+ port = upstream->deferred_port;
+ }
+ else if (upstream->addrs.addr && upstream->addrs.addr->len > 0 &&
+ upstream->new_addrs) {
addr_elt = g_ptr_array_index(upstream->addrs.addr, 0);
port = rspamd_inet_address_get_port(addr_elt->addr);
+ }
+ else {
+ port = 0;
+ }
+ if (upstream->new_addrs) {
/* Now calculate new addrs count */
addr_cnt = 0;
LL_FOREACH(upstream->new_addrs, cur)
/* Ports are problematic, set to compare in the next block */
rspamd_inet_address_set_port(cur->addr, port);
- PTR_ARRAY_FOREACH(upstream->addrs.addr, i, addr_elt)
- {
- if (rspamd_inet_address_compare(addr_elt->addr, cur->addr, FALSE) == 0) {
- naddr = g_malloc0(sizeof(*naddr));
- naddr->addr = cur->addr;
- naddr->errors = reset_errors ? 0 : addr_elt->errors;
- seen_addr = TRUE;
+ if (upstream->addrs.addr) {
+ PTR_ARRAY_FOREACH(upstream->addrs.addr, i, addr_elt)
+ {
+ if (rspamd_inet_address_compare(addr_elt->addr, cur->addr, FALSE) == 0) {
+ naddr = g_malloc0(sizeof(*naddr));
+ naddr->addr = cur->addr;
+ naddr->errors = reset_errors ? 0 : addr_elt->errors;
+ seen_addr = TRUE;
- break;
+ break;
+ }
}
}
}
/* Free old addresses */
- g_ptr_array_free(upstream->addrs.addr, TRUE);
+ if (upstream->addrs.addr) {
+ g_ptr_array_free(upstream->addrs.addr, TRUE);
+ }
upstream->addrs.cur = 0;
upstream->addrs.addr = new_addrs;
upstream->new_addrs = NULL;
RSPAMD_UPSTREAM_UNLOCK(upstream);
+
+ if (was_pending && upstream->addrs.addr && upstream->addrs.addr->len > 0) {
+ rspamd_upstream_promote_pending(upstream);
+ }
+}
+
+/*
+ * Move a previously PENDING_RESOLVE upstream into the alive list now that
+ * addresses have been resolved. Mirrors the alive-side bookkeeping of
+ * rspamd_upstream_set_active without re-entering the resolution-scheduling
+ * branch (the lazy-resolve timer is already running).
+ */
+static void
+rspamd_upstream_promote_pending(struct upstream *upstream)
+{
+ struct upstream_list *ls = upstream->ls;
+ struct upstream_list_watcher *w;
+
+ if (ls == NULL) {
+ return;
+ }
+
+ RSPAMD_UPSTREAM_LOCK(ls);
+
+ if (!(upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE)) {
+ RSPAMD_UPSTREAM_UNLOCK(ls);
+ return;
+ }
+
+ upstream->flags &= ~RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE;
+
+ g_ptr_array_add(ls->alive, upstream);
+ upstream->active_idx = ls->alive->len - 1;
+ ls->ring_dirty = TRUE;
+
+ upstream->heap_idx = UINT_MAX;
+ if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET) {
+ upstream->max_tokens = ls->limits->token_bucket_max;
+ upstream->available_tokens = upstream->max_tokens;
+ upstream->inflight_tokens = 0;
+
+ if (ls->token_bucket_initialized) {
+ struct upstream_token_heap_entry entry;
+ entry.pri = 0;
+ entry.idx = 0;
+ entry.up = upstream;
+ rspamd_heap_push_safe(upstream_token_heap, &ls->token_heap, &entry, skip_heap);
+ upstream->heap_idx = rspamd_heap_size(upstream_token_heap, &ls->token_heap) - 1;
+ skip_heap:;
+ }
+ }
+
+ msg_info_upstream("resolved deferred upstream %s; promoted to alive",
+ upstream->name);
+
+ DL_FOREACH(ls->watchers, w)
+ {
+ if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) {
+ w->func(upstream, RSPAMD_UPSTREAM_WATCH_ONLINE, upstream->errors,
+ w->ud);
+ }
+ }
+
+ RSPAMD_UPSTREAM_UNLOCK(ls);
}
static void
if (up->ls) {
rspamd_upstream_resolve_addrs(up->ls, up);
- if (up->ttl == 0 || up->ttl > up->ls->limits->lazy_resolve_time) {
+ if (up->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) {
+ /*
+ * Still no addresses — back off exponentially but cap so we keep
+ * trying every minute or so. Once update_addrs runs successfully
+ * the flag is cleared and the next branch takes over.
+ */
+ double next = w->repeat * 2.0;
+ if (next < UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY) {
+ next = UPSTREAM_PENDING_RESOLVE_INITIAL_DELAY;
+ }
+ if (next > UPSTREAM_PENDING_RESOLVE_MAX_DELAY) {
+ next = UPSTREAM_PENDING_RESOLVE_MAX_DELAY;
+ }
+ w->repeat = rspamd_time_jitter(next, next * .25);
+ }
+ else if (up->ttl == 0 || up->ttl > up->ls->limits->lazy_resolve_time) {
w->repeat = rspamd_time_jitter(up->ls->limits->lazy_resolve_time,
up->ls->limits->lazy_resolve_time * .1);
}
return rspamd_inet_address_get_port(elt->addr);
}
+/*
+ * Fallback parser used when DNS resolution fails for a hostname-style upstream.
+ * Extracts host and port from "host[:port[:priority]]" so the upstream can be
+ * created in PENDING_RESOLVE state and resolved later.
+ *
+ * Returns TRUE on a syntactically valid hostname; FALSE otherwise (in which
+ * case the upstream cannot be deferred and creation must fail).
+ */
+static gboolean
+rspamd_upstream_parse_pending_host(const char *str, char **out_host,
+ uint16_t *out_port, uint16_t def_port,
+ rspamd_mempool_t *pool)
+{
+ const char *colon, *host_end;
+ size_t hlen;
+ uint16_t port = def_port;
+
+ if (str == NULL || str[0] == '\0' || str[0] == '[' || str[0] == '/' ||
+ str[0] == '.' || str[0] == '*' || str[0] == ':') {
+ return FALSE;
+ }
+
+ colon = strchr(str, ':');
+
+ if (colon != NULL) {
+ host_end = colon;
+ if (colon[1] != '\0') {
+ char *endptr = NULL;
+ unsigned long pn = strtoul(colon + 1, &endptr, 10);
+ if (pn == 0 || pn > UINT16_MAX) {
+ return FALSE;
+ }
+ port = (uint16_t) pn;
+ }
+ }
+ else {
+ host_end = str + strlen(str);
+ }
+
+ hlen = host_end - str;
+ if (hlen == 0 || hlen > 253) {
+ return FALSE;
+ }
+
+ if (pool) {
+ *out_host = rspamd_mempool_alloc(pool, hlen + 1);
+ }
+ else {
+ *out_host = g_malloc(hlen + 1);
+ }
+ rspamd_strlcpy(*out_host, str, hlen + 1);
+ *out_port = port;
+
+ return TRUE;
+}
+
gboolean
rspamd_upstreams_add_upstream(struct upstream_list *ups, const char *str,
uint16_t def_port, enum rspamd_upstream_parse_type parse_type,
&upstream->name, def_port,
FALSE,
ups->ctx ? ups->ctx->pool : NULL);
+
+ if (ret == RSPAMD_PARSE_ADDR_FAIL) {
+ char *pending_host = NULL;
+ uint16_t pending_port = 0;
+
+ if (rspamd_upstream_parse_pending_host(str, &pending_host,
+ &pending_port, def_port,
+ ups->ctx ? ups->ctx->pool : NULL)) {
+ /*
+ * DNS failed but the input looks like a hostname.
+ * Create the upstream in PENDING_RESOLVE state so the
+ * lazy resolver can populate addresses later. The upstream
+ * stays out of the alive list until resolution succeeds.
+ */
+ upstream->name = pending_host;
+ upstream->deferred_port = pending_port;
+ upstream->flags |= RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE;
+ addrs = g_ptr_array_sized_new(0);
+ if (ups->ctx) {
+ rspamd_mempool_add_destructor(ups->ctx->pool,
+ (rspamd_mempool_destruct_t) rspamd_ptr_array_free_hard,
+ addrs);
+ }
+ ret = RSPAMD_PARSE_ADDR_RESOLVED;
+ rspamd_default_log_function(G_LOG_LEVEL_WARNING,
+ "upstream", NULL, G_STRFUNC,
+ "address resolution for %s "
+ "failed at config time; "
+ "deferring (will retry asynchronously)",
+ pending_host);
+ }
+ }
}
break;
case RSPAMD_UPSTREAM_PARSE_NAMESERVER:
upstream->uid, sizeof(upstream->uid) - 1, RSPAMD_BASE32_DEFAULT);
msg_debug_upstream("added upstream %s (%s)", upstream->name,
- upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE ? "numeric ip" : "DNS name");
- g_ptr_array_sort(upstream->addrs.addr, rspamd_upstream_addr_sort_func);
+ upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE ? "numeric ip" : (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE ? "DNS name (deferred)" : "DNS name"));
+ if (upstream->addrs.addr) {
+ g_ptr_array_sort(upstream->addrs.addr, rspamd_upstream_addr_sort_func);
+ }
rspamd_upstream_set_active(ups, upstream);
return TRUE;
if (cur->active_idx >= 0 || (except && cur == except)) {
continue;
}
+ /*
+ * Pending-resolve upstreams have no addresses yet — they can't
+ * be probed. The lazy resolver will move them into `alive` once
+ * DNS comes back.
+ */
+ if (cur->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) {
+ continue;
+ }
if (cur->next_probe_at == 0) {
/* Initialize probe schedule based on revive_time */