]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] upstream: fix stale heap_idx in token bucket
authorVsevolod Stakhov <vsevolod@rspamd.com>
Fri, 6 Feb 2026 09:25:17 +0000 (09:25 +0000)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Fri, 6 Feb 2026 09:25:17 +0000 (09:25 +0000)
The intrusive heap swaps entire structs during swim/sink, making
up->heap_idx stale after any heap modification. The update function
would silently skip updates when the cached index pointed to a
different upstream, breaking load distribution across backends.

Fix by falling back to linear search on cache miss and refreshing
heap_idx after every heap update. Also add underflow warning for
double-return detection and improve API documentation.

src/libutil/upstream.c
src/libutil/upstream.h

index 8e6ae0545922913e8916a9ed8e9aec664b5a9882..3725e80474c8d232586abe53b2b918af06302e3e 100644 (file)
@@ -2233,31 +2233,8 @@ init_error:
 }
 
 /*
- * Update heap position after changing inflight_tokens
- */
-static void
-rspamd_upstream_token_heap_update(struct upstream_list *ups, struct upstream *up)
-{
-       struct upstream_token_heap_entry *entry;
-
-       if (!ups->token_bucket_initialized || up->heap_idx == UINT_MAX) {
-               return;
-       }
-
-       if (up->heap_idx >= rspamd_heap_size(upstream_token_heap, &ups->token_heap)) {
-               return;
-       }
-
-       entry = rspamd_heap_index(upstream_token_heap, &ups->token_heap, up->heap_idx);
-       if (entry && entry->up == up) {
-               /* Use rspamd_heap_update to adjust position based on new priority */
-               unsigned int new_pri = (unsigned int) MIN(up->inflight_tokens, UINT_MAX);
-               rspamd_heap_update(upstream_token_heap, &ups->token_heap, entry, new_pri);
-       }
-}
-
-/*
- * Find upstream in heap by pointer (for removal or update after finding mismatch)
+ * Find upstream in heap by pointer (for removal or update after finding mismatch).
+ * Also refreshes up->heap_idx as a side effect.
  */
 static struct upstream_token_heap_entry *
 rspamd_upstream_find_in_heap(struct upstream_list *ups, struct upstream *up)
@@ -2275,6 +2252,54 @@ rspamd_upstream_find_in_heap(struct upstream_list *ups, struct upstream *up)
        return NULL;
 }
 
+/*
+ * Update heap position after changing inflight_tokens.
+ *
+ * The intrusive heap stores elements by value and swaps entire structs during
+ * swim/sink operations. This means up->heap_idx can become stale after any
+ * heap modification (the entry at that index may now belong to a different
+ * upstream). We handle this by:
+ * 1. Trying the cached heap_idx first (fast path)
+ * 2. Falling back to linear search if the cache is stale
+ * 3. Refreshing heap_idx after the update via linear search
+ *
+ * Linear search is acceptable since upstream count is typically small (2-10).
+ */
+static void
+rspamd_upstream_token_heap_update(struct upstream_list *ups, struct upstream *up)
+{
+       struct upstream_token_heap_entry *entry = NULL;
+
+       if (!ups->token_bucket_initialized || up->heap_idx == UINT_MAX) {
+               return;
+       }
+
+       /* Try cached index first (fast path) */
+       if (up->heap_idx < rspamd_heap_size(upstream_token_heap, &ups->token_heap)) {
+               struct upstream_token_heap_entry *candidate =
+                       rspamd_heap_index(upstream_token_heap, &ups->token_heap, up->heap_idx);
+               if (candidate && candidate->up == up) {
+                       entry = candidate;
+               }
+       }
+
+       /* Cache miss: linear search */
+       if (!entry) {
+               entry = rspamd_upstream_find_in_heap(ups, up);
+               if (!entry) {
+                       return;
+               }
+       }
+
+       unsigned int new_pri = (unsigned int) MIN(up->inflight_tokens, UINT_MAX);
+       rspamd_heap_update(upstream_token_heap, &ups->token_heap, entry, new_pri);
+
+       /* Refresh heap_idx: heap_update swaps whole structs during swim/sink,
+        * so the entry pointer now points to whatever element ended up at that
+        * array slot - not necessarily our upstream. */
+       rspamd_upstream_find_in_heap(ups, up);
+}
+
 struct upstream *
 rspamd_upstream_get_token_bucket(struct upstream_list *ups,
                                                                 struct upstream *except,
@@ -2408,6 +2433,8 @@ void rspamd_upstream_return_tokens(struct upstream *up, gsize tokens, gboolean s
                up->inflight_tokens -= tokens;
        }
        else {
+               msg_warn("upstream %s: returning %z tokens but only %z inflight (possible double-return)",
+                                up->name, tokens, up->inflight_tokens);
                up->inflight_tokens = 0;
        }
 
index 81441f42199319f200e9a28a5ffd30394a67fb0b..cbebbb59a431cd6b43020b3c42ac906263b348ff 100644 (file)
@@ -361,10 +361,12 @@ void rspamd_upstreams_set_token_bucket(struct upstream_list *ups,
                                                                           gsize base_cost);
 
 /**
- * Get upstream using token bucket algorithm
- * Selects upstream with lowest inflight tokens (weighted by message size)
+ * Get upstream using token bucket algorithm.
+ * Selects upstream with lowest inflight tokens (weighted by message size).
+ * Falls back to round-robin if heap initialization fails.
+ * Token cost is calculated as: base_cost + (message_size / scale).
  * @param ups upstream list
- * @param except upstream to exclude (for retries)
+ * @param except upstream to exclude (for retries), or NULL
  * @param message_size size of the message being processed
  * @param reserved_tokens output: tokens reserved for this request (must be returned later)
  * @return selected upstream or NULL if none available
@@ -375,11 +377,13 @@ struct upstream *rspamd_upstream_get_token_bucket(struct upstream_list *ups,
                                                                                                  gsize *reserved_tokens);
 
 /**
- * Return tokens to upstream after request completion
- * Must be called when a request completes (success or failure)
+ * Return tokens to upstream after request completion.
+ * Must be called exactly once for each successful rspamd_upstream_get_token_bucket call.
+ * On success, tokens are restored to the available pool.
+ * On failure, tokens are NOT restored - this penalises failing backends.
  * @param up upstream to return tokens to
  * @param tokens number of tokens to return (from rspamd_upstream_get_token_bucket)
- * @param success TRUE if request succeeded, FALSE if failed
+ * @param success TRUE if request succeeded, FALSE if failed (penalty)
  */
 void rspamd_upstream_return_tokens(struct upstream *up, gsize tokens, gboolean success);