From: Vsevolod Stakhov Date: Sat, 2 May 2026 10:34:25 +0000 (+0100) Subject: [Fix] upstream: add release() for non-success/failure paths X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=32ff22877d83d81dadd70459fad985c8446a23ea;p=thirdparty%2Frspamd.git [Fix] upstream: add release() for non-success/failure paths The new inflight counter introduced for P2C exposed several pre-existing leaks where a get_* selection had no matching ok()/fail() call. ok() was unsuitable as a generic retire because it also clears the error count. Add rspamd_upstream_release() — decrement inflight without touching errors, latency, or watchers — and apply at four call sites: - rspamd_proxy.c mirror loop: copy_msg failure after upstream selection - rspamd_proxy.c master loop: copy_msg failure after upstream selection - fuzzy_check.c PING: fire-and-forget address lookup - http_connection.c proxy: hand-off path where new_common drops the upstream pointer (per-request tracking left for a follow-up) Two more leak classes remain for separate PRs: Lua-side retire fallback via __gc, and librdns retransmit/select pairing in dns.c. Tests: 9 P2C cases (was 7; +2 covering release behaviour and null safety). --- diff --git a/src/libserver/http/http_connection.c b/src/libserver/http/http_connection.c index 3cdcb88448..2b49711b20 100644 --- a/src/libserver/http/http_connection.c +++ b/src/libserver/http/http_connection.c @@ -1329,6 +1329,13 @@ rspamd_http_connection_new_client(struct rspamd_http_context *ctx, return NULL; } + /* The selected proxy upstream is handed off to new_common but + * never tracked through the request lifecycle here; retire the + * inflight counter at connect-success time so P2C scoring stays + * accurate. Wiring per-request success/failure is left for a + * follow-up. 
*/ + rspamd_upstream_release(up); + return rspamd_http_connection_new_common(ctx, fd, body_handler, error_handler, finish_handler, opts, RSPAMD_HTTP_CLIENT, diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index bead2cf87d..b240d90a60 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -1344,6 +1344,22 @@ void rspamd_upstream_ok(struct upstream *upstream) RSPAMD_UPSTREAM_UNLOCK(upstream); } +void rspamd_upstream_release(struct upstream *up) +{ + if (up == NULL) { + return; + } + + RSPAMD_UPSTREAM_LOCK(up); + /* Pair with the increment in rspamd_upstream_get_common / + * rspamd_upstream_get_token_bucket without disturbing error or + * latency state. */ + if (up->inflight > 0) { + up->inflight--; + } + RSPAMD_UPSTREAM_UNLOCK(up); +} + void rspamd_upstream_set_weight(struct upstream *up, unsigned int weight) { RSPAMD_UPSTREAM_LOCK(up); diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h index 246a0f800a..d8b58b6ee1 100644 --- a/src/libutil/upstream.h +++ b/src/libutil/upstream.h @@ -102,6 +102,16 @@ void rspamd_upstream_fail(struct upstream *upstream, gboolean addr_failure, cons */ void rspamd_upstream_ok(struct upstream *up); +/** + * Retire an upstream selection without affecting error counters or latency. + * Use this when neither success nor failure semantics apply: message-copy + * failures after a successful selection, fire-and-forget address lookups, + * or hand-off paths where success/failure is signalled by a different + * layer. Decrements the inflight counter so P2C load comparisons stay + * accurate; otherwise abandoned selections would skew selection forever. 
+ */ +void rspamd_upstream_release(struct upstream *up); + /** * Set weight for an upstream * @param up diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 5c95ac6e7e..c4efd31fea 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -7170,6 +7170,10 @@ fuzzy_lua_ping_storage(lua_State *L) return 2; } addr = rspamd_upstream_addr_next(selected); + /* Fire-and-forget ping: the session below tracks the address + * directly, not the upstream, so retire the inflight counter + * immediately rather than leaking it. */ + rspamd_upstream_release(selected); } if (addr != NULL) { diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index 3ea84de691..5a335e503a 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -2203,6 +2203,10 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) if (err) { g_error_free(err); } + /* Selection happened but no request will be sent: retire the + * inflight counter so P2C scoring isn't skewed by abandoned + * picks. */ + rspamd_upstream_release(bk_conn->up); continue; } @@ -2984,6 +2988,10 @@ proxy_send_master_message(struct rspamd_proxy_session *session) g_error_free(err); } + /* Selection succeeded but no request will be sent: retire the + * inflight counter so P2C scoring isn't skewed by abandoned + * picks. */ + rspamd_upstream_release(session->master_conn->up); goto err; /* No fallback here */ } diff --git a/test/rspamd_cxx_unit_upstream_p2c.hxx b/test/rspamd_cxx_unit_upstream_p2c.hxx index d183475311..28744af522 100644 --- a/test/rspamd_cxx_unit_upstream_p2c.hxx +++ b/test/rspamd_cxx_unit_upstream_p2c.hxx @@ -195,6 +195,47 @@ TEST_SUITE("upstream_p2c") } } + TEST_CASE("release retires inflight without affecting selection bias") + { + /* + * release() must decrement inflight just like ok()/fail() do, so + * abandoned selections (e.g. message-copy failures, fire-and-forget + * lookups) don't permanently skew the P2C comparator. 
We verify by + * leaking via release on one upstream and checking that selection + * stays balanced — unlike the "loaded upstream" test where leaking + * with no retirement skews selection away from it. + */ + p2c_test_ctx t(3); + std::map<std::string, int> hits; + + /* Burn many get/release pairs on whatever P2C picks first. If + * release didn't retire inflight, that upstream would build up + * a load score and stop being picked. */ + for (int i = 0; i < 100; i++) { + auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + rspamd_upstream_release(up); + } + + for (int i = 0; i < 1500; i++) { + auto *up = rspamd_upstream_get(t.ups, RSPAMD_UPSTREAM_P2C, nullptr, 0); + REQUIRE(up != nullptr); + hits[rspamd_upstream_name(up)]++; + rspamd_upstream_ok(up); + } + + CHECK(hits.size() == 3); + for (const auto &[name, count]: hits) { + /* Each ~500; ±40% tolerance to absorb P2C noise. */ + CHECK(count >= 300); + } + } + + TEST_CASE("release on null is a no-op") + { + rspamd_upstream_release(nullptr); + } + TEST_CASE("get/fail rounds keep inflight bounded") { /*