From: Vsevolod Stakhov Date: Fri, 1 May 2026 08:37:37 +0000 (+0100) Subject: [Feature] upstream: linear slow start on revive X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=98f68d9f03cd466782c3511308e5fc7a906ed4f9;p=thirdparty%2Frspamd.git [Feature] upstream: linear slow start on revive Newly revived upstreams previously rejoined the alive list at full weight, producing a thundering herd that would land on a backend that just came back up and was still warming caches/connection pools — the same backend that had been failing minutes before. This often caused immediate re-failure and a flap loop. Add an opt-in slow_start_ms window (default 0 = disabled) configurable via rspamd_upstreams_set_slow_start. While the window is open, both round-robin (effective weight = weight * factor) and P2C (effective load score = base / factor + warmup penalty) bias selection away from the warming upstream linearly over time. Hashed (Ketama) intentionally not integrated: scaling vnode counts during the window would defeat the consistency property that hashed selection exists for. Token bucket likewise unaffected — its inflight-based fairness already handles cold buckets gracefully. revived_at is set in the two real revive paths: the timer-based revive_cb and the half-open probe success path in ok(). The initial add_upstream activation is left unmarked so cold starts after a config reload aren't artificially throttled. --- diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index f45a8cc73c..59a9590681 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -77,6 +77,13 @@ struct upstream { double next_probe_at; double probe_backoff; unsigned int half_open_inflight; + /* + * Wall time (ev_now/ticks) of the most recent revive. Zero when the + * upstream is in steady state. While non-zero and within the configured + * slow_start window, selection scales the upstream's effective weight + * up linearly from 0 to 1. + */ + double revived_at; gpointer ud; enum rspamd_upstream_flag flags; struct upstream_list *ls; @@ -127,6 +134,14 @@ struct upstream_limits { gsize token_bucket_min; /* Min tokens for selection (default: 1) */ gsize token_bucket_base_cost; /* Base cost per request (default: 10) */ gsize token_bucket_refill_per_s; /* Lazy refill rate (default: max/60) */ + + /* + * Slow start window (milliseconds). When non-zero, a freshly revived + * upstream's effective weight ramps linearly from 0 to its configured + * weight over this window, smoothing the thundering herd that otherwise + * lands on the just-revived backend. Default 0 (disabled). + */ + unsigned int slow_start_ms; }; struct upstream_list { @@ -836,6 +851,8 @@ rspamd_upstream_revive_cb(struct ev_loop *loop, ev_timer *w, int revents) msg_debug_upstream("revive upstream %s", upstream->name); if (upstream->ls) { + /* Mark the time so selection paths can apply slow-start ramping. */ + upstream->revived_at = ev_now(loop); rspamd_upstream_set_active(upstream->ls, upstream); } @@ -1279,7 +1296,10 @@ void rspamd_upstream_ok(struct upstream *upstream) upstream->probe_backoff = upstream->ls ? upstream->ls->limits->revive_time : default_revive_time; upstream->next_probe_at = 0; if (upstream->ls && upstream->active_idx == -1) { - /* Activate this upstream */ + /* Activate this upstream; mark for slow-start ramping. */ + upstream->revived_at = upstream->ctx && upstream->ctx->event_loop + ? ev_now(upstream->ctx->event_loop) + : rspamd_get_ticks(FALSE); rspamd_upstream_set_active(upstream->ls, upstream); } } @@ -2009,16 +2029,65 @@ rspamd_upstream_get_random(struct upstream_list *ups, } } +/* + * Slow start factor in [0, 1]: 1.0 in steady state, ramping linearly from + * 0 toward 1 over `slow_start_ms` after a revive. Returns 1.0 when slow + * start is disabled or the upstream has never been revived. Mutates + * revived_at to clear the cache once the window expires. + */ +static inline double +rspamd_upstream_slow_start_factor(struct upstream *up, double now) +{ + const struct upstream_limits *limits; + double elapsed_ms; + double factor; + + if (up->ls == NULL || up->revived_at <= 0) { + return 1.0; + } + limits = up->ls->limits; + if (limits->slow_start_ms == 0) { + return 1.0; + } + + elapsed_ms = (now - up->revived_at) * 1000.0; + if (elapsed_ms <= 0) { + return 0.0; + } + if (elapsed_ms >= (double) limits->slow_start_ms) { + up->revived_at = 0; /* clear: no further work for this upstream */ + return 1.0; + } + + factor = elapsed_ms / (double) limits->slow_start_ms; + if (factor < 0.0) factor = 0.0; + return factor; +} + /* * Load score used by P2C: combines passive in-flight count with a small * penalty for recent errors. Lower is better. The errors term keeps * P2C biased away from a flapping upstream that hasn't yet accumulated * enough failures to be marked dead. + * + * During slow start, scale the score *up* by the inverse factor so that + * a barely-warmed-up upstream looks loaded relative to its peers and + * receives proportionally less traffic. */ -static inline unsigned int -rspamd_upstream_load_score(const struct upstream *up) +static inline double +rspamd_upstream_load_score(struct upstream *up, double now) { - return up->inflight + up->errors * 2; + double base = (double) up->inflight + (double) up->errors * 2.0; + double factor = rspamd_upstream_slow_start_factor(up, now); + + if (factor < 1.0) { + /* As factor -> 0, score -> infinity (heavily deprioritised). */ + if (factor < 0.01) { + factor = 0.01; + } + return base / factor + (1.0 - factor) * 100.0; + } + return base; } /* @@ -2032,6 +2101,7 @@ rspamd_upstream_get_p2c(struct upstream_list *ups, struct upstream *except) { unsigned int n = ups->alive->len; struct upstream *a, *b; + double now; if (n == 0) { return NULL; @@ -2064,7 +2134,14 @@ rspamd_upstream_get_p2c(struct upstream_list *ups, struct upstream *except) if (b == except) return a; } - return rspamd_upstream_load_score(a) <= rspamd_upstream_load_score(b) ? a : b; + if (ups->ctx && ups->ctx->event_loop) { + now = ev_now(ups->ctx->event_loop); + } + else { + now = rspamd_get_ticks(FALSE); + } + + return rspamd_upstream_load_score(a, now) <= rspamd_upstream_load_score(b, now) ? a : b; } static struct upstream * @@ -2075,28 +2152,38 @@ rspamd_upstream_get_round_robin(struct upstream_list *ups, unsigned int max_weight = 0, min_checked = G_MAXUINT; struct upstream *up = NULL, *selected = NULL, *min_checked_sel = NULL; unsigned int i; + double now; /* Select upstream with the maximum cur_weight */ RSPAMD_UPSTREAM_LOCK(ups); + if (ups->ctx && ups->ctx->event_loop) { + now = ev_now(ups->ctx->event_loop); + } + else { + now = rspamd_get_ticks(FALSE); + } + for (i = 0; i < ups->alive->len; i++) { + unsigned int eff; + double factor; + up = g_ptr_array_index(ups->alive, i); if (except != NULL && up == except) { continue; } - if (use_cur) { - if (up->cur_weight > max_weight) { - selected = up; - max_weight = up->cur_weight; - } + factor = rspamd_upstream_slow_start_factor(up, now); + eff = use_cur ? up->cur_weight : up->weight; + if (factor < 1.0) { + /* Scale weight down during the slow-start ramp. */ + eff = (unsigned int) ((double) eff * factor); } - else { - if (up->weight > max_weight) { - selected = up; - max_weight = up->weight; - } + + if (eff > max_weight) { + selected = up; + max_weight = eff; } /* @@ -2633,6 +2720,19 @@ void rspamd_upstreams_set_token_bucket(struct upstream_list *ups, ups->limits = nlimits; } +void rspamd_upstreams_set_slow_start(struct upstream_list *ups, + unsigned int slow_start_ms) +{ + struct upstream_limits *nlimits; + g_assert(ups != NULL); + g_assert(ups->ctx != NULL && ups->ctx->pool != NULL); + + nlimits = rspamd_mempool_alloc(ups->ctx->pool, sizeof(*nlimits)); + memcpy(nlimits, ups->limits, sizeof(*nlimits)); + nlimits->slow_start_ms = slow_start_ms; + ups->limits = nlimits; +} + /* * Calculate token cost for a message of given size */ diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h index 406bd2e777..f9061eb7bd 100644 --- a/src/libutil/upstream.h +++ b/src/libutil/upstream.h @@ -374,6 +374,17 @@ void rspamd_upstreams_set_token_bucket(struct upstream_list *ups, gsize min_tokens, gsize base_cost); +/** + * Configure slow-start window for revived upstreams. + * When set, a freshly revived upstream's effective weight ramps linearly + * from 0 to its configured weight over the given window. Avoids the + * thundering herd that would otherwise hit the just-revived backend. + * @param ups upstream list + * @param slow_start_ms ramp duration in milliseconds (0 = disabled) + */ +void rspamd_upstreams_set_slow_start(struct upstream_list *ups, + unsigned int slow_start_ms); + /** * Get upstream using token bucket algorithm. * Selects upstream with lowest inflight tokens (weighted by message size). diff --git a/test/rspamd_cxx_unit.cxx b/test/rspamd_cxx_unit.cxx index 681b1c8864..e48a8c726d 100644 --- a/test/rspamd_cxx_unit.cxx +++ b/test/rspamd_cxx_unit.cxx @@ -32,6 +32,7 @@ #include "rspamd_cxx_unit_upstream_ring_hash.hxx" #include "rspamd_cxx_unit_upstream_round_robin.hxx" #include "rspamd_cxx_unit_upstream_p2c.hxx" +#include "rspamd_cxx_unit_upstream_slow_start.hxx" #include "rspamd_cxx_unit_multipart.hxx" #include "rspamd_cxx_unit_settings_merge.hxx" diff --git a/test/rspamd_cxx_unit_upstream_slow_start.hxx b/test/rspamd_cxx_unit_upstream_slow_start.hxx new file mode 100644 index 0000000000..c3668ba6e2 --- /dev/null +++ b/test/rspamd_cxx_unit_upstream_slow_start.hxx @@ -0,0 +1,115 @@ +/* + * Copyright 2026 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* Unit tests for upstream slow-start ramping after revive */ + +#ifndef RSPAMD_CXX_UNIT_UPSTREAM_SLOW_START_HXX +#define RSPAMD_CXX_UNIT_UPSTREAM_SLOW_START_HXX + +#define DOCTEST_CONFIG_IMPLEMENTATION_IN_DLL +#include "doctest/doctest.h" + +#include "libutil/upstream.h" + +#include +#include + +TEST_SUITE("upstream_slow_start") +{ + TEST_CASE("setter accepts and applies the window") + { + auto *ctx = rspamd_upstreams_library_init(); + auto *ups = rspamd_upstreams_create(ctx); + rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_P2C); + + for (unsigned i = 0; i < 3; i++) { + char addr[32]; + snprintf(addr, sizeof(addr), "127.0.0.%u:11333", i + 1); + auto ok = rspamd_upstreams_add_upstream(ups, addr, 11333, + RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr); + REQUIRE(ok); + } + + rspamd_upstreams_set_slow_start(ups, 5000); + + /* Without revive events the slow-start factor is 1.0, so selection + * should still cover all upstreams. */ + std::map hits; + for (int i = 0; i < 600; i++) { + auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + hits[rspamd_upstream_name(up)]++; + rspamd_upstream_ok(up); + } + CHECK(hits.size() == 3); + + rspamd_upstreams_destroy(ups); + rspamd_upstreams_library_unref(ctx); + } + + TEST_CASE("disabled (default) is a no-op") + { + auto *ctx = rspamd_upstreams_library_init(); + auto *ups = rspamd_upstreams_create(ctx); + rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_P2C); + + for (unsigned i = 0; i < 3; i++) { + char addr[32]; + snprintf(addr, sizeof(addr), "127.0.0.%u:11333", i + 1); + REQUIRE(rspamd_upstreams_add_upstream(ups, addr, 11333, + RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr)); + } + + /* Don't set slow_start. Distribution must stay roughly uniform. */ + std::map hits; + for (int i = 0; i < 900; i++) { + auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + hits[rspamd_upstream_name(up)]++; + rspamd_upstream_ok(up); + } + CHECK(hits.size() == 3); + for (const auto &[name, count]: hits) { + CHECK(count > 200); + } + + rspamd_upstreams_destroy(ups); + rspamd_upstreams_library_unref(ctx); + } + + TEST_CASE("slow-start setter is idempotent under repeated calls") + { + auto *ctx = rspamd_upstreams_library_init(); + auto *ups = rspamd_upstreams_create(ctx); + rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_ROUND_ROBIN); + + REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333, + RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr)); + + rspamd_upstreams_set_slow_start(ups, 1000); + rspamd_upstreams_set_slow_start(ups, 2000); + rspamd_upstreams_set_slow_start(ups, 0); + + auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_ROUND_ROBIN, nullptr, 0); + REQUIRE(up != nullptr); + rspamd_upstream_ok(up); + + rspamd_upstreams_destroy(ups); + rspamd_upstreams_library_unref(ctx); + } +} + +#endif