From: Vsevolod Stakhov Date: Fri, 1 May 2026 08:57:32 +0000 (+0100) Subject: [Feature] upstream: per-upstream latency EWMA + P2C integration X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=d7607d2b2d4d777cab8af464039e475f984e78b6;p=thirdparty%2Frspamd.git [Feature] upstream: per-upstream latency EWMA + P2C integration Track an exponentially-weighted moving average of per-request latency on each upstream, with a configurable half-life (default 60s) so older samples decay and a once-slow-now-recovered backend isn't permanently penalised. Updates are time-weighted: alpha = 1 - exp(-dt/tau) where tau = half_life / ln(2). Setting half_life to 0 falls back to a flat moving average where every sample has equal weight. Wire it into the P2C load score: score = latency * (inflight + 1) + errors * 5 * latency when at least one sample exists; fall back to the existing inflight + errors*2 form otherwise. This is a lightweight approximation of PeakEWMA — a slow backend with low load loses to a fast one with comparable load, but a fast backend can still lose if it gets too busy. New public API: rspamd_upstream_record_latency(up, seconds) rspamd_upstream_get_latency(up) rspamd_upstreams_set_latency_half_life(ups, seconds) Callers opt in by recording observed RTT alongside their existing ok()/fail() calls. The score function falls back gracefully to Phase 1 behaviour for upstream lists where no caller has wired up sampling yet, so this commit is a no-op for current users. --- diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index 59a9590681..bead2cf87d 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -84,6 +84,15 @@ struct upstream { * up linearly from 0 to 1. */ double revived_at; + + /* + * Latency EWMA in seconds. Zero when no samples have been recorded. + * Updated by rspamd_upstream_record_latency with time-weighted decay + * controlled by upstream_limits.latency_half_life_s. + */ + double latency_ewma; + double latency_last_at; + unsigned int latency_n; gpointer ud; enum rspamd_upstream_flag flags; struct upstream_list *ls; @@ -142,6 +151,13 @@ struct upstream_limits { * lands on the just-revived backend. Default 0 (disabled). */ unsigned int slow_start_ms; + + /* + * Latency EWMA half-life in seconds. Larger = slower to react, smaller + * = noisier. Default 60.0. Set to 0 to weight every sample equally + * (degrades to a 1/n moving average regardless of inter-arrival time). + */ + double latency_half_life_s; }; struct upstream_list { @@ -231,6 +247,8 @@ static const double default_probe_jitter = DEFAULT_PROBE_JITTER; #define DEFAULT_TOKEN_BUCKET_BASE_COST 10 /* Default refill rate: full bucket regenerates in 60s of wall time. */ #define DEFAULT_TOKEN_BUCKET_REFILL_PER_S (DEFAULT_TOKEN_BUCKET_MAX / 60) +/* EWMA half-life: stale samples lose half their weight every 60s. */ +#define DEFAULT_LATENCY_HALF_LIFE_S 60.0 /* * Initial delay before retrying DNS for a PENDING_RESOLVE upstream, and the @@ -255,6 +273,7 @@ static const struct upstream_limits default_limits = { .token_bucket_min = DEFAULT_TOKEN_BUCKET_MIN, .token_bucket_base_cost = DEFAULT_TOKEN_BUCKET_BASE_COST, .token_bucket_refill_per_s = DEFAULT_TOKEN_BUCKET_REFILL_PER_S, + .latency_half_life_s = DEFAULT_LATENCY_HALF_LIFE_S, }; static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int); @@ -2066,20 +2085,39 @@ rspamd_upstream_slow_start_factor(struct upstream *up, double now) /* * Load score used by P2C: combines passive in-flight count with a small - * penalty for recent errors. Lower is better. The errors term keeps - * P2C biased away from a flapping upstream that hasn't yet accumulated - * enough failures to be marked dead. + * penalty for recent errors and (when available) latency EWMA. Lower is + * better. + * + * Phase 2 score, when latency samples exist: + * score = latency * (inflight + 1) + errors_penalty + * + * This is a lightweight approximation of PeakEWMA used by Linkerd/Finagle: + * a slow backend with low load still loses to a fast one with comparable + * load; a fast backend with high load can still lose to an idle slow one + * if the latency gap is small enough. * - * During slow start, scale the score *up* by the inverse factor so that - * a barely-warmed-up upstream looks loaded relative to its peers and + * Phase 1 fallback (no latency yet): + * score = inflight + errors * 2 + * + * During slow start the score is scaled *up* by the inverse factor so a + * barely-warmed-up upstream looks loaded relative to its peers and * receives proportionally less traffic. */ static inline double rspamd_upstream_load_score(struct upstream *up, double now) { - double base = (double) up->inflight + (double) up->errors * 2.0; - double factor = rspamd_upstream_slow_start_factor(up, now); + double base; + double factor; + + if (up->latency_n > 0 && up->latency_ewma > 0) { + base = up->latency_ewma * (double) (up->inflight + 1) + + (double) up->errors * 5.0 * up->latency_ewma; + } + else { + base = (double) up->inflight + (double) up->errors * 2.0; + } + factor = rspamd_upstream_slow_start_factor(up, now); if (factor < 1.0) { /* As factor -> 0, score -> infinity (heavily deprioritised). */ if (factor < 0.01) { @@ -2733,6 +2771,92 @@ void rspamd_upstreams_set_slow_start(struct upstream_list *ups, ups->limits = nlimits; } +void rspamd_upstreams_set_latency_half_life(struct upstream_list *ups, + double half_life_s) +{ + struct upstream_limits *nlimits; + g_assert(ups != NULL); + g_assert(ups->ctx != NULL && ups->ctx->pool != NULL); + + if (half_life_s < 0) { + half_life_s = 0; + } + nlimits = rspamd_mempool_alloc(ups->ctx->pool, sizeof(*nlimits)); + memcpy(nlimits, ups->limits, sizeof(*nlimits)); + nlimits->latency_half_life_s = half_life_s; + ups->limits = nlimits; +} + +/* + * Time-weighted EWMA for latency. Older samples decay so that a + * once-slow-but-recovered upstream isn't forever penalised. + * + * Mathematically: alpha = 1 - exp(-dt / tau), where tau is set so the + * weight halves over `latency_half_life_s` of wall time. tau = hl/ln(2). + * + * If half_life is 0 we degrade to a flat moving average where every + * sample has equal weight regardless of arrival time. + */ +void rspamd_upstream_record_latency(struct upstream *up, double seconds) +{ + double now; + double dt; + double tau; + double alpha; + double half_life; + + if (up == NULL || seconds < 0) { + return; + } + + RSPAMD_UPSTREAM_LOCK(up); + + if (up->ctx && up->ctx->event_loop) { + now = ev_now(up->ctx->event_loop); + } + else { + now = rspamd_get_ticks(FALSE); + } + + if (up->latency_n == 0 || up->latency_last_at <= 0) { + up->latency_ewma = seconds; + } + else { + half_life = up->ls ? up->ls->limits->latency_half_life_s : DEFAULT_LATENCY_HALF_LIFE_S; + if (half_life <= 0) { + /* Flat moving average. */ + alpha = 1.0 / (double) (up->latency_n + 1); + } + else { + dt = now - up->latency_last_at; + if (dt < 0.0) { + dt = 0.0; + } + tau = half_life / 0.6931471805599453; /* ln(2) */ + alpha = 1.0 - exp(-dt / tau); + /* Cap so we never wholly forget the prior estimate. */ + if (alpha > 0.5) alpha = 0.5; + if (alpha < 0.01) alpha = 0.01; + } + up->latency_ewma = alpha * seconds + (1.0 - alpha) * up->latency_ewma; + } + + up->latency_last_at = now; + if (up->latency_n < UINT_MAX) { + up->latency_n++; + } + + RSPAMD_UPSTREAM_UNLOCK(up); +} + +double rspamd_upstream_get_latency(const struct upstream *up) +{ + if (up == NULL) { + return 0.0; + } + return up->latency_ewma; +} + /* * Calculate token cost for a message of given size */ diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h index f9061eb7bd..246a0f800a 100644 --- a/src/libutil/upstream.h +++ b/src/libutil/upstream.h @@ -385,6 +385,31 @@ void rspamd_upstreams_set_token_bucket(struct upstream_list *ups, void rspamd_upstreams_set_slow_start(struct upstream_list *ups, unsigned int slow_start_ms); +/** + * Record a per-request latency observation for the upstream. + * Updates a time-weighted EWMA that decays old samples on a + * configurable half-life. The EWMA feeds into P2C selection so + * faster backends are preferred when load is otherwise comparable. + * Cheap to call; no allocation. + * @param up upstream + * @param seconds observed latency (e.g. request RTT) + */ +void rspamd_upstream_record_latency(struct upstream *up, double seconds); + +/** + * Read the current latency EWMA in seconds. Zero if no samples yet. + */ +double rspamd_upstream_get_latency(const struct upstream *up); + +/** + * Configure latency EWMA half-life. Defaults to 60s; setting to 0 + * disables time-weighting (becomes a flat moving average). + * @param ups upstream list + * @param half_life_s decay half-life in seconds + */ +void rspamd_upstreams_set_latency_half_life(struct upstream_list *ups, + double half_life_s); + /** * Get upstream using token bucket algorithm. * Selects upstream with lowest inflight tokens (weighted by message size). diff --git a/test/rspamd_cxx_unit.cxx b/test/rspamd_cxx_unit.cxx index e48a8c726d..c0beddf386 100644 --- a/test/rspamd_cxx_unit.cxx +++ b/test/rspamd_cxx_unit.cxx @@ -33,6 +33,7 @@ #include "rspamd_cxx_unit_upstream_round_robin.hxx" #include "rspamd_cxx_unit_upstream_p2c.hxx" #include "rspamd_cxx_unit_upstream_slow_start.hxx" +#include "rspamd_cxx_unit_upstream_latency.hxx" #include "rspamd_cxx_unit_multipart.hxx" #include "rspamd_cxx_unit_settings_merge.hxx" diff --git a/test/rspamd_cxx_unit_upstream_latency.hxx b/test/rspamd_cxx_unit_upstream_latency.hxx new file mode 100644 index 0000000000..78a0053563 --- /dev/null +++ b/test/rspamd_cxx_unit_upstream_latency.hxx @@ -0,0 +1,203 @@ +/* + * Copyright 2026 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* Unit tests for upstream latency EWMA tracking and P2C integration */ + +#ifndef RSPAMD_CXX_UNIT_UPSTREAM_LATENCY_HXX +#define RSPAMD_CXX_UNIT_UPSTREAM_LATENCY_HXX + +#define DOCTEST_CONFIG_IMPLEMENTATION_IN_DLL +#include "doctest/doctest.h" + +#include "libutil/upstream.h" + +#include +#include +#include + +TEST_SUITE("upstream_latency") +{ + TEST_CASE("first sample sets the EWMA exactly") + { + auto *ctx = rspamd_upstreams_library_init(); + auto *ups = rspamd_upstreams_create(ctx); + + REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333, + RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr)); + + auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + + CHECK(rspamd_upstream_get_latency(up) == 0.0); + rspamd_upstream_record_latency(up, 0.123); + CHECK(rspamd_upstream_get_latency(up) == doctest::Approx(0.123)); + + rspamd_upstream_ok(up); + rspamd_upstreams_destroy(ups); + rspamd_upstreams_library_unref(ctx); + } + + TEST_CASE("repeated samples converge toward steady value") + { + auto *ctx = rspamd_upstreams_library_init(); + auto *ups = rspamd_upstreams_create(ctx); + REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333, + RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr)); + + auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + + /* Feed 200 identical 0.05s samples; EWMA must converge to 0.05. */ + for (int i = 0; i < 200; i++) { + rspamd_upstream_record_latency(up, 0.05); + } + CHECK(rspamd_upstream_get_latency(up) == doctest::Approx(0.05).epsilon(0.01)); + + rspamd_upstream_ok(up); + rspamd_upstreams_destroy(ups); + rspamd_upstreams_library_unref(ctx); + } + + TEST_CASE("step change is reflected after enough samples") + { + auto *ctx = rspamd_upstreams_library_init(); + auto *ups = rspamd_upstreams_create(ctx); + REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333, + RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr)); + + auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + + /* Steady at 1.0s, then a step to 0.1s. After many samples at 0.1 + * the EWMA should be much closer to 0.1 than to 1.0. */ + for (int i = 0; i < 50; i++) { + rspamd_upstream_record_latency(up, 1.0); + } + double slow = rspamd_upstream_get_latency(up); + CHECK(slow > 0.5); + + for (int i = 0; i < 500; i++) { + rspamd_upstream_record_latency(up, 0.1); + } + double fast = rspamd_upstream_get_latency(up); + CHECK(fast < slow); + CHECK(fast < 0.5); + + rspamd_upstream_ok(up); + rspamd_upstreams_destroy(ups); + rspamd_upstreams_library_unref(ctx); + } + + TEST_CASE("negative samples are ignored") + { + auto *ctx = rspamd_upstreams_library_init(); + auto *ups = rspamd_upstreams_create(ctx); + REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333, + RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr)); + + auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + + rspamd_upstream_record_latency(up, 0.05); + double before = rspamd_upstream_get_latency(up); + rspamd_upstream_record_latency(up, -1.0); + double after = rspamd_upstream_get_latency(up); + CHECK(after == doctest::Approx(before)); + + rspamd_upstream_ok(up); + rspamd_upstreams_destroy(ups); + rspamd_upstreams_library_unref(ctx); + } + + TEST_CASE("set_latency_half_life accepts and clamps") + { + auto *ctx = rspamd_upstreams_library_init(); + auto *ups = rspamd_upstreams_create(ctx); + REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333, + RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr)); + + rspamd_upstreams_set_latency_half_life(ups, 30.0); + rspamd_upstreams_set_latency_half_life(ups, 0); + rspamd_upstreams_set_latency_half_life(ups, -5); + + auto *up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + rspamd_upstream_record_latency(up, 0.2); + CHECK(rspamd_upstream_get_latency(up) == doctest::Approx(0.2)); + rspamd_upstream_ok(up); + + rspamd_upstreams_destroy(ups); + rspamd_upstreams_library_unref(ctx); + } + + TEST_CASE("P2C prefers the lower-latency upstream when load matches") + { + auto *ctx = rspamd_upstreams_library_init(); + auto *ups = rspamd_upstreams_create(ctx); + rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_P2C); + + REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.1:11333", 11333, + RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr)); + REQUIRE(rspamd_upstreams_add_upstream(ups, "127.0.0.2:11333", 11333, + RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr)); + + /* Collect the two distinct upstream pointers via repeated selection. */ + std::map seen; + for (int tries = 0; tries < 200 && seen.size() < 2; tries++) { + auto *u = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(u != nullptr); + seen[rspamd_upstream_name(u)] = u; + rspamd_upstream_ok(u); + } + REQUIRE(seen.size() == 2); + + auto it = seen.begin(); + struct upstream *fast = it->second; + std::string fast_name = it->first; + ++it; + struct upstream *slow = it->second; + std::string slow_name = it->first; + + /* Plant clear EWMAs: fast = 10ms, slow = 500ms. */ + for (int i = 0; i < 30; i++) { + rspamd_upstream_record_latency(fast, 0.01); + rspamd_upstream_record_latency(slow, 0.5); + } + CHECK(rspamd_upstream_get_latency(fast) < rspamd_upstream_get_latency(slow)); + + std::map hits; + for (int i = 0; i < 1000; i++) { + auto *u = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(u != nullptr); + hits[rspamd_upstream_name(u)]++; + rspamd_upstream_ok(u); + } + + /* Latency-aware P2C should heavily favour the fast upstream. */ + CHECK(hits[fast_name] > hits[slow_name] * 2); + + rspamd_upstreams_destroy(ups); + rspamd_upstreams_library_unref(ctx); + } + + TEST_CASE("null/zero arguments handled safely") + { + rspamd_upstream_record_latency(nullptr, 0.1); + CHECK(rspamd_upstream_get_latency(nullptr) == 0.0); + } +} + +#endif