double next_probe_at;
double probe_backoff;
unsigned int half_open_inflight;
+ /*
+ * Wall time (ev_now/ticks) of the most recent revive. Zero when the
+ * upstream is in steady state. While non-zero and within the configured
+ * slow_start window, selection scales the upstream's effective weight
+ * up linearly from 0 to 1.
+ */
+ double revived_at;
gpointer ud;
enum rspamd_upstream_flag flags;
struct upstream_list *ls;
gsize token_bucket_min; /* Min tokens for selection (default: 1) */
gsize token_bucket_base_cost; /* Base cost per request (default: 10) */
gsize token_bucket_refill_per_s; /* Lazy refill rate (default: max/60) */
+
+ /*
+ * Slow start window (milliseconds). When non-zero, a freshly revived
+ * upstream's effective weight ramps linearly from 0 to its configured
+ * weight over this window, smoothing the thundering herd that otherwise
+ * lands on the just-revived backend. Default 0 (disabled).
+ */
+ unsigned int slow_start_ms;
};
struct upstream_list {
msg_debug_upstream("revive upstream %s", upstream->name);
if (upstream->ls) {
+ /* Mark the time so selection paths can apply slow-start ramping. */
+ upstream->revived_at = ev_now(loop);
rspamd_upstream_set_active(upstream->ls, upstream);
}
upstream->probe_backoff = upstream->ls ? upstream->ls->limits->revive_time : default_revive_time;
upstream->next_probe_at = 0;
if (upstream->ls && upstream->active_idx == -1) {
- /* Activate this upstream */
+ /* Activate this upstream; mark for slow-start ramping. */
+ upstream->revived_at = upstream->ctx && upstream->ctx->event_loop
+ ? ev_now(upstream->ctx->event_loop)
+ : rspamd_get_ticks(FALSE);
rspamd_upstream_set_active(upstream->ls, upstream);
}
}
}
}
+/*
+ * Slow start factor in [0, 1]: 1.0 in steady state, ramping linearly from
+ * 0 toward 1 over `slow_start_ms` after a revive. Returns 1.0 when slow
+ * start is disabled or the upstream has never been revived. Mutates
+ * revived_at to clear the cache once the window expires.
+ */
+static inline double
+rspamd_upstream_slow_start_factor(struct upstream *up, double now)
+{
+ const struct upstream_limits *limits;
+ double elapsed_ms;
+ double factor;
+
+ if (up->ls == NULL || up->revived_at <= 0) {
+ return 1.0;
+ }
+ limits = up->ls->limits;
+ if (limits->slow_start_ms == 0) {
+ return 1.0;
+ }
+
+ elapsed_ms = (now - up->revived_at) * 1000.0;
+ if (elapsed_ms <= 0) {
+ return 0.0;
+ }
+ if (elapsed_ms >= (double) limits->slow_start_ms) {
+ up->revived_at = 0; /* clear: no further work for this upstream */
+ return 1.0;
+ }
+
+ factor = elapsed_ms / (double) limits->slow_start_ms;
+ if (factor < 0.0) factor = 0.0;
+ return factor;
+}
+
/*
* Load score used by P2C: combines passive in-flight count with a small
* penalty for recent errors. Lower is better. The errors term keeps
* P2C biased away from a flapping upstream that hasn't yet accumulated
* enough failures to be marked dead.
+ *
+ * During slow start, scale the score *up* by the inverse factor so that
+ * a barely-warmed-up upstream looks loaded relative to its peers and
+ * receives proportionally less traffic.
*/
-static inline unsigned int
-rspamd_upstream_load_score(const struct upstream *up)
+static inline double
+rspamd_upstream_load_score(struct upstream *up, double now)
{
- return up->inflight + up->errors * 2;
+ double base = (double) up->inflight + (double) up->errors * 2.0;
+ double factor = rspamd_upstream_slow_start_factor(up, now);
+
+ if (factor < 1.0) {
+ /* As factor -> 0, score -> infinity (heavily deprioritised). */
+ if (factor < 0.01) {
+ factor = 0.01;
+ }
+ return base / factor + (1.0 - factor) * 100.0;
+ }
+ return base;
}
/*
{
unsigned int n = ups->alive->len;
struct upstream *a, *b;
+ double now;
if (n == 0) {
return NULL;
if (b == except) return a;
}
- return rspamd_upstream_load_score(a) <= rspamd_upstream_load_score(b) ? a : b;
+ if (ups->ctx && ups->ctx->event_loop) {
+ now = ev_now(ups->ctx->event_loop);
+ }
+ else {
+ now = rspamd_get_ticks(FALSE);
+ }
+
+ return rspamd_upstream_load_score(a, now) <= rspamd_upstream_load_score(b, now) ? a : b;
}
static struct upstream *
unsigned int max_weight = 0, min_checked = G_MAXUINT;
struct upstream *up = NULL, *selected = NULL, *min_checked_sel = NULL;
unsigned int i;
+ double now;
/* Select upstream with the maximum cur_weight */
RSPAMD_UPSTREAM_LOCK(ups);
+ if (ups->ctx && ups->ctx->event_loop) {
+ now = ev_now(ups->ctx->event_loop);
+ }
+ else {
+ now = rspamd_get_ticks(FALSE);
+ }
+
for (i = 0; i < ups->alive->len; i++) {
+ unsigned int eff;
+ double factor;
+
up = g_ptr_array_index(ups->alive, i);
if (except != NULL && up == except) {
continue;
}
- if (use_cur) {
- if (up->cur_weight > max_weight) {
- selected = up;
- max_weight = up->cur_weight;
- }
+ factor = rspamd_upstream_slow_start_factor(up, now);
+ eff = use_cur ? up->cur_weight : up->weight;
+ if (factor < 1.0) {
+ /* Scale weight down during the slow-start ramp. */
+ eff = (unsigned int) ((double) eff * factor);
}
- else {
- if (up->weight > max_weight) {
- selected = up;
- max_weight = up->weight;
- }
+
+ if (eff > max_weight) {
+ selected = up;
+ max_weight = eff;
}
/*
ups->limits = nlimits;
}
+void rspamd_upstreams_set_slow_start(struct upstream_list *ups,
+ unsigned int slow_start_ms)
+{
+ struct upstream_limits *nlimits;
+ g_assert(ups != NULL);
+ g_assert(ups->ctx != NULL && ups->ctx->pool != NULL);
+
+ nlimits = rspamd_mempool_alloc(ups->ctx->pool, sizeof(*nlimits));
+ memcpy(nlimits, ups->limits, sizeof(*nlimits));
+ nlimits->slow_start_ms = slow_start_ms;
+ ups->limits = nlimits;
+}
+
/*
* Calculate token cost for a message of given size
*/
--- /dev/null
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Unit tests for upstream slow-start ramping after revive */
+
+#ifndef RSPAMD_CXX_UNIT_UPSTREAM_SLOW_START_HXX
+#define RSPAMD_CXX_UNIT_UPSTREAM_SLOW_START_HXX
+
+#define DOCTEST_CONFIG_IMPLEMENTATION_IN_DLL
+#include "doctest/doctest.h"
+
+#include "libutil/upstream.h"
+
+#include <map>
+#include <string>
+
+TEST_SUITE("upstream_slow_start")
+{
+ TEST_CASE("setter accepts and applies the window")
+ {
+ auto *ctx = rspamd_upstreams_library_init();
+ auto *ups = rspamd_upstreams_create(ctx);
+ rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_P2C);
+
+ for (unsigned i = 0; i < 3; i++) {
+ char addr[32];
+ snprintf(addr, sizeof(addr), "127.0.0.%u:11333", i + 1);
+ auto ok = rspamd_upstreams_add_upstream(ups, addr, 11333,
+ RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr);
+ REQUIRE(ok);
+ }
+
+ rspamd_upstreams_set_slow_start(ups, 5000);
+
+ /* Without revive events the slow-start factor is 1.0, so selection
+ * should still cover all upstreams. */
+ std::map<std::string, int> hits;
+ for (int i = 0; i < 600; i++) {
+ auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+ REQUIRE(up != nullptr);
+ hits[rspamd_upstream_name(up)]++;
+ rspamd_upstream_ok(up);
+ }
+ CHECK(hits.size() == 3);
+
+ rspamd_upstreams_destroy(ups);
+ rspamd_upstreams_library_unref(ctx);
+ }
+
+ TEST_CASE("disabled (default) is a no-op")
+ {
+ auto *ctx = rspamd_upstreams_library_init();
+ auto *ups = rspamd_upstreams_create(ctx);
+ rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_P2C);
+
+ for (unsigned i = 0; i < 3; i++) {
+ char addr[32];
+ snprintf(addr, sizeof(addr), "127.0.0.%u:11333", i + 1);
+ REQUIRE(rspamd_upstreams_add_upstream(ups, addr, 11333,
+ RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+ }
+
+ /* Don't set slow_start. Distribution must stay roughly uniform. */
+ std::map<std::string, int> hits;
+ for (int i = 0; i < 900; i++) {
+ auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+ REQUIRE(up != nullptr);
+ hits[rspamd_upstream_name(up)]++;
+ rspamd_upstream_ok(up);
+ }
+ CHECK(hits.size() == 3);
+ for (const auto &[name, count]: hits) {
+ CHECK(count > 200);
+ }
+
+ rspamd_upstreams_destroy(ups);
+ rspamd_upstreams_library_unref(ctx);
+ }
+
+ TEST_CASE("slow-start setter is idempotent under repeated calls")
+ {
+ auto *ctx = rspamd_upstreams_library_init();
+ auto *ups = rspamd_upstreams_create(ctx);
+ rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_ROUND_ROBIN);
+
+ REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+ RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+ rspamd_upstreams_set_slow_start(ups, 1000);
+ rspamd_upstreams_set_slow_start(ups, 2000);
+ rspamd_upstreams_set_slow_start(ups, 0);
+
+ auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_ROUND_ROBIN, nullptr, 0);
+ REQUIRE(up != nullptr);
+ rspamd_upstream_ok(up);
+
+ rspamd_upstreams_destroy(ups);
+ rspamd_upstreams_library_unref(ctx);
+ }
+}
+
+#endif