}
/*
- * Update heap position after changing inflight_tokens
- */
-static void
-rspamd_upstream_token_heap_update(struct upstream_list *ups, struct upstream *up)
-{
- struct upstream_token_heap_entry *entry;
-
- if (!ups->token_bucket_initialized || up->heap_idx == UINT_MAX) {
- return;
- }
-
- if (up->heap_idx >= rspamd_heap_size(upstream_token_heap, &ups->token_heap)) {
- return;
- }
-
- entry = rspamd_heap_index(upstream_token_heap, &ups->token_heap, up->heap_idx);
- if (entry && entry->up == up) {
- /* Use rspamd_heap_update to adjust position based on new priority */
- unsigned int new_pri = (unsigned int) MIN(up->inflight_tokens, UINT_MAX);
- rspamd_heap_update(upstream_token_heap, &ups->token_heap, entry, new_pri);
- }
-}
-
-/*
- * Find upstream in heap by pointer (for removal or update after finding mismatch)
+ * Find upstream in heap by pointer (used for removal, or to recover from a stale cached index).
+ * Also refreshes up->heap_idx as a side effect.
*/
static struct upstream_token_heap_entry *
rspamd_upstream_find_in_heap(struct upstream_list *ups, struct upstream *up)
return NULL;
}
+/*
+ * Update heap position after changing inflight_tokens.
+ *
+ * The intrusive heap stores elements by value and swaps entire structs during
+ * swim/sink operations. This means up->heap_idx can become stale after any
+ * heap modification (the entry at that index may now belong to a different
+ * upstream). We handle this by:
+ * 1. Trying the cached heap_idx first (fast path)
+ * 2. Falling back to linear search if the cache is stale
+ * 3. Refreshing heap_idx after the update via linear search
+ *
+ * Linear search is acceptable since upstream count is typically small (2-10).
+ */
+static void
+rspamd_upstream_token_heap_update(struct upstream_list *ups, struct upstream *up)
+{
+ struct upstream_token_heap_entry *entry = NULL;
+
+ if (!ups->token_bucket_initialized || up->heap_idx == UINT_MAX) {
+ return;
+ }
+
+ /* Try cached index first (fast path) */
+ if (up->heap_idx < rspamd_heap_size(upstream_token_heap, &ups->token_heap)) {
+ struct upstream_token_heap_entry *candidate =
+ rspamd_heap_index(upstream_token_heap, &ups->token_heap, up->heap_idx);
+ if (candidate && candidate->up == up) {
+ entry = candidate;
+ }
+ }
+
+ /* Cache miss: linear search */
+ if (!entry) {
+ entry = rspamd_upstream_find_in_heap(ups, up);
+ if (!entry) {
+ return;
+ }
+ }
+
+ unsigned int new_pri = (unsigned int) MIN(up->inflight_tokens, UINT_MAX);
+ rspamd_heap_update(upstream_token_heap, &ups->token_heap, entry, new_pri);
+
+ /* Refresh heap_idx: heap_update swaps whole structs during swim/sink,
+ * so the entry pointer now points to whatever element ended up at that
+ * array slot - not necessarily our upstream. */
+ rspamd_upstream_find_in_heap(ups, up);
+}
+
struct upstream *
rspamd_upstream_get_token_bucket(struct upstream_list *ups,
struct upstream *except,
up->inflight_tokens -= tokens;
}
else {
+ msg_warn("upstream %s: returning %z tokens but only %z inflight (possible double-return)",
+ up->name, tokens, up->inflight_tokens);
up->inflight_tokens = 0;
}
gsize base_cost);
/**
- * Get upstream using token bucket algorithm
- * Selects upstream with lowest inflight tokens (weighted by message size)
+ * Get upstream using token bucket algorithm.
+ * Selects upstream with lowest inflight tokens (weighted by message size).
+ * Falls back to round-robin if heap initialization fails.
+ * Token cost is calculated as: base_cost + (message_size / scale).
* @param ups upstream list
- * @param except upstream to exclude (for retries)
+ * @param except upstream to exclude (for retries), or NULL
* @param message_size size of the message being processed
* @param reserved_tokens output: tokens reserved for this request (must be returned later)
* @return selected upstream or NULL if none available
gsize *reserved_tokens);
/**
- * Return tokens to upstream after request completion
- * Must be called when a request completes (success or failure)
+ * Return tokens to upstream after request completion.
+ * Must be called exactly once for each successful rspamd_upstream_get_token_bucket call.
+ * On success, tokens are restored to the available pool.
+ * On failure, tokens are NOT restored - this penalises failing backends.
* @param up upstream to return tokens to
* @param tokens number of tokens to return (from rspamd_upstream_get_token_bucket)
- * @param success TRUE if request succeeded, FALSE if failed
+ * @param success TRUE if request succeeded, FALSE if failed (penalty)
*/
void rspamd_upstream_return_tokens(struct upstream *up, gsize tokens, gboolean success);
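
For orientation, a minimal caller sketch of the new API follows. It is not part of this patch: the wrapper function name, the includes and the inline completion flow are assumptions for illustration; only rspamd_upstream_get_token_bucket and rspamd_upstream_return_tokens are taken from the declarations above.

/*
 * Illustrative caller sketch, not part of this patch. The wrapper function
 * and the inline completion flow are assumptions; only the two
 * rspamd_upstream_* calls correspond to the declarations above.
 */
#include "config.h"   /* assumed rspamd umbrella header (glib types: gsize, gboolean) */
#include "upstream.h" /* assumed header declaring the token bucket API */

static void
forward_message(struct upstream_list *ups, gsize message_size)
{
	gsize reserved = 0;
	struct upstream *up;

	/* Reserve tokens and pick the least loaded upstream; cost is
	 * base_cost + message_size / scale as described in the header doc */
	up = rspamd_upstream_get_token_bucket(ups, NULL, message_size, &reserved);

	if (up == NULL) {
		return; /* nothing alive: caller must queue or reject the message */
	}

	/* ... send the request to `up` ... */

	/* Exactly one return call per successful selection; success = FALSE
	 * deliberately keeps the penalty on a failing backend */
	gboolean request_succeeded = TRUE;
	rspamd_upstream_return_tokens(up, reserved, request_succeeded);
}

On a retry, the previously failed upstream would be passed as the except argument instead of NULL so the next selection skips it.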