]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] upstream: per-upstream latency EWMA + P2C integration
authorVsevolod Stakhov <vsevolod@rspamd.com>
Fri, 1 May 2026 08:57:32 +0000 (09:57 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Fri, 1 May 2026 08:57:32 +0000 (09:57 +0100)
Track an exponentially-weighted moving average of per-request latency
on each upstream, with a configurable half-life (default 60s) so older
samples decay and a once-slow-now-recovered backend isn't permanently
penalised. Updates are time-weighted: alpha = 1 - exp(-dt/tau) where
tau = half_life / ln(2). Setting half_life to 0 falls back to a flat
moving average where every sample has equal weight.

Wire it into the P2C load score:
  score = latency * (inflight + 1) + errors * 5 * latency
when at least one sample exists; fall back to the existing
inflight + errors*2 form otherwise. This is a lightweight approximation
of PeakEWMA — a slow backend with low load loses to a fast one with
comparable load, but a fast backend can still lose if it gets too busy.

New public API:
  rspamd_upstream_record_latency(up, seconds)
  rspamd_upstream_get_latency(up)
  rspamd_upstreams_set_latency_half_life(ups, seconds)

Callers opt in by recording observed RTT alongside their existing
ok()/fail() calls. The score function falls back gracefully to Phase 1
behaviour for upstream lists where no caller has wired up sampling
yet, so this commit is a no-op for current users.

src/libutil/upstream.c
src/libutil/upstream.h
test/rspamd_cxx_unit.cxx
test/rspamd_cxx_unit_upstream_latency.hxx [new file with mode: 0644]

index 59a959068113d39cbff5f260f6fb30bf0117dcaf..bead2cf87dfcbfdc4e60158c9264df01d676ec14 100644 (file)
@@ -84,6 +84,15 @@ struct upstream {
         * up linearly from 0 to 1.
         */
        double revived_at;
+
+       /*
+        * Latency EWMA in seconds. Zero when no samples have been recorded.
+        * Updated by rspamd_upstream_record_latency with time-weighted decay
+        * controlled by upstream_limits.latency_half_life_s.
+        */
+       double latency_ewma;
+       double latency_last_at;
+       unsigned int latency_n;
        gpointer ud;
        enum rspamd_upstream_flag flags;
        struct upstream_list *ls;
@@ -142,6 +151,13 @@ struct upstream_limits {
         * lands on the just-revived backend. Default 0 (disabled).
         */
        unsigned int slow_start_ms;
+
+       /*
+        * Latency EWMA half-life in seconds. Larger = slower to react, smaller
+        * = noisier. Default 60.0. Set to 0 to weight every sample equally
+        * (degrades to a 1/n moving average regardless of inter-arrival time).
+        */
+       double latency_half_life_s;
 };
 
 struct upstream_list {
@@ -231,6 +247,8 @@ static const double default_probe_jitter = DEFAULT_PROBE_JITTER;
 #define DEFAULT_TOKEN_BUCKET_BASE_COST 10
 /* Default refill rate: full bucket regenerates in 60s of wall time. */
 #define DEFAULT_TOKEN_BUCKET_REFILL_PER_S (DEFAULT_TOKEN_BUCKET_MAX / 60)
+/* EWMA half-life: stale samples lose half their weight every 60s. */
+#define DEFAULT_LATENCY_HALF_LIFE_S 60.0
 
 /*
  * Initial delay before retrying DNS for a PENDING_RESOLVE upstream, and the
@@ -255,6 +273,7 @@ static const struct upstream_limits default_limits = {
        .token_bucket_min = DEFAULT_TOKEN_BUCKET_MIN,
        .token_bucket_base_cost = DEFAULT_TOKEN_BUCKET_BASE_COST,
        .token_bucket_refill_per_s = DEFAULT_TOKEN_BUCKET_REFILL_PER_S,
+       .latency_half_life_s = DEFAULT_LATENCY_HALF_LIFE_S,
 };
 
 static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int);
@@ -2066,20 +2085,39 @@ rspamd_upstream_slow_start_factor(struct upstream *up, double now)
 
 /*
  * Load score used by P2C: combines passive in-flight count with a small
- * penalty for recent errors. Lower is better. The errors term keeps
- * P2C biased away from a flapping upstream that hasn't yet accumulated
- * enough failures to be marked dead.
+ * penalty for recent errors and (when available) latency EWMA. Lower is
+ * better.
+ *
+ * Phase 2 score, when latency samples exist:
+ *   score = latency * (inflight + 1) + errors_penalty
+ *
+ * This is a lightweight approximation of PeakEWMA used by Linkerd/Finagle:
+ * a slow backend with low load still loses to a fast one with comparable
+ * load; a fast backend with high load can still lose to an idle slow one
+ * if the latency gap is small enough.
  *
- * During slow start, scale the score *up* by the inverse factor so that
- * a barely-warmed-up upstream looks loaded relative to its peers and
+ * Phase 1 fallback (no latency yet):
+ *   score = inflight + errors * 2
+ *
+ * During slow start the score is scaled *up* by the inverse factor so a
+ * barely-warmed-up upstream looks loaded relative to its peers and
  * receives proportionally less traffic.
  */
 static inline double
 rspamd_upstream_load_score(struct upstream *up, double now)
 {
-       double base = (double) up->inflight + (double) up->errors * 2.0;
-       double factor = rspamd_upstream_slow_start_factor(up, now);
+       double base;
+       double factor;
+
+       if (up->latency_n > 0 && up->latency_ewma > 0) {
+               base = up->latency_ewma * (double) (up->inflight + 1) +
+                          (double) up->errors * 5.0 * up->latency_ewma;
+       }
+       else {
+               base = (double) up->inflight + (double) up->errors * 2.0;
+       }
 
+       factor = rspamd_upstream_slow_start_factor(up, now);
        if (factor < 1.0) {
                /* As factor -> 0, score -> infinity (heavily deprioritised). */
                if (factor < 0.01) {
@@ -2733,6 +2771,92 @@ void rspamd_upstreams_set_slow_start(struct upstream_list *ups,
        ups->limits = nlimits;
 }
 
+void rspamd_upstreams_set_latency_half_life(struct upstream_list *ups,
+                                                                                       double half_life_s)
+{
+       struct upstream_limits *nlimits;
+       g_assert(ups != NULL);
+       g_assert(ups->ctx != NULL && ups->ctx->pool != NULL);
+
+       if (half_life_s < 0) {
+               half_life_s = 0;
+       }
+       nlimits = rspamd_mempool_alloc(ups->ctx->pool, sizeof(*nlimits));
+       memcpy(nlimits, ups->limits, sizeof(*nlimits));
+       nlimits->latency_half_life_s = half_life_s;
+       ups->limits = nlimits;
+}
+
+/*
+ * Time-weighted EWMA for latency. Older samples decay so that a
+ * once-slow-but-recovered upstream isn't forever penalised.
+ *
+ * Mathematically: alpha = 1 - exp(-dt / tau), where tau is set so the
+ * weight halves over `latency_half_life_s` of wall time. tau = hl/ln(2).
+ *
+ * If half_life is 0 we degrade to a flat moving average where every
+ * sample has equal weight regardless of arrival time.
+ */
+void rspamd_upstream_record_latency(struct upstream *up, double seconds)
+{
+       double now;
+       double dt;
+       double tau;
+       double alpha;
+       double half_life;
+
+       if (up == NULL || seconds < 0) {
+               return;
+       }
+
+       RSPAMD_UPSTREAM_LOCK(up);
+
+       if (up->ctx && up->ctx->event_loop) {
+               now = ev_now(up->ctx->event_loop);
+       }
+       else {
+               now = rspamd_get_ticks(FALSE);
+       }
+
+       if (up->latency_n == 0 || up->latency_last_at <= 0) {
+               up->latency_ewma = seconds;
+       }
+       else {
+               half_life = up->ls ? up->ls->limits->latency_half_life_s : DEFAULT_LATENCY_HALF_LIFE_S;
+               if (half_life <= 0) {
+                       /* Flat moving average. */
+                       alpha = 1.0 / (double) (up->latency_n + 1);
+               }
+               else {
+                       dt = now - up->latency_last_at;
+                       if (dt < 0.0) {
+                               dt = 0.0;
+                       }
+                       tau = half_life / 0.6931471805599453; /* ln(2) */
+                       alpha = 1.0 - exp(-dt / tau);
+                       /* Cap so we never wholly forget the prior estimate. */
+                       if (alpha > 0.5) alpha = 0.5;
+                       if (alpha < 0.01) alpha = 0.01;
+               }
+               up->latency_ewma = alpha * seconds + (1.0 - alpha) * up->latency_ewma;
+       }
+
+       up->latency_last_at = now;
+       if (up->latency_n < UINT_MAX) {
+               up->latency_n++;
+       }
+
+       RSPAMD_UPSTREAM_UNLOCK(up);
+}
+
+double rspamd_upstream_get_latency(const struct upstream *up)
+{
+       if (up == NULL) {
+               return 0.0;
+       }
+       return up->latency_ewma;
+}
+
 /*
  * Calculate token cost for a message of given size
  */
index f9061eb7bdc773b83ba1b2ced7fd74240d4b2bb2..246a0f800adcc7a4ce51e83bdcabc3c5ef6361d6 100644 (file)
@@ -385,6 +385,31 @@ void rspamd_upstreams_set_token_bucket(struct upstream_list *ups,
 void rspamd_upstreams_set_slow_start(struct upstream_list *ups,
                                                                         unsigned int slow_start_ms);
 
+/**
+ * Record a per-request latency observation for the upstream.
+ * Updates a time-weighted EWMA that decays old samples on a
+ * configurable half-life. The EWMA feeds into P2C selection so
+ * faster backends are preferred when load is otherwise comparable.
+ * Cheap to call; no allocation.
+ * @param up upstream
+ * @param seconds observed latency (e.g. request RTT)
+ */
+void rspamd_upstream_record_latency(struct upstream *up, double seconds);
+
+/**
+ * Read the current latency EWMA in seconds. Zero if no samples yet.
+ */
+double rspamd_upstream_get_latency(const struct upstream *up);
+
+/**
+ * Configure latency EWMA half-life. Defaults to 60s; setting to 0
+ * disables time-weighting (becomes a flat moving average).
+ * @param ups upstream list
+ * @param half_life_s decay half-life in seconds
+ */
+void rspamd_upstreams_set_latency_half_life(struct upstream_list *ups,
+                                                                                       double half_life_s);
+
 /**
  * Get upstream using token bucket algorithm.
  * Selects upstream with lowest inflight tokens (weighted by message size).
index e48a8c726d3d8a74695a1eeaa37563e067f514ef..c0beddf386bfac7aedde1a4900e1367a744529f1 100644 (file)
@@ -33,6 +33,7 @@
 #include "rspamd_cxx_unit_upstream_round_robin.hxx"
 #include "rspamd_cxx_unit_upstream_p2c.hxx"
 #include "rspamd_cxx_unit_upstream_slow_start.hxx"
+#include "rspamd_cxx_unit_upstream_latency.hxx"
 #include "rspamd_cxx_unit_multipart.hxx"
 #include "rspamd_cxx_unit_settings_merge.hxx"
 
diff --git a/test/rspamd_cxx_unit_upstream_latency.hxx b/test/rspamd_cxx_unit_upstream_latency.hxx
new file mode 100644 (file)
index 0000000..78a0053
--- /dev/null
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Unit tests for upstream latency EWMA tracking and P2C integration */
+
+#ifndef RSPAMD_CXX_UNIT_UPSTREAM_LATENCY_HXX
+#define RSPAMD_CXX_UNIT_UPSTREAM_LATENCY_HXX
+
+#define DOCTEST_CONFIG_IMPLEMENTATION_IN_DLL
+#include "doctest/doctest.h"
+
+#include "libutil/upstream.h"
+
+#include <cmath>
+#include <map>
+#include <string>
+
+TEST_SUITE("upstream_latency")
+{
+       TEST_CASE("first sample sets the EWMA exactly")
+       {
+               auto *ctx = rspamd_upstreams_library_init();
+               auto *ups = rspamd_upstreams_create(ctx);
+
+               REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+                                                                                         RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+               auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+               REQUIRE(up != nullptr);
+
+               CHECK(rspamd_upstream_get_latency(up) == 0.0);
+               rspamd_upstream_record_latency(up, 0.123);
+               CHECK(rspamd_upstream_get_latency(up) == doctest::Approx(0.123));
+
+               rspamd_upstream_ok(up);
+               rspamd_upstreams_destroy(ups);
+               rspamd_upstreams_library_unref(ctx);
+       }
+
+       TEST_CASE("repeated samples converge toward steady value")
+       {
+               auto *ctx = rspamd_upstreams_library_init();
+               auto *ups = rspamd_upstreams_create(ctx);
+               REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+                                                                                         RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+               auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+               REQUIRE(up != nullptr);
+
+               /* Feed 200 identical 0.05s samples; EWMA must converge to 0.05. */
+               for (int i = 0; i < 200; i++) {
+                       rspamd_upstream_record_latency(up, 0.05);
+               }
+               CHECK(rspamd_upstream_get_latency(up) == doctest::Approx(0.05).epsilon(0.01));
+
+               rspamd_upstream_ok(up);
+               rspamd_upstreams_destroy(ups);
+               rspamd_upstreams_library_unref(ctx);
+       }
+
+       TEST_CASE("step change is reflected after enough samples")
+       {
+               auto *ctx = rspamd_upstreams_library_init();
+               auto *ups = rspamd_upstreams_create(ctx);
+               REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+                                                                                         RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+               auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+               REQUIRE(up != nullptr);
+
+               /* Steady at 1.0s, then a step to 0.1s. After many samples at 0.1
+                * the EWMA should be much closer to 0.1 than to 1.0. */
+               for (int i = 0; i < 50; i++) {
+                       rspamd_upstream_record_latency(up, 1.0);
+               }
+               double slow = rspamd_upstream_get_latency(up);
+               CHECK(slow > 0.5);
+
+               for (int i = 0; i < 500; i++) {
+                       rspamd_upstream_record_latency(up, 0.1);
+               }
+               double fast = rspamd_upstream_get_latency(up);
+               CHECK(fast < slow);
+               CHECK(fast < 0.5);
+
+               rspamd_upstream_ok(up);
+               rspamd_upstreams_destroy(ups);
+               rspamd_upstreams_library_unref(ctx);
+       }
+
+       TEST_CASE("negative samples are ignored")
+       {
+               auto *ctx = rspamd_upstreams_library_init();
+               auto *ups = rspamd_upstreams_create(ctx);
+               REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+                                                                                         RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+               auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+               REQUIRE(up != nullptr);
+
+               rspamd_upstream_record_latency(up, 0.05);
+               double before = rspamd_upstream_get_latency(up);
+               rspamd_upstream_record_latency(up, -1.0);
+               double after = rspamd_upstream_get_latency(up);
+               CHECK(after == doctest::Approx(before));
+
+               rspamd_upstream_ok(up);
+               rspamd_upstreams_destroy(ups);
+               rspamd_upstreams_library_unref(ctx);
+       }
+
+       TEST_CASE("set_latency_half_life accepts and clamps")
+       {
+               auto *ctx = rspamd_upstreams_library_init();
+               auto *ups = rspamd_upstreams_create(ctx);
+               REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+                                                                                         RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+               rspamd_upstreams_set_latency_half_life(ups, 30.0);
+               rspamd_upstreams_set_latency_half_life(ups, 0);
+               rspamd_upstreams_set_latency_half_life(ups, -5);
+
+               auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+               REQUIRE(up != nullptr);
+               rspamd_upstream_record_latency(up, 0.2);
+               CHECK(rspamd_upstream_get_latency(up) == doctest::Approx(0.2));
+               rspamd_upstream_ok(up);
+
+               rspamd_upstreams_destroy(ups);
+               rspamd_upstreams_library_unref(ctx);
+       }
+
+       TEST_CASE("P2C prefers the lower-latency upstream when load matches")
+       {
+               auto *ctx = rspamd_upstreams_library_init();
+               auto *ups = rspamd_upstreams_create(ctx);
+               rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_P2C);
+
+               REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333,
+                                                                                         RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+               REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.2:11333", 11333,
+                                                                                         RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr));
+
+               /* Collect the two distinct upstream pointers via repeated selection. */
+               std::map<std::string, struct upstream *> seen;
+               for (int tries = 0; tries < 200 && seen.size() < 2; tries++) {
+                       auto *u = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+                       REQUIRE(u != nullptr);
+                       seen[rspamd_upstream_name(u)] = u;
+                       rspamd_upstream_ok(u);
+               }
+               REQUIRE(seen.size() == 2);
+
+               auto it = seen.begin();
+               struct upstream *fast = it->second;
+               std::string fast_name = it->first;
+               ++it;
+               struct upstream *slow = it->second;
+               std::string slow_name = it->first;
+
+               /* Plant clear EWMAs: fast = 10ms, slow = 500ms. */
+               for (int i = 0; i < 30; i++) {
+                       rspamd_upstream_record_latency(fast, 0.01);
+                       rspamd_upstream_record_latency(slow, 0.5);
+               }
+               CHECK(rspamd_upstream_get_latency(fast) < rspamd_upstream_get_latency(slow));
+
+               std::map<std::string, int> hits;
+               for (int i = 0; i < 1000; i++) {
+                       auto *u = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+                       REQUIRE(u != nullptr);
+                       hits[rspamd_upstream_name(u)]++;
+                       rspamd_upstream_ok(u);
+               }
+
+               /* Latency-aware P2C should heavily favour the fast upstream. */
+               CHECK(hits[fast_name] > hits[slow_name] * 2);
+
+               rspamd_upstreams_destroy(ups);
+               rspamd_upstreams_library_unref(ctx);
+       }
+
+       TEST_CASE("null/zero arguments handled safely")
+       {
+               rspamd_upstream_record_latency(nullptr, 0.1);
+               CHECK(rspamd_upstream_get_latency(nullptr) == 0.0);
+       }
+}
+
+#endif