* up linearly from 0 to 1.
*/
double revived_at;
+
+ /*
+ * Latency EWMA in seconds. Zero when no samples have been recorded.
+ * Updated by rspamd_upstream_record_latency with time-weighted decay
+ * controlled by upstream_limits.latency_half_life_s.
+ */
+ double latency_ewma;
+ double latency_last_at;
+ unsigned int latency_n;
gpointer ud;
enum rspamd_upstream_flag flags;
struct upstream_list *ls;
* lands on the just-revived backend. Default 0 (disabled).
*/
unsigned int slow_start_ms;
+
+ /*
+ * Latency EWMA half-life in seconds. Larger = slower to react, smaller
+ * = noisier. Default 60.0. Set to 0 to weight every sample equally
+ * (degrades to a 1/n moving average regardless of inter-arrival time).
+ */
+ double latency_half_life_s;
};
struct upstream_list {
#define DEFAULT_TOKEN_BUCKET_BASE_COST 10
/* Default refill rate: full bucket regenerates in 60s of wall time. */
#define DEFAULT_TOKEN_BUCKET_REFILL_PER_S (DEFAULT_TOKEN_BUCKET_MAX / 60)
+/* EWMA half-life: stale samples lose half their weight every 60s. */
+#define DEFAULT_LATENCY_HALF_LIFE_S 60.0
/*
* Initial delay before retrying DNS for a PENDING_RESOLVE upstream, and the
.token_bucket_min = DEFAULT_TOKEN_BUCKET_MIN,
.token_bucket_base_cost = DEFAULT_TOKEN_BUCKET_BASE_COST,
.token_bucket_refill_per_s = DEFAULT_TOKEN_BUCKET_REFILL_PER_S,
+ .latency_half_life_s = DEFAULT_LATENCY_HALF_LIFE_S,
};
static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int);
/*
* Load score used by P2C: combines passive in-flight count with a small
- * penalty for recent errors. Lower is better. The errors term keeps
- * P2C biased away from a flapping upstream that hasn't yet accumulated
- * enough failures to be marked dead.
+ * penalty for recent errors and (when available) latency EWMA. Lower is
+ * better.
+ *
+ * Phase 2 score, when latency samples exist:
+ * score = latency * (inflight + 1) + errors_penalty
+ *
+ * This is a lightweight approximation of PeakEWMA used by Linkerd/Finagle:
+ * a slow backend with low load still loses to a fast one with comparable
+ * load; a fast backend with high load can still lose to an idle slow one
+ * if the latency gap is small enough.
*
- * During slow start, scale the score *up* by the inverse factor so that
- * a barely-warmed-up upstream looks loaded relative to its peers and
+ * Phase 1 fallback (no latency yet):
+ * score = inflight + errors * 2
+ *
+ * During slow start the score is scaled *up* by the inverse factor so a
+ * barely-warmed-up upstream looks loaded relative to its peers and
* receives proportionally less traffic.
*/
static inline double
rspamd_upstream_load_score(struct upstream *up, double now)
{
- double base = (double) up->inflight + (double) up->errors * 2.0;
- double factor = rspamd_upstream_slow_start_factor(up, now);
+ double base;
+ double factor;
+
+ if (up->latency_n > 0 && up->latency_ewma > 0) {
+ base = up->latency_ewma * (double) (up->inflight + 1) +
+ (double) up->errors * 5.0 * up->latency_ewma;
+ }
+ else {
+ base = (double) up->inflight + (double) up->errors * 2.0;
+ }
+ factor = rspamd_upstream_slow_start_factor(up, now);
if (factor < 1.0) {
/* As factor -> 0, score -> infinity (heavily deprioritised). */
if (factor < 0.01) {
ups->limits = nlimits;
}
+void rspamd_upstreams_set_latency_half_life(struct upstream_list *ups,
+ double half_life_s)
+{
+ struct upstream_limits *nlimits;
+ g_assert(ups != NULL);
+ g_assert(ups->ctx != NULL && ups->ctx->pool != NULL);
+
+ if (half_life_s < 0) {
+ half_life_s = 0;
+ }
+ nlimits = rspamd_mempool_alloc(ups->ctx->pool, sizeof(*nlimits));
+ memcpy(nlimits, ups->limits, sizeof(*nlimits));
+ nlimits->latency_half_life_s = half_life_s;
+ ups->limits = nlimits;
+}
+
+/*
+ * Time-weighted EWMA for latency. Older samples decay so that a
+ * once-slow-but-recovered upstream isn't forever penalised.
+ *
+ * Mathematically: alpha = 1 - exp(-dt / tau), where tau is set so the
+ * weight halves over `latency_half_life_s` of wall time. tau = hl/ln(2).
+ *
+ * If half_life is 0 we degrade to a flat moving average where every
+ * sample has equal weight regardless of arrival time.
+ */
+void rspamd_upstream_record_latency(struct upstream *up, double seconds)
+{
+ double now;
+ double dt;
+ double tau;
+ double alpha;
+ double half_life;
+
+ if (up == NULL || seconds < 0) {
+ return;
+ }
+
+ RSPAMD_UPSTREAM_LOCK(up);
+
+ if (up->ctx && up->ctx->event_loop) {
+ now = ev_now(up->ctx->event_loop);
+ }
+ else {
+ now = rspamd_get_ticks(FALSE);
+ }
+
+ if (up->latency_n == 0 || up->latency_last_at <= 0) {
+ up->latency_ewma = seconds;
+ }
+ else {
+ half_life = up->ls ? up->ls->limits->latency_half_life_s : DEFAULT_LATENCY_HALF_LIFE_S;
+ if (half_life <= 0) {
+ /* Flat moving average. */
+ alpha = 1.0 / (double) (up->latency_n + 1);
+ }
+ else {
+ dt = now - up->latency_last_at;
+ if (dt < 0.0) {
+ dt = 0.0;
+ }
+ tau = half_life / 0.6931471805599453; /* ln(2) */
+ alpha = 1.0 - exp(-dt / tau);
+ /* Cap so we never wholly forget the prior estimate. */
+ if (alpha > 0.5) alpha = 0.5;
+ if (alpha < 0.01) alpha = 0.01;
+ }
+ up->latency_ewma = alpha * seconds + (1.0 - alpha) * up->latency_ewma;
+ }
+
+ up->latency_last_at = now;
+ if (up->latency_n < UINT_MAX) {
+ up->latency_n++;
+ }
+
+ RSPAMD_UPSTREAM_UNLOCK(up);
+}
+
+double rspamd_upstream_get_latency(const struct upstream *up)
+{
+ if (up == NULL) {
+ return 0.0;
+ }
+ return up->latency_ewma;
+}
+
/*
* Calculate token cost for a message of given size
*/
--- /dev/null
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Unit tests for upstream latency EWMA tracking and P2C integration */
+
+#ifndef RSPAMD_CXX_UNIT_UPSTREAM_LATENCY_HXX
+#define RSPAMD_CXX_UNIT_UPSTREAM_LATENCY_HXX
+
+#define DOCTEST_CONFIG_IMPLEMENTATION_IN_DLL
+#include "doctest/doctest.h"
+
+#include "libutil/upstream.h"
+
+#include <cmath>
+#include <map>
+#include <string>
+
+TEST_SUITE("upstream_latency")
+{
+ TEST_CASE("first sample sets the EWMA exactly")
+ {
+ auto *ctx = rspamd_upstreams_library_init();
+ auto *ups = rspamd_upstreams_create(ctx);
+
+ REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+ RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+ auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+ REQUIRE(up != nullptr);
+
+ CHECK(rspamd_upstream_get_latency(up) == 0.0);
+ rspamd_upstream_record_latency(up, 0.123);
+ CHECK(rspamd_upstream_get_latency(up) == doctest::Approx(0.123));
+
+ rspamd_upstream_ok(up);
+ rspamd_upstreams_destroy(ups);
+ rspamd_upstreams_library_unref(ctx);
+ }
+
+ TEST_CASE("repeated samples converge toward steady value")
+ {
+ auto *ctx = rspamd_upstreams_library_init();
+ auto *ups = rspamd_upstreams_create(ctx);
+ REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+ RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+ auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+ REQUIRE(up != nullptr);
+
+ /* Feed 200 identical 0.05s samples; EWMA must converge to 0.05. */
+ for (int i = 0; i < 200; i++) {
+ rspamd_upstream_record_latency(up, 0.05);
+ }
+ CHECK(rspamd_upstream_get_latency(up) == doctest::Approx(0.05).epsilon(0.01));
+
+ rspamd_upstream_ok(up);
+ rspamd_upstreams_destroy(ups);
+ rspamd_upstreams_library_unref(ctx);
+ }
+
+ TEST_CASE("step change is reflected after enough samples")
+ {
+ auto *ctx = rspamd_upstreams_library_init();
+ auto *ups = rspamd_upstreams_create(ctx);
+ REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+ RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+ auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+ REQUIRE(up != nullptr);
+
+ /* Steady at 1.0s, then a step to 0.1s. After many samples at 0.1
+ * the EWMA should be much closer to 0.1 than to 1.0. */
+ for (int i = 0; i < 50; i++) {
+ rspamd_upstream_record_latency(up, 1.0);
+ }
+ double slow = rspamd_upstream_get_latency(up);
+ CHECK(slow > 0.5);
+
+ for (int i = 0; i < 500; i++) {
+ rspamd_upstream_record_latency(up, 0.1);
+ }
+ double fast = rspamd_upstream_get_latency(up);
+ CHECK(fast < slow);
+ CHECK(fast < 0.5);
+
+ rspamd_upstream_ok(up);
+ rspamd_upstreams_destroy(ups);
+ rspamd_upstreams_library_unref(ctx);
+ }
+
+ TEST_CASE("negative samples are ignored")
+ {
+ auto *ctx = rspamd_upstreams_library_init();
+ auto *ups = rspamd_upstreams_create(ctx);
+ REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+ RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+ auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+ REQUIRE(up != nullptr);
+
+ rspamd_upstream_record_latency(up, 0.05);
+ double before = rspamd_upstream_get_latency(up);
+ rspamd_upstream_record_latency(up, -1.0);
+ double after = rspamd_upstream_get_latency(up);
+ CHECK(after == doctest::Approx(before));
+
+ rspamd_upstream_ok(up);
+ rspamd_upstreams_destroy(ups);
+ rspamd_upstreams_library_unref(ctx);
+ }
+
+ TEST_CASE("set_latency_half_life accepts and clamps")
+ {
+ auto *ctx = rspamd_upstreams_library_init();
+ auto *ups = rspamd_upstreams_create(ctx);
+ REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+ RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+ rspamd_upstreams_set_latency_half_life(ups, 30.0);
+ rspamd_upstreams_set_latency_half_life(ups, 0);
+ rspamd_upstreams_set_latency_half_life(ups, -5);
+
+ auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+ REQUIRE(up != nullptr);
+ rspamd_upstream_record_latency(up, 0.2);
+ CHECK(rspamd_upstream_get_latency(up) == doctest::Approx(0.2));
+ rspamd_upstream_ok(up);
+
+ rspamd_upstreams_destroy(ups);
+ rspamd_upstreams_library_unref(ctx);
+ }
+
+ TEST_CASE("P2C prefers the lower-latency upstream when load matches")
+ {
+ auto *ctx = rspamd_upstreams_library_init();
+ auto *ups = rspamd_upstreams_create(ctx);
+ rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_P2C);
+
+ REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+ RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+ REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.2:11333", 11333,
+ RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+ /* Collect the two distinct upstream pointers via repeated selection. */
+ std::map<std::string, struct upstream *> seen;
+ for (int tries = 0; tries < 200 && seen.size() < 2; tries++) {
+ auto *u = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+ REQUIRE(u != nullptr);
+ seen[rspamd_upstream_name(u)] = u;
+ rspamd_upstream_ok(u);
+ }
+ REQUIRE(seen.size() == 2);
+
+ auto it = seen.begin();
+ struct upstream *fast = it->second;
+ std::string fast_name = it->first;
+ ++it;
+ struct upstream *slow = it->second;
+ std::string slow_name = it->first;
+
+ /* Plant clear EWMAs: fast = 10ms, slow = 500ms. */
+ for (int i = 0; i < 30; i++) {
+ rspamd_upstream_record_latency(fast, 0.01);
+ rspamd_upstream_record_latency(slow, 0.5);
+ }
+ CHECK(rspamd_upstream_get_latency(fast) < rspamd_upstream_get_latency(slow));
+
+ std::map<std::string, int> hits;
+ for (int i = 0; i < 1000; i++) {
+ auto *u = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+ REQUIRE(u != nullptr);
+ hits[rspamd_upstream_name(u)]++;
+ rspamd_upstream_ok(u);
+ }
+
+ /* Latency-aware P2C should heavily favour the fast upstream. */
+ CHECK(hits[fast_name] > hits[slow_name] * 2);
+
+ rspamd_upstreams_destroy(ups);
+ rspamd_upstreams_library_unref(ctx);
+ }
+
+ TEST_CASE("null/zero arguments handled safely")
+ {
+ rspamd_upstream_record_latency(nullptr, 0.1);
+ CHECK(rspamd_upstream_get_latency(nullptr) == 0.0);
+ }
+}
+
+#endif