From: Vsevolod Stakhov Date: Thu, 4 Sep 2025 16:22:48 +0000 (+0100) Subject: [Project] Implement backoff for upstreams revival X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=refs%2Fpull%2F5603%2Fhead;p=thirdparty%2Frspamd.git [Project] Implement backoff for upstreams revival --- diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h index 355046cac0..76062e9b1e 100644 --- a/src/libserver/cfg_file.h +++ b/src/libserver/cfg_file.h @@ -461,6 +461,8 @@ struct rspamd_config { double upstream_revive_time; /**< revive timeout for upstreams */ double upstream_lazy_resolve_time; /**< lazy resolve time for upstreams */ double upstream_resolve_min_interval; /**< minimum interval for resolving attempts (60 seconds by default) */ + double upstream_probe_max_backoff; /**< maximum backoff for probe retries when all upstreams are down */ + double upstream_probe_jitter; /**< jitter coefficient applied to probe backoff scheduling */ struct upstream_ctx *ups_ctx; /**< upstream context */ struct rspamd_dns_resolver *dns_resolver; /**< dns resolver if loaded */ diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index ea6bc9ad71..1dd2d9f62e 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -62,6 +62,10 @@ struct upstream { ev_timer ev; double last_fail; double last_resolve; + /* Probe/half-open state */ + double next_probe_at; + double probe_backoff; + unsigned int half_open_inflight; gpointer ud; enum rspamd_upstream_flag flags; struct upstream_list *ls; @@ -89,6 +93,8 @@ struct upstream_limits { double dns_timeout; double lazy_resolve_time; double resolve_min_interval; + double probe_max_backoff; + double probe_jitter; unsigned int max_errors; unsigned int dns_retransmits; }; @@ -163,6 +169,10 @@ static const unsigned int default_dns_retransmits = DEFAULT_DNS_RETRANSMITS; static const double default_lazy_resolve_time = DEFAULT_LAZY_RESOLVE_TIME; #define DEFAULT_RESOLVE_MIN_INTERVAL 60.0 static const double default_resolve_min_interval = DEFAULT_RESOLVE_MIN_INTERVAL; +#define DEFAULT_PROBE_MAX_BACKOFF 600.0 +static const double default_probe_max_backoff = DEFAULT_PROBE_MAX_BACKOFF; +#define DEFAULT_PROBE_JITTER 0.3 +static const double default_probe_jitter = DEFAULT_PROBE_JITTER; static const struct upstream_limits default_limits = { .revive_time = DEFAULT_REVIVE_TIME, @@ -173,6 +183,8 @@ static const struct upstream_limits default_limits = { .max_errors = DEFAULT_MAX_ERRORS, .lazy_resolve_time = DEFAULT_LAZY_RESOLVE_TIME, .resolve_min_interval = DEFAULT_RESOLVE_MIN_INTERVAL, + .probe_max_backoff = DEFAULT_PROBE_MAX_BACKOFF, + .probe_jitter = DEFAULT_PROBE_JITTER, }; static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int); @@ -206,6 +218,12 @@ void rspamd_upstreams_library_config(struct rspamd_config *cfg, if (cfg->upstream_resolve_min_interval) { ctx->limits.resolve_min_interval = cfg->upstream_resolve_min_interval; } + if (cfg->upstream_probe_max_backoff) { + ctx->limits.probe_max_backoff = cfg->upstream_probe_max_backoff; + } + if (cfg->upstream_probe_jitter) { + ctx->limits.probe_jitter = cfg->upstream_probe_jitter; + } /* Some sanity checks */ if (ctx->limits.resolve_min_interval > ctx->limits.revive_time) { @@ -868,6 +886,20 @@ rspamd_upstream_set_inactive(struct upstream_list *ls, struct upstream *upstream msg_debug_upstream("mark upstream %s inactive; revive in %.0f seconds", upstream->name, ntim); + /* Initialize probe scheduling */ + if (upstream->probe_backoff <= 0) { + upstream->probe_backoff = ls->limits->revive_time; + } + if (upstream->ctx && upstream->ctx->event_loop) { + upstream->next_probe_at = ev_now(upstream->ctx->event_loop) + + rspamd_time_jitter(upstream->probe_backoff, + upstream->probe_backoff * ls->limits->probe_jitter); + } + else { + double now = rspamd_get_ticks(FALSE); + upstream->next_probe_at = now + rspamd_time_jitter(upstream->probe_backoff, + upstream->probe_backoff * ls->limits->probe_jitter); + } ev_timer_init(&upstream->ev, rspamd_upstream_revive_cb, ntim, 0); upstream->ev.data = upstream; @@ -1001,6 +1033,18 @@ void rspamd_upstream_fail(struct upstream *upstream, } } + /* If this was a half-open probe, schedule next probe with backoff */ + if (upstream->half_open_inflight > 0) { + double now = upstream->ctx && upstream->ctx->event_loop ? ev_now(upstream->ctx->event_loop) : rspamd_get_ticks(FALSE); + if (upstream->probe_backoff <= 0) { + upstream->probe_backoff = upstream->ls->limits->revive_time; + } + upstream->probe_backoff = MIN(upstream->probe_backoff * 2.0, upstream->ls->limits->probe_max_backoff); + upstream->next_probe_at = now + rspamd_time_jitter(upstream->probe_backoff, + upstream->probe_backoff * upstream->ls->limits->probe_jitter); + upstream->half_open_inflight = 0; + } + RSPAMD_UPSTREAM_UNLOCK(upstream); } } @@ -1011,6 +1055,18 @@ void rspamd_upstream_ok(struct upstream *upstream) struct upstream_list_watcher *w; RSPAMD_UPSTREAM_LOCK(upstream); + /* Success handling */ + if (upstream->half_open_inflight > 0) { + /* Successful probe: mark alive and reset backoff */ + upstream->half_open_inflight = 0; + upstream->probe_backoff = upstream->ls ? upstream->ls->limits->revive_time : default_revive_time; + upstream->next_probe_at = 0; + if (upstream->ls && upstream->active_idx == -1) { + /* Activate this upstream */ + rspamd_upstream_set_active(upstream->ls, upstream); + } + } + if (upstream->errors > 0 && upstream->active_idx != -1 && upstream->ls) { /* We touch upstream if and only if it is active */ msg_debug_upstream("reset errors on upstream %s (was %ud)", upstream->name, upstream->errors); @@ -1743,9 +1799,42 @@ rspamd_upstream_get_common(struct upstream_list *ups, RSPAMD_UPSTREAM_LOCK(ups); if (ups->alive->len == 0) { - /* No alive upstreams; do not force-revive to avoid tight retry loops */ + /* Probe mode: find the earliest probe-ready upstream and allow one inflight */ + double now = ups->ctx && ups->ctx->event_loop ? ev_now(ups->ctx->event_loop) : rspamd_get_ticks(FALSE); + struct upstream *candidate = NULL; + double min_probe = HUGE_VAL; + + for (unsigned int i = 0; i < ups->ups->len; i++) { + struct upstream *cur = g_ptr_array_index(ups->ups, i); + if (cur->active_idx >= 0 || (except && cur == except)) { + continue; + } + + if (cur->next_probe_at == 0) { + /* Initialize probe schedule based on revive_time */ + cur->probe_backoff = cur->probe_backoff > 0 ? cur->probe_backoff : ups->limits->revive_time; + cur->next_probe_at = now + rspamd_time_jitter(cur->probe_backoff, + cur->probe_backoff * ups->limits->probe_jitter); + } + + if (cur->next_probe_at <= now && cur->half_open_inflight == 0) { + candidate = cur; + break; + } + + if (cur->next_probe_at < min_probe) { + min_probe = cur->next_probe_at; + } + } + + if (candidate) { + candidate->half_open_inflight = 1; /* allow one request */ + up = candidate; + } + RSPAMD_UPSTREAM_UNLOCK(ups); - return NULL; + + return up; /* can be NULL if not ready yet */ } RSPAMD_UPSTREAM_UNLOCK(ups);