git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] upstream: linear slow start on revive
author Vsevolod Stakhov <vsevolod@rspamd.com>
Fri, 1 May 2026 08:37:37 +0000 (09:37 +0100)
committer Vsevolod Stakhov <vsevolod@rspamd.com>
Fri, 1 May 2026 08:37:37 +0000 (09:37 +0100)
Newly revived upstreams previously rejoined the alive list at full
weight, producing a thundering herd that would land on a backend that
just came back up and was still warming caches/connection pools — the
same backend that had been failing minutes before. This often caused
immediate re-failure and a flap loop.

Add an opt-in slow_start_ms window (default 0 = disabled) configurable
via rspamd_upstreams_set_slow_start. While the window is open, both
round-robin (effective weight = weight * factor) and P2C (effective
load score = base / factor + warmup penalty) bias selection away from
the warming upstream linearly over time.

Hashed (Ketama) intentionally not integrated: scaling vnode counts
during the window would defeat the consistency property that hashed
selection exists for. Token bucket likewise unaffected — its
inflight-based fairness already handles cold buckets gracefully.

revived_at is set in the two real revive paths: the timer-based
revive_cb and the half-open probe success path in ok(). The initial
add_upstream activation is left unmarked so cold starts after a
config reload aren't artificially throttled.

src/libutil/upstream.c
src/libutil/upstream.h
test/rspamd_cxx_unit.cxx
test/rspamd_cxx_unit_upstream_slow_start.hxx [new file with mode: 0644]

index f45a8cc73c1390c03d1048c7c65c9f8529c01807..59a959068113d39cbff5f260f6fb30bf0117dcaf 100644 (file)
@@ -77,6 +77,13 @@ struct upstream {
        double next_probe_at;
        double probe_backoff;
        unsigned int half_open_inflight;
+       /*
+        * Wall time (ev_now/ticks) of the most recent revive. Zero when the
+        * upstream is in steady state. While non-zero and within the configured
+        * slow_start window, selection scales the upstream's effective weight
+        * up linearly from 0 to 1.
+        */
+       double revived_at;
        gpointer ud;
        enum rspamd_upstream_flag flags;
        struct upstream_list *ls;
@@ -127,6 +134,14 @@ struct upstream_limits {
        gsize token_bucket_min;          /* Min tokens for selection (default: 1) */
        gsize token_bucket_base_cost;    /* Base cost per request (default: 10) */
        gsize token_bucket_refill_per_s; /* Lazy refill rate (default: max/60) */
+
+       /*
+        * Slow start window (milliseconds). When non-zero, a freshly revived
+        * upstream's effective weight ramps linearly from 0 to its configured
+        * weight over this window, smoothing the thundering herd that otherwise
+        * lands on the just-revived backend. Default 0 (disabled).
+        */
+       unsigned int slow_start_ms;
 };
 
 struct upstream_list {
@@ -836,6 +851,8 @@ rspamd_upstream_revive_cb(struct ev_loop *loop, ev_timer *w, int revents)
        msg_debug_upstream("revive upstream %s", upstream->name);
 
        if (upstream->ls) {
+               /* Mark the time so selection paths can apply slow-start ramping. */
+               upstream->revived_at = ev_now(loop);
                rspamd_upstream_set_active(upstream->ls, upstream);
        }
 
@@ -1279,7 +1296,10 @@ void rspamd_upstream_ok(struct upstream *upstream)
                upstream->probe_backoff = upstream->ls ? upstream->ls->limits->revive_time : default_revive_time;
                upstream->next_probe_at = 0;
                if (upstream->ls && upstream->active_idx == -1) {
-                       /* Activate this upstream */
+                       /* Activate this upstream; mark for slow-start ramping. */
+                       upstream->revived_at = upstream->ctx && upstream->ctx->event_loop
+                                                                          ? ev_now(upstream->ctx->event_loop)
+                                                                          : rspamd_get_ticks(FALSE);
                        rspamd_upstream_set_active(upstream->ls, upstream);
                }
        }
@@ -2009,16 +2029,65 @@ rspamd_upstream_get_random(struct upstream_list *ups,
        }
 }
 
+/*
+ * Slow start factor in [0, 1] for upstream `up` at time `now` (seconds, on
+ * the same clock that produced up->revived_at).
+ *
+ * Returns 1.0 when the upstream has no list, carries no revive timestamp
+ * (revived_at <= 0), or slow start is disabled (slow_start_ms == 0).
+ * Otherwise returns elapsed / slow_start_ms, i.e. a linear ramp from 0
+ * toward 1 over the configured window.
+ *
+ * Side effects on up->revived_at:
+ *  - reset to 0 once the window has fully elapsed, so that later calls
+ *    take the cheap steady-state path;
+ *  - re-anchored to `now` when `now` lies before the recorded revive time
+ *    (the clock stepped backwards, or the two timestamps came from
+ *    different clock sources). Otherwise the factor would stay pinned at
+ *    0 until the clock caught up again, which may take arbitrarily long.
+ */
+static inline double
+rspamd_upstream_slow_start_factor(struct upstream *up, double now)
+{
+       const struct upstream_limits *limits;
+       double window_ms;
+       double elapsed_ms;
+
+       if (up->ls == NULL || up->revived_at <= 0) {
+               return 1.0;
+       }
+       limits = up->ls->limits;
+       if (limits->slow_start_ms == 0) {
+               return 1.0;
+       }
+
+       window_ms = (double) limits->slow_start_ms;
+       elapsed_ms = (now - up->revived_at) * 1000.0;
+
+       if (elapsed_ms < 0) {
+               /* Time ran backwards relative to the revive mark: restart the ramp. */
+               up->revived_at = now;
+               return 0.0;
+       }
+       if (elapsed_ms >= window_ms) {
+               up->revived_at = 0; /* window over: back to steady state */
+               return 1.0;
+       }
+
+       /* 0 <= elapsed_ms < window_ms here, so the ratio is already in [0, 1). */
+       return elapsed_ms / window_ms;
+}
+
 /*
  * Load score used by P2C: combines passive in-flight count with a small
  * penalty for recent errors. Lower is better. The errors term keeps
  * P2C biased away from a flapping upstream that hasn't yet accumulated
  * enough failures to be marked dead.
+ *
+ * During slow start, scale the score *up* by the inverse factor so that
+ * a barely-warmed-up upstream looks loaded relative to its peers and
+ * receives proportionally less traffic.
+ *
+ * `now` is the caller's current timestamp; it is forwarded unchanged to
+ * rspamd_upstream_slow_start_factor(). With factor >= 1.0 the score is
+ * just inflight + 2 x errors. Below that it becomes
+ * base / factor + (1 - factor) x 100, so even an idle (base == 0)
+ * warming upstream carries a penalty that shrinks as the ramp advances.
  */
-static inline unsigned int
-rspamd_upstream_load_score(const struct upstream *up)
+static inline double
+rspamd_upstream_load_score(struct upstream *up, double now)
 {
-       return up->inflight + up->errors * 2;
+       double base = (double) up->inflight + (double) up->errors * 2.0;
+       double factor = rspamd_upstream_slow_start_factor(up, now);
+
+       if (factor < 1.0) {
+               /*
+                * Early in the ramp the score grows sharply (heavily
+                * deprioritised); flooring the factor at 0.01 keeps it finite,
+                * capping it at base x 100 + 99.
+                */
+               if (factor < 0.01) {
+                       factor = 0.01;
+               }
+               return base / factor + (1.0 - factor) * 100.0;
+       }
+       return base;
 }
 
 /*
@@ -2032,6 +2101,7 @@ rspamd_upstream_get_p2c(struct upstream_list *ups, struct upstream *except)
 {
        unsigned int n = ups->alive->len;
        struct upstream *a, *b;
+       double now;
 
        if (n == 0) {
                return NULL;
@@ -2064,7 +2134,14 @@ rspamd_upstream_get_p2c(struct upstream_list *ups, struct upstream *except)
                if (b == except) return a;
        }
 
-       return rspamd_upstream_load_score(a) <= rspamd_upstream_load_score(b) ? a : b;
+       if (ups->ctx && ups->ctx->event_loop) {
+               now = ev_now(ups->ctx->event_loop);
+       }
+       else {
+               now = rspamd_get_ticks(FALSE);
+       }
+
+       return rspamd_upstream_load_score(a, now) <= rspamd_upstream_load_score(b, now) ? a : b;
 }
 
 static struct upstream *
@@ -2075,28 +2152,38 @@ rspamd_upstream_get_round_robin(struct upstream_list *ups,
        unsigned int max_weight = 0, min_checked = G_MAXUINT;
        struct upstream *up = NULL, *selected = NULL, *min_checked_sel = NULL;
        unsigned int i;
+       double now;
 
        /* Select upstream with the maximum cur_weight */
        RSPAMD_UPSTREAM_LOCK(ups);
 
+       if (ups->ctx && ups->ctx->event_loop) {
+               now = ev_now(ups->ctx->event_loop);
+       }
+       else {
+               now = rspamd_get_ticks(FALSE);
+       }
+
        for (i = 0; i < ups->alive->len; i++) {
+               unsigned int eff;
+               double factor;
+
                up = g_ptr_array_index(ups->alive, i);
 
                if (except != NULL && up == except) {
                        continue;
                }
 
-               if (use_cur) {
-                       if (up->cur_weight > max_weight) {
-                               selected = up;
-                               max_weight = up->cur_weight;
-                       }
+               factor = rspamd_upstream_slow_start_factor(up, now);
+               eff = use_cur ? up->cur_weight : up->weight;
+               if (factor < 1.0) {
+                       /* Scale weight down during the slow-start ramp. */
+                       eff = (unsigned int) ((double) eff * factor);
                }
-               else {
-                       if (up->weight > max_weight) {
-                               selected = up;
-                               max_weight = up->weight;
-                       }
+
+               if (eff > max_weight) {
+                       selected = up;
+                       max_weight = eff;
                }
 
                /*
@@ -2633,6 +2720,19 @@ void rspamd_upstreams_set_token_bucket(struct upstream_list *ups,
        ups->limits = nlimits;
 }
 
+/*
+ * Set the slow-start window (milliseconds, 0 disables it) for an upstream
+ * list. The current limits are copied into a freshly pool-allocated
+ * structure with only slow_start_ms changed, and the list is then pointed
+ * at that copy; the previous limits object is left untouched.
+ */
+void rspamd_upstreams_set_slow_start(struct upstream_list *ups,
+                                                                        unsigned int slow_start_ms)
+{
+       struct upstream_limits *updated;
+
+       g_assert(ups != NULL);
+       g_assert(ups->ctx != NULL && ups->ctx->pool != NULL);
+
+       /* Work on a private copy of the limits instead of editing in place. */
+       updated = rspamd_mempool_alloc(ups->ctx->pool, sizeof(struct upstream_limits));
+       *updated = *ups->limits;
+       updated->slow_start_ms = slow_start_ms;
+
+       ups->limits = updated;
+}
+
 /*
  * Calculate token cost for a message of given size
  */
index 406bd2e7776dadee0ca92533af115adf7b9b3689..f9061eb7bdc773b83ba1b2ced7fd74240d4b2bb2 100644 (file)
@@ -374,6 +374,17 @@ void rspamd_upstreams_set_token_bucket(struct upstream_list *ups,
                                                                           gsize min_tokens,
                                                                           gsize base_cost);
 
+/**
+ * Configure slow-start window for revived upstreams.
+ * When set, a freshly revived upstream's effective weight ramps linearly
+ * from 0 to its configured weight over the given window. Avoids the
+ * thundering herd that would otherwise hit the just-revived backend.
+ * The ramp is consulted by the round-robin and P2C selectors.
+ * @param ups upstream list; must be non-NULL and belong to a context that
+ *            has a memory pool (both conditions are asserted)
+ * @param slow_start_ms ramp duration in milliseconds (0 = disabled)
+ */
+void rspamd_upstreams_set_slow_start(struct upstream_list *ups,
+                                                                        unsigned int slow_start_ms);
+
 /**
  * Get upstream using token bucket algorithm.
  * Selects upstream with lowest inflight tokens (weighted by message size).
index 681b1c8864e43c1db4ca1e156a6a701d0e64cb2b..e48a8c726d3d8a74695a1eeaa37563e067f514ef 100644 (file)
@@ -32,6 +32,7 @@
 #include "rspamd_cxx_unit_upstream_ring_hash.hxx"
 #include "rspamd_cxx_unit_upstream_round_robin.hxx"
 #include "rspamd_cxx_unit_upstream_p2c.hxx"
+#include "rspamd_cxx_unit_upstream_slow_start.hxx"
 #include "rspamd_cxx_unit_multipart.hxx"
 #include "rspamd_cxx_unit_settings_merge.hxx"
 
diff --git a/test/rspamd_cxx_unit_upstream_slow_start.hxx b/test/rspamd_cxx_unit_upstream_slow_start.hxx
new file mode 100644 (file)
index 0000000..c3668ba
--- /dev/null
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Unit tests for upstream slow-start ramping after revive */
+
+#ifndef RSPAMD_CXX_UNIT_UPSTREAM_SLOW_START_HXX
+#define RSPAMD_CXX_UNIT_UPSTREAM_SLOW_START_HXX
+
+#define DOCTEST_CONFIG_IMPLEMENTATION_IN_DLL
+#include "doctest/doctest.h"
+
+#include "libutil/upstream.h"
+
+#include <map>
+#include <string>
+
+TEST_SUITE("upstream_slow_start")
+{
+       TEST_CASE("setter accepts and applies the window")
+       {
+               auto *ctx = rspamd_upstreams_library_init();
+               auto *ups = rspamd_upstreams_create(ctx);
+               rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_P2C);
+
+               for (unsigned i = 0; i < 3; i++) {
+                       char addr[32];
+                       snprintf(addr, sizeof(addr), "127.0.0.%u:11333", i + 1);
+                       auto ok = rspamd_upstreams_add_upstream(ups, addr, 11333,
+                                                                                                       RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr);
+                       REQUIRE(ok);
+               }
+
+               rspamd_upstreams_set_slow_start(ups, 5000);
+
+               /* Without revive events the slow-start factor is 1.0, so selection
+                * should still cover all upstreams. */
+               std::map<std::string, int> hits;
+               for (int i = 0; i < 600; i++) {
+                       auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+                       REQUIRE(up != nullptr);
+                       hits[rspamd_upstream_name(up)]++;
+                       rspamd_upstream_ok(up);
+               }
+               CHECK(hits.size() == 3);
+
+               rspamd_upstreams_destroy(ups);
+               rspamd_upstreams_library_unref(ctx);
+       }
+
+       TEST_CASE("disabled (default) is a no-op")
+       {
+               auto *ctx = rspamd_upstreams_library_init();
+               auto *ups = rspamd_upstreams_create(ctx);
+               rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_P2C);
+
+               for (unsigned i = 0; i < 3; i++) {
+                       char addr[32];
+                       snprintf(addr, sizeof(addr), "127.0.0.%u:11333", i + 1);
+                       REQUIRE(rspamd_upstreams_add_upstream(ups, addr, 11333,
+                                                                                                 RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+               }
+
+               /* Don't set slow_start. Distribution must stay roughly uniform. */
+               std::map<std::string, int> hits;
+               for (int i = 0; i < 900; i++) {
+                       auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+                       REQUIRE(up != nullptr);
+                       hits[rspamd_upstream_name(up)]++;
+                       rspamd_upstream_ok(up);
+               }
+               CHECK(hits.size() == 3);
+               for (const auto &[name, count]: hits) {
+                       CHECK(count > 200);
+               }
+
+               rspamd_upstreams_destroy(ups);
+               rspamd_upstreams_library_unref(ctx);
+       }
+
+       TEST_CASE("slow-start setter is idempotent under repeated calls")
+       {
+               auto *ctx = rspamd_upstreams_library_init();
+               auto *ups = rspamd_upstreams_create(ctx);
+               rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_ROUND_ROBIN);
+
+               REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+                                                                                         RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+               rspamd_upstreams_set_slow_start(ups, 1000);
+               rspamd_upstreams_set_slow_start(ups, 2000);
+               rspamd_upstreams_set_slow_start(ups, 0);
+
+               auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_ROUND_ROBIN, nullptr, 0);
+               REQUIRE(up != nullptr);
+               rspamd_upstream_ok(up);
+
+               rspamd_upstreams_destroy(ups);
+               rspamd_upstreams_library_unref(ctx);
+       }
+}
+
+#endif