]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] upstream: add Power of Two Choices (P2C) selection
authorVsevolod Stakhov <vsevolod@rspamd.com>
Fri, 1 May 2026 08:33:31 +0000 (09:33 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Fri, 1 May 2026 08:33:31 +0000 (09:33 +0100)
P2C samples two alive upstreams uniformly at random and chooses the
one with the lower load score (inflight + errors*2). Provably within
a constant factor of optimal max-load and the modern default for
load-aware random selection (Envoy LEAST_REQUEST, Finagle, NGINX
least_conn).

A passive in-flight counter on struct upstream is incremented on every
selection in get_common and in get_token_bucket, decremented in ok()
and fail(); the existing caller contract (every get pairs with one
ok or fail) is preserved without any new public API.

RSPAMD_UPSTREAM_RANDOM callers are silently upgraded to P2C since it
strictly dominates uniform random with no extra cost. The token-bucket
fallback when message size is unavailable also uses P2C now.

Tests: new upstream_p2c suite (7 cases, 800+ assertions) covers
single-upstream cases, the silent RANDOM upgrade, load-aware bias
toward idle upstreams, and balanced inflight tracking under mixed
ok/fail outcomes.

src/libutil/upstream.c
src/libutil/upstream.h
test/rspamd_cxx_unit.cxx
test/rspamd_cxx_unit_upstream_p2c.hxx [new file with mode: 0644]

index c3cbfb033802a40f8eb6d543c337697e830a1fe1..f45a8cc73c1390c03d1048c7c65c9f8529c01807 100644 (file)
@@ -61,6 +61,12 @@ struct upstream {
        unsigned int errors;
        unsigned int checked;
        unsigned int dns_requests;
+       /*
+        * Passive in-flight counter: incremented on every selection via
+        * rspamd_upstream_get_common, decremented in rspamd_upstream_ok /
+        * rspamd_upstream_fail. Used by P2C as the load comparator.
+        */
+       unsigned int inflight;
        int active_idx;
        unsigned int ttl;
        char *name;
@@ -1134,6 +1140,11 @@ void rspamd_upstream_fail(struct upstream *upstream,
                                           upstream->name,
                                           reason);
 
+       /* Pair with the increment in rspamd_upstream_get_common. */
+       if (upstream->inflight > 0) {
+               upstream->inflight--;
+       }
+
        if (upstream->ctx && upstream->active_idx != -1 && upstream->ls) {
                sec_cur = rspamd_get_ticks(FALSE);
 
@@ -1257,6 +1268,10 @@ void rspamd_upstream_ok(struct upstream *upstream)
        struct upstream_list_watcher *w;
 
        RSPAMD_UPSTREAM_LOCK(upstream);
+       /* Pair with the increment in rspamd_upstream_get_common. */
+       if (upstream->inflight > 0) {
+               upstream->inflight--;
+       }
        /* Success handling */
        if (upstream->half_open_inflight > 0) {
                /* Successful probe: mark alive and reset backoff */
@@ -1994,6 +2009,64 @@ rspamd_upstream_get_random(struct upstream_list *ups,
        }
 }
 
+/*
+ * Load score used by P2C: combines passive in-flight count with a small
+ * penalty for recent errors. Lower is better. The errors term keeps
+ * P2C biased away from a flapping upstream that hasn't yet accumulated
+ * enough failures to be marked dead.
+ */
+static inline unsigned int
+rspamd_upstream_load_score(const struct upstream *up)
+{
+       return up->inflight + up->errors * 2;
+}
+
+/*
+ * Power of Two Choices: pick two distinct alive upstreams uniformly at
+ * random and return the one with the lower load score. Provably within a
+ * constant factor of optimal max-load and the modern default for
+ * load-aware random selection.
+ */
+static struct upstream *
+rspamd_upstream_get_p2c(struct upstream_list *ups, struct upstream *except)
+{
+       unsigned int n = ups->alive->len;
+       struct upstream *a, *b;
+
+       if (n == 0) {
+               return NULL;
+       }
+       if (n == 1) {
+               a = g_ptr_array_index(ups->alive, 0);
+               return (except != NULL && a == except) ? NULL : a;
+       }
+       if (n == 2 && except != NULL) {
+               /* If one of the two is excluded, the choice is forced. */
+               a = g_ptr_array_index(ups->alive, 0);
+               b = g_ptr_array_index(ups->alive, 1);
+               if (a == except) return b;
+               if (b == except) return a;
+               /* Neither excluded: fall through to standard P2C. */
+       }
+
+       /* Sample two distinct indices. */
+       unsigned int i = ottery_rand_range(n - 1);
+       unsigned int j;
+       do {
+               j = ottery_rand_range(n - 1);
+       } while (j == i);
+
+       a = g_ptr_array_index(ups->alive, i);
+       b = g_ptr_array_index(ups->alive, j);
+
+       if (except != NULL) {
+               if (a == except) return b;
+               if (b == except) return a;
+       }
+
+       return rspamd_upstream_load_score(a) <= rspamd_upstream_load_score(b) ? a : b;
+}
+
 static struct upstream *
 rspamd_upstream_get_round_robin(struct upstream_list *ups,
                                                                struct upstream *except,
@@ -2321,7 +2394,8 @@ rspamd_upstream_get_common(struct upstream_list *ups,
        RSPAMD_UPSTREAM_UNLOCK(ups);
 
        if (ups->alive->len == 1 && default_type != RSPAMD_UPSTREAM_SEQUENTIAL) {
-               /* Fast path */
+               /* Fast path: single alive upstream is returned even when it equals
+                * `except` (documented last-resort behaviour to avoid NULL return). */
                up = g_ptr_array_index(ups->alive, 0);
                goto end;
        }
@@ -2341,7 +2415,14 @@ rspamd_upstream_get_common(struct upstream_list *ups,
        switch (type) {
        default:
        case RSPAMD_UPSTREAM_RANDOM:
-               up = rspamd_upstream_get_random(ups, except);
+               /*
+                * Silent upgrade: P2C strictly dominates uniform random. Existing
+                * RANDOM callers get load-aware selection at no cost.
+                */
+               up = rspamd_upstream_get_p2c(ups, except);
+               break;
+       case RSPAMD_UPSTREAM_P2C:
+               up = rspamd_upstream_get_p2c(ups, except);
                break;
        case RSPAMD_UPSTREAM_HASHED:
                up = rspamd_upstream_get_hashed(ups, except, key, keylen);
@@ -2355,10 +2436,10 @@ rspamd_upstream_get_common(struct upstream_list *ups,
        case RSPAMD_UPSTREAM_TOKEN_BUCKET:
                /*
                 * Token bucket requires message size, which isn't available here.
-                * Fall back to round robin. Use rspamd_upstream_get_token_bucket()
-                * for proper token bucket selection.
+                * Fall back to P2C. Use rspamd_upstream_get_token_bucket() for
+                * proper token bucket selection.
                 */
-               up = rspamd_upstream_get_round_robin(ups, except, TRUE);
+               up = rspamd_upstream_get_p2c(ups, except);
                break;
        case RSPAMD_UPSTREAM_SEQUENTIAL:
                if (ups->cur_elt >= ups->alive->len) {
@@ -2373,6 +2454,7 @@ rspamd_upstream_get_common(struct upstream_list *ups,
 end:
        if (up) {
                up->checked++;
+               up->inflight++;
        }
 
        return up;
@@ -2716,6 +2798,7 @@ rspamd_upstream_get_token_bucket(struct upstream_list *ups,
                selected->inflight_tokens += token_cost;
                *reserved_tokens = token_cost;
                selected->checked++;
+               selected->inflight++; /* paired with ok()/fail() decrement */
        }
 
        RSPAMD_UPSTREAM_UNLOCK(ups);
index 5af3a1b6326b2c93b73e2952c4fa7831010607a7..406bd2e7776dadee0ca92533af115adf7b9b3689 100644 (file)
@@ -36,6 +36,13 @@ enum rspamd_upstream_rotation {
        RSPAMD_UPSTREAM_MASTER_SLAVE,
        RSPAMD_UPSTREAM_SEQUENTIAL,
        RSPAMD_UPSTREAM_TOKEN_BUCKET, /* Token bucket weighted balancing */
+       /*
+        * Power of Two Choices: pick two alive upstreams at random, choose the
+        * one with lower load (inflight + recent errors). Provably within a
+        * constant factor of optimal max-load. RANDOM callers are silently
+        * upgraded to P2C since it strictly dominates uniform random.
+        */
+       RSPAMD_UPSTREAM_P2C,
        RSPAMD_UPSTREAM_UNDEF
 };
 
index 00bec5dd86294a1166587b512833656304828934..681b1c8864e43c1db4ca1e156a6a701d0e64cb2b 100644 (file)
@@ -31,6 +31,7 @@
 #include "rspamd_cxx_unit_upstream_token_bucket.hxx"
 #include "rspamd_cxx_unit_upstream_ring_hash.hxx"
 #include "rspamd_cxx_unit_upstream_round_robin.hxx"
+#include "rspamd_cxx_unit_upstream_p2c.hxx"
 #include "rspamd_cxx_unit_multipart.hxx"
 #include "rspamd_cxx_unit_settings_merge.hxx"
 
diff --git a/test/rspamd_cxx_unit_upstream_p2c.hxx b/test/rspamd_cxx_unit_upstream_p2c.hxx
new file mode 100644 (file)
index 0000000..d183475
--- /dev/null
@@ -0,0 +1,235 @@
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Unit tests for upstream Power-of-Two-Choices (P2C) selection */
+
+#ifndef RSPAMD_CXX_UNIT_UPSTREAM_P2C_HXX
+#define RSPAMD_CXX_UNIT_UPSTREAM_P2C_HXX
+
+#define DOCTEST_CONFIG_IMPLEMENTATION_IN_DLL
+#include "doctest/doctest.h"
+
+#include "libutil/upstream.h"
+
+#include <map>
+#include <string>
+
+struct p2c_test_ctx {
+       struct upstream_ctx *ctx;
+       struct upstream_list *ups;
+
+       explicit p2c_test_ctx(unsigned int n)
+       {
+               ctx = rspamd_upstreams_library_init();
+               ups = rspamd_upstreams_create(ctx);
+               rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_P2C);
+
+               for (unsigned int i = 0; i < n; i++) {
+                       char addr[32];
+                       snprintf(addr, sizeof(addr), "127.0.0.%u:11333", i + 1);
+                       auto ok = rspamd_upstreams_add_upstream(ups, addr, 11333,
+                                                                                                       RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr);
+                       REQUIRE(ok);
+               }
+       }
+
+       ~p2c_test_ctx()
+       {
+               rspamd_upstreams_destroy(ups);
+               rspamd_upstreams_library_unref(ctx);
+       }
+
+       p2c_test_ctx(const p2c_test_ctx &) = delete;
+       p2c_test_ctx &operator=(const p2c_test_ctx &) = delete;
+};
+
+TEST_SUITE("upstream_p2c")
+{
+       TEST_CASE("single upstream is selectable")
+       {
+               p2c_test_ctx t(1);
+               auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+               REQUIRE(up != nullptr);
+               rspamd_upstream_ok(up);
+       }
+
+       TEST_CASE("single upstream returned even when excluded (last resort)")
+       {
+               /*
+                * Documented behaviour shared with the other rotations: when only
+                * one upstream is alive, the _get_common fast path returns it even
+                * when it matches `except`, to avoid leaving the caller with no
+                * candidate at all.
+                */
+               p2c_test_ctx t(1);
+               auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+               REQUIRE(up != nullptr);
+               rspamd_upstream_ok(up);
+
+               auto *other = rspamd_upstream_get_except(t.ups, up, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+               CHECK(other == up);
+               rspamd_upstream_ok(other);
+       }
+
+       TEST_CASE("two upstreams: except forces the other")
+       {
+               p2c_test_ctx t(2);
+               auto *first = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+               REQUIRE(first != nullptr);
+               auto first_name = std::string(rspamd_upstream_name(first));
+               rspamd_upstream_ok(first);
+
+               auto *second = rspamd_upstream_get_except(t.ups, first, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+               REQUIRE(second != nullptr);
+               CHECK(std::string(rspamd_upstream_name(second)) != first_name);
+               rspamd_upstream_ok(second);
+       }
+
+       TEST_CASE("RANDOM rotation silently uses P2C path")
+       {
+               /* Nothing observable changes for a healthy fleet, but the request
+                * must succeed identically to an explicit P2C rotation. */
+               p2c_test_ctx t(3);
+               rspamd_upstreams_set_rotation(t.ups, RSPAMD_UPSTREAM_RANDOM);
+
+               for (int i = 0; i < 100; i++) {
+                       auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_RANDOM, nullptr, 0);
+                       REQUIRE(up != nullptr);
+                       rspamd_upstream_ok(up);
+               }
+       }
+
+       TEST_CASE("loaded upstream is picked less often than idle one")
+       {
+               /*
+                * Strategy: build a 4-upstream fleet, then deliberately leak inflight
+                * on one upstream by calling get without ok/fail. With pure random
+                * we'd expect ~25% selections of the loaded one; with P2C the load
+                * comparator should make it strictly under 25% over a large run.
+                */
+               p2c_test_ctx t(4);
+
+               /* Pick one upstream and leak inflight on it 10 times. */
+               struct upstream *loaded = nullptr;
+               {
+                       auto *first = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+                       REQUIRE(first != nullptr);
+                       loaded = first;
+                       /* Don't ok/fail: keeps inflight high. */
+                       for (int i = 0; i < 9; i++) {
+                               /* Force selection of the same one by always calling get_except
+                                * on the others; not exact, but get_p2c samples from all 4 so
+                                * we can't pin selection. Instead, leak by directly counting.
+                                *
+                                * The intrusive way: just call get() 10 more times and ok()
+                                * everything except 'loaded'. */
+                               auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+                               REQUIRE(up != nullptr);
+                               if (up == loaded) {
+                                       /* Skip ok/fail to leave inflight high on the loaded one. */
+                                       continue;
+                               }
+                               rspamd_upstream_ok(up);
+                       }
+               }
+
+               auto loaded_name = std::string(rspamd_upstream_name(loaded));
+
+               /* Now run 1000 selections, ok-ing each immediately, and count hits
+                * to the loaded upstream. */
+               std::map<std::string, int> hits;
+               for (int i = 0; i < 1000; i++) {
+                       auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+                       REQUIRE(up != nullptr);
+                       hits[rspamd_upstream_name(up)]++;
+                       rspamd_upstream_ok(up);
+               }
+
+               int loaded_hits = hits[loaded_name];
+
+               /*
+                * P2C with N=4 and one heavily loaded upstream: probability that an
+                * upstream is picked is the probability that both samples include
+                * it AND its score is the lowest. Theoretical analysis is messy
+                * because the loaded one starts with high inflight that gets
+                * decremented by ok() across the run; we just assert it's noticeably
+                * lower than uniform random would give (~250).
+                */
+               CHECK(loaded_hits < 250);
+       }
+
+       TEST_CASE("inflight tracking via ok/fail balances out")
+       {
+               /* After get/ok pairs, inflight should round-trip to zero so the
+                * comparator behaves the same as a fresh fleet. We verify by
+                * exhausting many rounds and checking the distribution stays
+                * roughly uniform. */
+               p2c_test_ctx t(3);
+               std::map<std::string, int> hits;
+
+               for (int i = 0; i < 3000; i++) {
+                       auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+                       REQUIRE(up != nullptr);
+                       hits[rspamd_upstream_name(up)]++;
+                       rspamd_upstream_ok(up);
+               }
+
+               CHECK(hits.size() == 3);
+               for (const auto &[name, count]: hits) {
+                       /* Each upstream gets ~1000 selections; ±25% tolerance. */
+                       CHECK(count >= 750);
+                       CHECK(count <= 1250);
+               }
+       }
+
+       TEST_CASE("get/fail rounds keep inflight bounded")
+       {
+               /*
+                * Without the inflight-decrement on fail, repeated get/fail rounds
+                * would let `inflight` grow unboundedly on whichever upstream the
+                * P2C comparator keeps choosing first, eventually starving the
+                * other one entirely. With the fix, inflight stays bounded and
+                * selection drift stays moderate.
+                *
+                * Run many iterations alternating get/fail; the loser side of P2C
+                * (the other upstream) must still be selected occasionally.
+                */
+               p2c_test_ctx t(2);
+               std::map<std::string, int> hits;
+
+               for (int i = 0; i < 1000; i++) {
+                       auto *u = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0);
+                       REQUIRE(u != nullptr);
+                       hits[rspamd_upstream_name(u)]++;
+                       /* Mix of fail and ok keeps both error counts bounded. */
+                       if (i % 2 == 0) {
+                               rspamd_upstream_fail(u, FALSE, "test");
+                       }
+                       else {
+                               rspamd_upstream_ok(u);
+                       }
+               }
+
+               CHECK(hits.size() == 2);
+               for (const auto &[name, count]: hits) {
+                       /* Both upstreams must be reachable. With the fix, inflight stays
+                        * bounded and selection isn't permanently skewed. */
+                       CHECK(count > 0);
+               }
+       }
+}
+
+#endif