From: Vsevolod Stakhov
Date: Fri, 6 Feb 2026 09:25:17 +0000 (+0000)
Subject: [Fix] upstream: fix stale heap_idx in token bucket
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=61bed05c2c384ef8b3976bf8d43e9ecc725a1f9b;p=thirdparty%2Frspamd.git

[Fix] upstream: fix stale heap_idx in token bucket

The intrusive heap swaps entire structs during swim/sink, making
up->heap_idx stale after any heap modification. The update function
would silently skip updates when the cached index pointed to a
different upstream, breaking load distribution across backends.

Fix by falling back to linear search on cache miss and refreshing
heap_idx after every heap update. Also add underflow warning for
double-return detection and improve API documentation.
---

diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index 8e6ae05459..3725e80474 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -2233,31 +2233,8 @@ init_error:
 }
 
 /*
- * Update heap position after changing inflight_tokens
- */
-static void
-rspamd_upstream_token_heap_update(struct upstream_list *ups, struct upstream *up)
-{
-	struct upstream_token_heap_entry *entry;
-
-	if (!ups->token_bucket_initialized || up->heap_idx == UINT_MAX) {
-		return;
-	}
-
-	if (up->heap_idx >= rspamd_heap_size(upstream_token_heap, &ups->token_heap)) {
-		return;
-	}
-
-	entry = rspamd_heap_index(upstream_token_heap, &ups->token_heap, up->heap_idx);
-	if (entry && entry->up == up) {
-		/* Use rspamd_heap_update to adjust position based on new priority */
-		unsigned int new_pri = (unsigned int) MIN(up->inflight_tokens, UINT_MAX);
-		rspamd_heap_update(upstream_token_heap, &ups->token_heap, entry, new_pri);
-	}
-}
-
-/*
- * Find upstream in heap by pointer (for removal or update after finding mismatch)
+ * Find upstream in heap by pointer (for removal or update after finding mismatch).
+ * Also refreshes up->heap_idx as a side effect.
  */
 static struct upstream_token_heap_entry *
 rspamd_upstream_find_in_heap(struct upstream_list *ups, struct upstream *up)
@@ -2275,6 +2252,54 @@ rspamd_upstream_find_in_heap(struct upstream_list *ups, struct upstream *up)
 	return NULL;
 }
 
+/*
+ * Update heap position after changing inflight_tokens.
+ *
+ * The intrusive heap stores elements by value and swaps entire structs during
+ * swim/sink operations. This means up->heap_idx can become stale after any
+ * heap modification (the entry at that index may now belong to a different
+ * upstream). We handle this by:
+ *   1. Trying the cached heap_idx first (fast path)
+ *   2. Falling back to linear search if the cache is stale
+ *   3. Refreshing heap_idx after the update via linear search
+ *
+ * Linear search is acceptable since upstream count is typically small (2-10).
+ */
+static void
+rspamd_upstream_token_heap_update(struct upstream_list *ups, struct upstream *up)
+{
+	struct upstream_token_heap_entry *entry = NULL;
+
+	if (!ups->token_bucket_initialized || up->heap_idx == UINT_MAX) {
+		return;
+	}
+
+	/* Try cached index first (fast path) */
+	if (up->heap_idx < rspamd_heap_size(upstream_token_heap, &ups->token_heap)) {
+		struct upstream_token_heap_entry *candidate =
+			rspamd_heap_index(upstream_token_heap, &ups->token_heap, up->heap_idx);
+		if (candidate && candidate->up == up) {
+			entry = candidate;
+		}
+	}
+
+	/* Cache miss: linear search */
+	if (!entry) {
+		entry = rspamd_upstream_find_in_heap(ups, up);
+		if (!entry) {
+			return;
+		}
+	}
+
+	unsigned int new_pri = (unsigned int) MIN(up->inflight_tokens, UINT_MAX);
+	rspamd_heap_update(upstream_token_heap, &ups->token_heap, entry, new_pri);
+
+	/* Refresh heap_idx: heap_update swaps whole structs during swim/sink,
+	 * so the entry pointer now points to whatever element ended up at that
+	 * array slot - not necessarily our upstream. */
+	rspamd_upstream_find_in_heap(ups, up);
+}
+
 struct upstream *
 rspamd_upstream_get_token_bucket(struct upstream_list *ups,
 				 struct upstream *except,
@@ -2408,6 +2433,8 @@ void rspamd_upstream_return_tokens(struct upstream *up, gsize tokens, gboolean s
 		up->inflight_tokens -= tokens;
 	}
 	else {
+		msg_warn("upstream %s: returning %z tokens but only %z inflight (possible double-return)",
+			 up->name, tokens, up->inflight_tokens);
 		up->inflight_tokens = 0;
 	}
 
diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h
index 81441f4219..cbebbb59a4 100644
--- a/src/libutil/upstream.h
+++ b/src/libutil/upstream.h
@@ -361,10 +361,12 @@ void rspamd_upstreams_set_token_bucket(struct upstream_list *ups,
 				       gsize base_cost);
 
 /**
- * Get upstream using token bucket algorithm
- * Selects upstream with lowest inflight tokens (weighted by message size)
+ * Get upstream using token bucket algorithm.
+ * Selects upstream with lowest inflight tokens (weighted by message size).
+ * Falls back to round-robin if heap initialization fails.
+ * Token cost is calculated as: base_cost + (message_size / scale).
  * @param ups upstream list
- * @param except upstream to exclude (for retries)
+ * @param except upstream to exclude (for retries), or NULL
  * @param message_size size of the message being processed
  * @param reserved_tokens output: tokens reserved for this request (must be returned later)
  * @return selected upstream or NULL if none available
@@ -375,11 +377,13 @@ struct upstream *rspamd_upstream_get_token_bucket(struct upstream_list *ups,
 						  gsize *reserved_tokens);
 
 /**
- * Return tokens to upstream after request completion
- * Must be called when a request completes (success or failure)
+ * Return tokens to upstream after request completion.
+ * Must be called exactly once for each successful rspamd_upstream_get_token_bucket call.
+ * On success, tokens are restored to the available pool.
+ * On failure, tokens are NOT restored - this penalises failing backends.
  * @param up upstream to return tokens to
  * @param tokens number of tokens to return (from rspamd_upstream_get_token_bucket)
- * @param success TRUE if request succeeded, FALSE if failed
+ * @param success TRUE if request succeeded, FALSE if failed (penalty)
  */
 void rspamd_upstream_return_tokens(struct upstream *up, gsize tokens, gboolean success);
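
Usage sketch (illustrative, not part of the patch): the header comments
above describe a strict reserve/return contract. The sketch below
assumes only the two declarations from src/libutil/upstream.h shown in
this diff; send_request() is a hypothetical placeholder for the
caller's actual I/O, not an rspamd function.

	#include "libutil/upstream.h"

	static gboolean
	process_with_token_bucket(struct upstream_list *ups, gsize message_size)
	{
		gsize reserved = 0;
		struct upstream *up;
		gboolean ok;

		/* Select the backend with the fewest inflight tokens;
		 * NULL `except` means no backend is excluded on this attempt */
		up = rspamd_upstream_get_token_bucket(ups, NULL, message_size,
						      &reserved);
		if (up == NULL) {
			return FALSE; /* no upstream available */
		}

		ok = send_request(up, message_size); /* hypothetical I/O */

		/* Return the reserved tokens exactly once; with success == FALSE
		 * they are not restored, penalising the failing backend */
		rspamd_upstream_return_tokens(up, reserved, ok);

		return ok;
	}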