]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Project] Implement backoff for upstreams revival 5603/head
authorVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 4 Sep 2025 16:22:48 +0000 (17:22 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 4 Sep 2025 16:22:48 +0000 (17:22 +0100)
src/libserver/cfg_file.h
src/libutil/upstream.c

index 355046cac0cf68606f71c9911ada01551d1142cf..76062e9b1e3fcb72e41f316d8e55e7a9d5fc3943 100644 (file)
@@ -461,6 +461,8 @@ struct rspamd_config {
        double upstream_revive_time;              /**< revive timeout for upstreams                                             */
        double upstream_lazy_resolve_time;        /**< lazy resolve time for upstreams                                  */
        double upstream_resolve_min_interval;     /**< minimum interval for resolving attempts (60 seconds by default) */
+       double upstream_probe_max_backoff;        /**< maximum backoff for probe retries when all upstreams are down */
+       double upstream_probe_jitter;             /**< jitter coefficient applied to probe backoff scheduling */
        struct upstream_ctx *ups_ctx;             /**< upstream context                                                                 */
        struct rspamd_dns_resolver *dns_resolver; /**< dns resolver if loaded                                                           */
 
index ea6bc9ad7116576375d23993db5a471bd558906a..1dd2d9f62e759c646e7baf9054de219346cfceca 100644 (file)
@@ -62,6 +62,10 @@ struct upstream {
        ev_timer ev;
        double last_fail;
        double last_resolve;
+       /* Probe/half-open state */
+       double next_probe_at;
+       double probe_backoff;
+       unsigned int half_open_inflight;
        gpointer ud;
        enum rspamd_upstream_flag flags;
        struct upstream_list *ls;
@@ -89,6 +93,8 @@ struct upstream_limits {
        double dns_timeout;
        double lazy_resolve_time;
        double resolve_min_interval;
+       double probe_max_backoff;
+       double probe_jitter;
        unsigned int max_errors;
        unsigned int dns_retransmits;
 };
@@ -163,6 +169,10 @@ static const unsigned int default_dns_retransmits = DEFAULT_DNS_RETRANSMITS;
 static const double default_lazy_resolve_time = DEFAULT_LAZY_RESOLVE_TIME;
 #define DEFAULT_RESOLVE_MIN_INTERVAL 60.0
 static const double default_resolve_min_interval = DEFAULT_RESOLVE_MIN_INTERVAL;
+#define DEFAULT_PROBE_MAX_BACKOFF 600.0
+static const double default_probe_max_backoff = DEFAULT_PROBE_MAX_BACKOFF;
+#define DEFAULT_PROBE_JITTER 0.3
+static const double default_probe_jitter = DEFAULT_PROBE_JITTER;
 
 static const struct upstream_limits default_limits = {
        .revive_time = DEFAULT_REVIVE_TIME,
@@ -173,6 +183,8 @@ static const struct upstream_limits default_limits = {
        .max_errors = DEFAULT_MAX_ERRORS,
        .lazy_resolve_time = DEFAULT_LAZY_RESOLVE_TIME,
        .resolve_min_interval = DEFAULT_RESOLVE_MIN_INTERVAL,
+       .probe_max_backoff = DEFAULT_PROBE_MAX_BACKOFF,
+       .probe_jitter = DEFAULT_PROBE_JITTER,
 };
 
 static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int);
@@ -206,6 +218,12 @@ void rspamd_upstreams_library_config(struct rspamd_config *cfg,
        if (cfg->upstream_resolve_min_interval) {
                ctx->limits.resolve_min_interval = cfg->upstream_resolve_min_interval;
        }
+       if (cfg->upstream_probe_max_backoff) {
+               ctx->limits.probe_max_backoff = cfg->upstream_probe_max_backoff;
+       }
+       if (cfg->upstream_probe_jitter) {
+               ctx->limits.probe_jitter = cfg->upstream_probe_jitter;
+       }
 
        /* Some sanity checks */
        if (ctx->limits.resolve_min_interval > ctx->limits.revive_time) {
@@ -868,6 +886,20 @@ rspamd_upstream_set_inactive(struct upstream_list *ls, struct upstream *upstream
 
                msg_debug_upstream("mark upstream %s inactive; revive in %.0f seconds",
                                                   upstream->name, ntim);
+               /* Initialize probe scheduling */
+               if (upstream->probe_backoff <= 0) {
+                       upstream->probe_backoff = ls->limits->revive_time;
+               }
+               if (upstream->ctx && upstream->ctx->event_loop) {
+                       upstream->next_probe_at = ev_now(upstream->ctx->event_loop) +
+                                                                         rspamd_time_jitter(upstream->probe_backoff,
+                                                                                                                upstream->probe_backoff * ls->limits->probe_jitter);
+               }
+               else {
+                       double now = rspamd_get_ticks(FALSE);
+                       upstream->next_probe_at = now + rspamd_time_jitter(upstream->probe_backoff,
+                                                                                                                          upstream->probe_backoff * ls->limits->probe_jitter);
+               }
                ev_timer_init(&upstream->ev, rspamd_upstream_revive_cb, ntim, 0);
                upstream->ev.data = upstream;
 
@@ -1001,6 +1033,18 @@ void rspamd_upstream_fail(struct upstream *upstream,
                        }
                }
 
+               /* If this was a half-open probe, schedule next probe with backoff */
+               if (upstream->half_open_inflight > 0) {
+                       double now = upstream->ctx && upstream->ctx->event_loop ? ev_now(upstream->ctx->event_loop) : rspamd_get_ticks(FALSE);
+                       if (upstream->probe_backoff <= 0) {
+                               upstream->probe_backoff = upstream->ls->limits->revive_time;
+                       }
+                       upstream->probe_backoff = MIN(upstream->probe_backoff * 2.0, upstream->ls->limits->probe_max_backoff);
+                       upstream->next_probe_at = now + rspamd_time_jitter(upstream->probe_backoff,
+                                                                                                                          upstream->probe_backoff * upstream->ls->limits->probe_jitter);
+                       upstream->half_open_inflight = 0;
+               }
+
                RSPAMD_UPSTREAM_UNLOCK(upstream);
        }
 }
@@ -1011,6 +1055,18 @@ void rspamd_upstream_ok(struct upstream *upstream)
        struct upstream_list_watcher *w;
 
        RSPAMD_UPSTREAM_LOCK(upstream);
+       /* Success handling */
+       if (upstream->half_open_inflight > 0) {
+               /* Successful probe: mark alive and reset backoff */
+               upstream->half_open_inflight = 0;
+               upstream->probe_backoff = upstream->ls ? upstream->ls->limits->revive_time : default_revive_time;
+               upstream->next_probe_at = 0;
+               if (upstream->ls && upstream->active_idx == -1) {
+                       /* Activate this upstream */
+                       rspamd_upstream_set_active(upstream->ls, upstream);
+               }
+       }
+
        if (upstream->errors > 0 && upstream->active_idx != -1 && upstream->ls) {
                /* We touch upstream if and only if it is active */
                msg_debug_upstream("reset errors on upstream %s (was %ud)", upstream->name, upstream->errors);
@@ -1743,9 +1799,42 @@ rspamd_upstream_get_common(struct upstream_list *ups,
 
        RSPAMD_UPSTREAM_LOCK(ups);
        if (ups->alive->len == 0) {
-               /* No alive upstreams; do not force-revive to avoid tight retry loops */
+               /* Probe mode: find the earliest probe-ready upstream and allow one inflight */
+               double now = ups->ctx && ups->ctx->event_loop ? ev_now(ups->ctx->event_loop) : rspamd_get_ticks(FALSE);
+               struct upstream *candidate = NULL;
+               double min_probe = HUGE_VAL;
+
+               for (unsigned int i = 0; i < ups->ups->len; i++) {
+                       struct upstream *cur = g_ptr_array_index(ups->ups, i);
+                       if (cur->active_idx >= 0 || (except && cur == except)) {
+                               continue;
+                       }
+
+                       if (cur->next_probe_at == 0) {
+                               /* Initialize probe schedule based on revive_time */
+                               cur->probe_backoff = cur->probe_backoff > 0 ? cur->probe_backoff : ups->limits->revive_time;
+                               cur->next_probe_at = now + rspamd_time_jitter(cur->probe_backoff,
+                                                                                                                         cur->probe_backoff * ups->limits->probe_jitter);
+                       }
+
+                       if (cur->next_probe_at <= now && cur->half_open_inflight == 0) {
+                               candidate = cur;
+                               break;
+                       }
+
+                       if (cur->next_probe_at < min_probe) {
+                               min_probe = cur->next_probe_at;
+                       }
+               }
+
+               if (candidate) {
+                       candidate->half_open_inflight = 1; /* allow one request */
+                       up = candidate;
+               }
+
                RSPAMD_UPSTREAM_UNLOCK(ups);
-               return NULL;
+
+               return up; /* can be NULL if not ready yet */
        }
        RSPAMD_UPSTREAM_UNLOCK(ups);