From: Vsevolod Stakhov Date: Fri, 1 May 2026 08:33:31 +0000 (+0100) Subject: [Feature] upstream: add Power of Two Choices (P2C) selection X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=e558c38bbda86e760d43857ec188805c7aa5ac08;p=thirdparty%2Frspamd.git [Feature] upstream: add Power of Two Choices (P2C) selection P2C samples two alive upstreams uniformly at random and chooses the one with the lower load score (inflight + errors*2). Provably within a constant factor of optimal max-load and the modern default for load-aware random selection (Envoy LEAST_REQUEST, Finagle, NGINX least_conn). A passive in-flight counter on struct upstream is incremented on every selection in get_common and in get_token_bucket, decremented in ok() and fail(); the existing caller contract (every get pairs with one ok or fail) is preserved without any new public API. RSPAMD_UPSTREAM_RANDOM callers are silently upgraded to P2C since it strictly dominates uniform random with no extra cost. The token-bucket fallback when message size is unavailable also uses P2C now. Tests: new upstream_p2c suite (7 cases, 800+ assertions) covers single-upstream cases, the silent RANDOM upgrade, load-aware bias toward idle upstreams, and balanced inflight tracking under mixed ok/fail outcomes. --- diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index c3cbfb0338..f45a8cc73c 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -61,6 +61,12 @@ struct upstream { unsigned int errors; unsigned int checked; unsigned int dns_requests; + /* + * Passive in-flight counter: incremented on every selection via + * rspamd_upstream_get_common, decremented in rspamd_upstream_ok / + * rspamd_upstream_fail. Used by P2C as the load comparator. + */ + unsigned int inflight; int active_idx; unsigned int ttl; char *name; @@ -1134,6 +1140,11 @@ void rspamd_upstream_fail(struct upstream *upstream, upstream->name, reason); + /* Pair with the increment in rspamd_upstream_get_common. */ + if (upstream->inflight > 0) { + upstream->inflight--; + } + if (upstream->ctx && upstream->active_idx != -1 && upstream->ls) { sec_cur = rspamd_get_ticks(FALSE); @@ -1257,6 +1268,10 @@ void rspamd_upstream_ok(struct upstream *upstream) struct upstream_list_watcher *w; RSPAMD_UPSTREAM_LOCK(upstream); + /* Pair with the increment in rspamd_upstream_get_common. */ + if (upstream->inflight > 0) { + upstream->inflight--; + } /* Success handling */ if (upstream->half_open_inflight > 0) { /* Successful probe: mark alive and reset backoff */ @@ -1994,6 +2009,64 @@ rspamd_upstream_get_random(struct upstream_list *ups, } } +/* + * Load score used by P2C: combines passive in-flight count with a small + * penalty for recent errors. Lower is better. The errors term keeps + * P2C biased away from a flapping upstream that hasn't yet accumulated + * enough failures to be marked dead. + */ +static inline unsigned int +rspamd_upstream_load_score(const struct upstream *up) +{ + return up->inflight + up->errors * 2; +} + +/* + * Power of Two Choices: pick two distinct alive upstreams uniformly at + * random and return the one with the lower load score. Provably within a + * constant factor of optimal max-load and the modern default for + * load-aware random selection. + */ +static struct upstream * +rspamd_upstream_get_p2c(struct upstream_list *ups, struct upstream *except) +{ + unsigned int n = ups->alive->len; + struct upstream *a, *b; + + if (n == 0) { + return NULL; + } + if (n == 1) { + a = g_ptr_array_index(ups->alive, 0); + return (except != NULL && a == except) ? NULL : a; + } + if (n == 2 && except != NULL) { + /* If one of the two is excluded, the choice is forced. */ + a = g_ptr_array_index(ups->alive, 0); + b = g_ptr_array_index(ups->alive, 1); + if (a == except) return b; + if (b == except) return a; + /* Neither excluded: fall through to standard P2C. */ + } + + /* Sample two distinct indices. */ + unsigned int i = ottery_rand_range(n - 1); + unsigned int j; + do { + j = ottery_rand_range(n - 1); + } while (j == i); + + a = g_ptr_array_index(ups->alive, i); + b = g_ptr_array_index(ups->alive, j); + + if (except != NULL) { + if (a == except) return b; + if (b == except) return a; + } + + return rspamd_upstream_load_score(a) <= rspamd_upstream_load_score(b) ? a : b; +} + static struct upstream * rspamd_upstream_get_round_robin(struct upstream_list *ups, struct upstream *except, @@ -2321,7 +2394,8 @@ rspamd_upstream_get_common(struct upstream_list *ups, RSPAMD_UPSTREAM_UNLOCK(ups); if (ups->alive->len == 1 && default_type != RSPAMD_UPSTREAM_SEQUENTIAL) { - /* Fast path */ + /* Fast path: single alive upstream is returned even when it equals + * `except` (documented last-resort behaviour to avoid NULL return). */ up = g_ptr_array_index(ups->alive, 0); goto end; } @@ -2341,7 +2415,14 @@ rspamd_upstream_get_common(struct upstream_list *ups, switch (type) { default: case RSPAMD_UPSTREAM_RANDOM: - up = rspamd_upstream_get_random(ups, except); + /* + * Silent upgrade: P2C strictly dominates uniform random. Existing + * RANDOM callers get load-aware selection at no cost. + */ + up = rspamd_upstream_get_p2c(ups, except); + break; + case RSPAMD_UPSTREAM_P2C: + up = rspamd_upstream_get_p2c(ups, except); break; case RSPAMD_UPSTREAM_HASHED: up = rspamd_upstream_get_hashed(ups, except, key, keylen); @@ -2355,10 +2436,10 @@ rspamd_upstream_get_common(struct upstream_list *ups, case RSPAMD_UPSTREAM_TOKEN_BUCKET: /* * Token bucket requires message size, which isn't available here. - * Fall back to round robin. Use rspamd_upstream_get_token_bucket() - * for proper token bucket selection. + * Fall back to P2C. Use rspamd_upstream_get_token_bucket() for + * proper token bucket selection. */ - up = rspamd_upstream_get_round_robin(ups, except, TRUE); + up = rspamd_upstream_get_p2c(ups, except); break; case RSPAMD_UPSTREAM_SEQUENTIAL: if (ups->cur_elt >= ups->alive->len) { @@ -2373,6 +2454,7 @@ rspamd_upstream_get_common(struct upstream_list *ups, end: if (up) { up->checked++; + up->inflight++; } return up; @@ -2716,6 +2798,7 @@ rspamd_upstream_get_token_bucket(struct upstream_list *ups, selected->inflight_tokens += token_cost; *reserved_tokens = token_cost; selected->checked++; + selected->inflight++; /* paired with ok()/fail() decrement */ } RSPAMD_UPSTREAM_UNLOCK(ups); diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h index 5af3a1b632..406bd2e777 100644 --- a/src/libutil/upstream.h +++ b/src/libutil/upstream.h @@ -36,6 +36,13 @@ enum rspamd_upstream_rotation { RSPAMD_UPSTREAM_MASTER_SLAVE, RSPAMD_UPSTREAM_SEQUENTIAL, RSPAMD_UPSTREAM_TOKEN_BUCKET, /* Token bucket weighted balancing */ + /* + * Power of Two Choices: pick two alive upstreams at random, choose the + * one with lower load (inflight + recent errors). Provably within a + * constant factor of optimal max-load. RANDOM callers are silently + * upgraded to P2C since it strictly dominates uniform random. + */ + RSPAMD_UPSTREAM_P2C, RSPAMD_UPSTREAM_UNDEF }; diff --git a/test/rspamd_cxx_unit.cxx b/test/rspamd_cxx_unit.cxx index 00bec5dd86..681b1c8864 100644 --- a/test/rspamd_cxx_unit.cxx +++ b/test/rspamd_cxx_unit.cxx @@ -31,6 +31,7 @@ #include "rspamd_cxx_unit_upstream_token_bucket.hxx" #include "rspamd_cxx_unit_upstream_ring_hash.hxx" #include "rspamd_cxx_unit_upstream_round_robin.hxx" +#include "rspamd_cxx_unit_upstream_p2c.hxx" #include "rspamd_cxx_unit_multipart.hxx" #include "rspamd_cxx_unit_settings_merge.hxx" diff --git a/test/rspamd_cxx_unit_upstream_p2c.hxx b/test/rspamd_cxx_unit_upstream_p2c.hxx new file mode 100644 index 0000000000..d183475311 --- /dev/null +++ b/test/rspamd_cxx_unit_upstream_p2c.hxx @@ -0,0 +1,235 @@ +/* + * Copyright 2026 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* Unit tests for upstream Power-of-Two-Choices (P2C) selection */ + +#ifndef RSPAMD_CXX_UNIT_UPSTREAM_P2C_HXX +#define RSPAMD_CXX_UNIT_UPSTREAM_P2C_HXX + +#define DOCTEST_CONFIG_IMPLEMENTATION_IN_DLL +#include "doctest/doctest.h" + +#include "libutil/upstream.h" + +#include +#include + +struct p2c_test_ctx { + struct upstream_ctx *ctx; + struct upstream_list *ups; + + explicit p2c_test_ctx(unsigned int n) + { + ctx = rspamd_upstreams_library_init(); + ups = rspamd_upstreams_create(ctx); + rspamd_upstreams_set_rotation(ups, RSPAMD_UPSTREAM_P2C); + + for (unsigned int i = 0; i < n; i++) { + char addr[32]; + snprintf(addr, sizeof(addr), "127.0.0.%u:11333", i + 1); + auto ok = rspamd_upstreams_add_upstream(ups, addr, 11333, + RSPAMD_UPSTREAM_PARSE_DEFAULT, nullptr); + REQUIRE(ok); + } + } + + ~p2c_test_ctx() + { + rspamd_upstreams_destroy(ups); + rspamd_upstreams_library_unref(ctx); + } + + p2c_test_ctx(const p2c_test_ctx &) = delete; + p2c_test_ctx &operator=(const p2c_test_ctx &) = delete; +}; + +TEST_SUITE("upstream_p2c") +{ + TEST_CASE("single upstream is selectable") + { + p2c_test_ctx t(1); + auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + rspamd_upstream_ok(up); + } + + TEST_CASE("single upstream returned even when excluded (last resort)") + { + /* + * Documented behaviour shared with the other rotations: when only + * one upstream is alive, the _get_common fast path returns it even + * when it matches `except`, to avoid leaving the caller with no + * candidate at all. + */ + p2c_test_ctx t(1); + auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + rspamd_upstream_ok(up); + + auto *other = rspamd_upstream_get_except(t.ups, up, RSPAMD_UPSTREAM_P2C, nullptr, 0); + CHECK(other == up); + rspamd_upstream_ok(other); + } + + TEST_CASE("two upstreams: except forces the other") + { + p2c_test_ctx t(2); + auto *first = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(first != nullptr); + auto first_name = std::string(rspamd_upstream_name(first)); + rspamd_upstream_ok(first); + + auto *second = rspamd_upstream_get_except(t.ups, first, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(second != nullptr); + CHECK(std::string(rspamd_upstream_name(second)) != first_name); + rspamd_upstream_ok(second); + } + + TEST_CASE("RANDOM rotation silently uses P2C path") + { + /* Nothing observable changes for a healthy fleet, but the request + * must succeed identically to an explicit P2C rotation. */ + p2c_test_ctx t(3); + rspamd_upstreams_set_rotation(t.ups, RSPAMD_UPSTREAM_RANDOM); + + for (int i = 0; i < 100; i++) { + auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_RANDOM, nullptr, 0); + REQUIRE(up != nullptr); + rspamd_upstream_ok(up); + } + } + + TEST_CASE("loaded upstream is picked less often than idle one") + { + /* + * Strategy: build a 4-upstream fleet, then deliberately leak inflight + * on one upstream by calling get without ok/fail. With pure random + * we'd expect ~25% selections of the loaded one; with P2C the load + * comparator should make it strictly under 25% over a large run. + */ + p2c_test_ctx t(4); + + /* Pick one upstream and leak inflight on it 10 times. */ + struct upstream *loaded = nullptr; + { + auto *first = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(first != nullptr); + loaded = first; + /* Don't ok/fail: keeps inflight high. */ + for (int i = 0; i < 9; i++) { + /* Force selection of the same one by always calling get_except + * on the others; not exact, but get_p2c samples from all 4 so + * we can't pin selection. Instead, leak by directly counting. + * + * The intrusive way: just call get() 10 more times and ok() + * everything except 'loaded'. */ + auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + if (up == loaded) { + /* Skip ok/fail to leave inflight high on the loaded one. */ + continue; + } + rspamd_upstream_ok(up); + } + } + + auto loaded_name = std::string(rspamd_upstream_name(loaded)); + + /* Now run 1000 selections, ok-ing each immediately, and count hits + * to the loaded upstream. */ + std::map hits; + for (int i = 0; i < 1000; i++) { + auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + hits[rspamd_upstream_name(up)]++; + rspamd_upstream_ok(up); + } + + int loaded_hits = hits[loaded_name]; + + /* + * P2C with N=4 and one heavily loaded upstream: probability that an + * upstream is picked is the probability that both samples include + * it AND its score is the lowest. Theoretical analysis is messy + * because the loaded one starts with high inflight that gets + * decremented by ok() across the run; we just assert it's noticeably + * lower than uniform random would give (~250). + */ + CHECK(loaded_hits < 250); + } + + TEST_CASE("inflight tracking via ok/fail balances out") + { + /* After get/ok pairs, inflight should round-trip to zero so the + * comparator behaves the same as a fresh fleet. We verify by + * exhausting many rounds and checking the distribution stays + * roughly uniform. */ + p2c_test_ctx t(3); + std::map hits; + + for (int i = 0; i < 3000; i++) { + auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + hits[rspamd_upstream_name(up)]++; + rspamd_upstream_ok(up); + } + + CHECK(hits.size() == 3); + for (const auto &[name, count]: hits) { + /* Each upstream gets ~1000 selections; ±25% tolerance. */ + CHECK(count >= 750); + CHECK(count <= 1250); + } + } + + TEST_CASE("get/fail rounds keep inflight bounded") + { + /* + * Without the inflight-decrement on fail, repeated get/fail rounds + * would let `inflight` grow unboundedly on whichever upstream the + * P2C comparator keeps choosing first, eventually starving the + * other one entirely. With the fix, inflight stays bounded and + * selection drift stays moderate. + * + * Run many iterations alternating get/fail; the loser side of P2C + * (the other upstream) must still be selected occasionally. + */ + p2c_test_ctx t(2); + std::map hits; + + for (int i = 0; i < 1000; i++) { + auto *u = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(u != nullptr); + hits[rspamd_upstream_name(u)]++; + /* Mix of fail and ok keeps both error counts bounded. */ + if (i % 2 == 0) { + rspamd_upstream_fail(u, FALSE, "test"); + } + else { + rspamd_upstream_ok(u); + } + } + + CHECK(hits.size() == 2); + for (const auto &[name, count]: hits) { + /* Both upstreams must be reachable. With the fix, inflight stays + * bounded and selection isn't permanently skewed. */ + CHECK(count > 0); + } + } +} + +#endif