[Feature] proxy: implement token bucket load balancing for upstreams
author Vsevolod Stakhov <vsevolod@rspamd.com>
Tue, 3 Feb 2026 11:42:27 +0000 (11:42 +0000)
committer Vsevolod Stakhov <vsevolod@rspamd.com>
Tue, 3 Feb 2026 11:42:27 +0000 (11:42 +0000)
Add a weighted load-balancing algorithm that considers message size and
current backend load when selecting upstreams. Each upstream has a token
pool that is depleted in proportion to message size and replenished when
requests complete successfully.
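
For example, with the defaults added in this commit (base_cost = 10,
scale = 1024 bytes per token, max_tokens = 10000), a 512 KiB message costs
10 + 524288/1024 = 522 tokens, so an idle upstream can hold roughly 19 such
messages in flight before it is treated as token-exhausted and selection
falls back to the least-loaded upstream.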

- Add RSPAMD_UPSTREAM_TOKEN_BUCKET rotation type
- Implement min-heap based upstream selection in O(log n)
- Reserve tokens proportional to message size (base_cost + size/scale)
- Return tokens when a request completes: restored to the available pool on success, lost on failure
- Fall back to the least-loaded upstream when all are token-exhausted
- Add UCL configuration: token_bucket { max_tokens, scale, min_tokens, base_cost } (example below)
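
A minimal configuration sketch. The token_bucket option names follow the
parser added to rspamd_proxy.c in this commit; the surrounding upstream
block and its host list are illustrative only:

    upstream "scan" {
      hosts = "127.0.0.1:11333,127.0.0.2:11333";  # hypothetical backends
      token_bucket {
        max_tokens = 10000; # capacity per upstream
        scale = 1024;       # bytes per token
        min_tokens = 1;     # minimum tokens required for selection
        base_cost = 10;     # flat cost added to every request
      }
    }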

src/libutil/upstream.c
src/libutil/upstream.h
src/rspamd_proxy.c
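
A condensed caller sketch of the new API, mirroring the reserve/return
pattern used by the proxy hunks below. dispatch_message and send_to_backend
are hypothetical; the upstream list is assumed to already use
RSPAMD_UPSTREAM_TOKEN_BUCKET rotation:

    #include "libutil/upstream.h"

    static void
    dispatch_message(struct upstream_list *ups, gsize message_size)
    {
            gsize reserved = 0;
            struct upstream *up;

            /* Reserve tokens proportional to the message size */
            up = rspamd_upstream_get_token_bucket(ups, NULL /* no exclusion */,
                                                  message_size, &reserved);
            if (up == NULL) {
                    return; /* no alive upstreams */
            }

            /* Hypothetical transport call returning TRUE on success */
            gboolean ok = send_to_backend(up, message_size);

            /* On success the tokens are restored; on failure they are lost */
            rspamd_upstream_return_tokens(up, reserved, ok);
    }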

index 04d645a9c8aac4c31a1b8eab4c351efe33104095..8e6ae0545922913e8916a9ed8e9aec664b5a9882 100644 (file)
@@ -25,6 +25,7 @@
 #include "logger.h"
 #include "contrib/librdns/rdns.h"
 #include "contrib/mumhash/mum.h"
+#include "heap.h"
 
 #include <math.h>
 #include <netdb.h>
@@ -50,6 +51,15 @@ struct upstream_list_watcher {
        struct upstream_list_watcher *next, *prev;
 };
 
+/* Heap element for token bucket selection */
+struct upstream_token_heap_entry {
+       unsigned int pri;    /* Priority = inflight_tokens (lower = better) */
+       unsigned int idx;    /* Heap index (managed by heap) */
+       struct upstream *up; /* Pointer to upstream */
+};
+
+RSPAMD_HEAP_DECLARE(upstream_token_heap, struct upstream_token_heap_entry);
+
 struct upstream {
        unsigned int weight;
        unsigned int cur_weight;
@@ -81,6 +91,12 @@ struct upstream {
        gpointer data;
        char uid[8];
        ref_entry_t ref;
+
+       /* Token bucket fields for weighted load balancing */
+       gsize max_tokens;       /* Maximum token capacity */
+       gsize available_tokens; /* Current available tokens */
+       gsize inflight_tokens;  /* Tokens reserved by in-flight requests */
+       unsigned int heap_idx;  /* Index in token heap (UINT_MAX if not in heap) */
 #ifdef UPSTREAMS_THREAD_SAFE
        rspamd_mutex_t *lock;
 #endif
@@ -97,6 +113,12 @@ struct upstream_limits {
        double probe_jitter;
        unsigned int max_errors;
        unsigned int dns_retransmits;
+
+       /* Token bucket configuration */
+       gsize token_bucket_max;       /* Max tokens per upstream (default: 10000) */
+       gsize token_bucket_scale;     /* Bytes per token (default: 1024) */
+       gsize token_bucket_min;       /* Min tokens for selection (default: 1) */
+       gsize token_bucket_base_cost; /* Base cost per request (default: 10) */
 };
 
 struct upstream_list {
@@ -110,6 +132,10 @@ struct upstream_list {
        enum rspamd_upstream_flag flags;
        unsigned int cur_elt;
        enum rspamd_upstream_rotation rot_alg;
+
+       /* Token bucket heap for weighted selection */
+       upstream_token_heap_t token_heap;
+       gboolean token_bucket_initialized;
 #ifdef UPSTREAMS_THREAD_SAFE
        rspamd_mutex_t *lock;
 #endif
@@ -174,6 +200,12 @@ static const double default_probe_max_backoff = DEFAULT_PROBE_MAX_BACKOFF;
 #define DEFAULT_PROBE_JITTER 0.3
 static const double default_probe_jitter = DEFAULT_PROBE_JITTER;
 
+/* Token bucket defaults */
+#define DEFAULT_TOKEN_BUCKET_MAX 10000
+#define DEFAULT_TOKEN_BUCKET_SCALE 1024
+#define DEFAULT_TOKEN_BUCKET_MIN 1
+#define DEFAULT_TOKEN_BUCKET_BASE_COST 10
+
 static const struct upstream_limits default_limits = {
        .revive_time = DEFAULT_REVIVE_TIME,
        .revive_jitter = DEFAULT_REVIVE_JITTER,
@@ -185,6 +217,10 @@ static const struct upstream_limits default_limits = {
        .resolve_min_interval = DEFAULT_RESOLVE_MIN_INTERVAL,
        .probe_max_backoff = DEFAULT_PROBE_MAX_BACKOFF,
        .probe_jitter = DEFAULT_PROBE_JITTER,
+       .token_bucket_max = DEFAULT_TOKEN_BUCKET_MAX,
+       .token_bucket_scale = DEFAULT_TOKEN_BUCKET_SCALE,
+       .token_bucket_min = DEFAULT_TOKEN_BUCKET_MIN,
+       .token_bucket_base_cost = DEFAULT_TOKEN_BUCKET_BASE_COST,
 };
 
 static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int);
@@ -360,6 +396,25 @@ rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream)
        g_ptr_array_add(ls->alive, upstream);
        upstream->active_idx = ls->alive->len - 1;
 
+       /* Initialize token bucket state */
+       upstream->heap_idx = UINT_MAX;
+       if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET) {
+               upstream->max_tokens = ls->limits->token_bucket_max;
+               upstream->available_tokens = upstream->max_tokens;
+               upstream->inflight_tokens = 0;
+
+               /* Add to token heap if already initialized */
+               if (ls->token_bucket_initialized) {
+                       struct upstream_token_heap_entry entry;
+                       entry.pri = 0;
+                       entry.idx = 0;
+                       entry.up = upstream;
+                       rspamd_heap_push_safe(upstream_token_heap, &ls->token_heap, &entry, skip_heap);
+                       upstream->heap_idx = rspamd_heap_size(upstream_token_heap, &ls->token_heap) - 1;
+               skip_heap:;
+               }
+       }
+
        if (upstream->ctx && upstream->ctx->configured &&
                !((upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE) ||
                  (upstream->flags & RSPAMD_UPSTREAM_FLAG_DNS))) {
@@ -867,6 +922,36 @@ rspamd_upstream_set_inactive(struct upstream_list *ls, struct upstream *upstream
        g_ptr_array_remove_index(ls->alive, upstream->active_idx);
        upstream->active_idx = -1;
 
+       /* Remove from token bucket heap if present */
+       if (ls->token_bucket_initialized && upstream->heap_idx != UINT_MAX) {
+               struct upstream_token_heap_entry *entry;
+
+               RSPAMD_UPSTREAM_LOCK(upstream);
+
+               if (upstream->heap_idx < rspamd_heap_size(upstream_token_heap, &ls->token_heap)) {
+                       entry = rspamd_heap_index(upstream_token_heap, &ls->token_heap, upstream->heap_idx);
+                       if (entry && entry->up == upstream) {
+                               rspamd_heap_remove(upstream_token_heap, &ls->token_heap, entry);
+                       }
+               }
+               upstream->heap_idx = UINT_MAX;
+
+               /*
+                * Return inflight tokens to available pool - these represent
+                * requests that were in-flight when upstream failed. The tokens
+                * should be restored so they're available when upstream comes back.
+                */
+               if (upstream->inflight_tokens > 0) {
+                       upstream->available_tokens += upstream->inflight_tokens;
+                       if (upstream->available_tokens > upstream->max_tokens) {
+                               upstream->available_tokens = upstream->max_tokens;
+                       }
+                       upstream->inflight_tokens = 0;
+               }
+
+               RSPAMD_UPSTREAM_UNLOCK(upstream);
+       }
+
        /* We need to update all indices */
        for (i = 0; i < ls->alive->len; i++) {
                cur = g_ptr_array_index(ls->alive, i);
@@ -1594,6 +1679,12 @@ void rspamd_upstreams_destroy(struct upstream_list *ups)
        struct upstream_list_watcher *w, *tmp;
 
        if (ups != NULL) {
+               /* Clean up token bucket heap */
+               if (ups->token_bucket_initialized) {
+                       rspamd_heap_destroy(upstream_token_heap, &ups->token_heap);
+                       ups->token_bucket_initialized = FALSE;
+               }
+
                g_ptr_array_free(ups->alive, TRUE);
 
                for (i = 0; i < ups->ups->len; i++) {
@@ -1886,6 +1977,14 @@ rspamd_upstream_get_common(struct upstream_list *ups,
        case RSPAMD_UPSTREAM_MASTER_SLAVE:
                up = rspamd_upstream_get_round_robin(ups, except, FALSE);
                break;
+       case RSPAMD_UPSTREAM_TOKEN_BUCKET:
+               /*
+                * Token bucket requires message size, which isn't available here.
+                * Fall back to round robin. Use rspamd_upstream_get_token_bucket()
+                * for proper token bucket selection.
+                */
+               up = rspamd_upstream_get_round_robin(ups, except, TRUE);
+               break;
        case RSPAMD_UPSTREAM_SEQUENTIAL:
                if (ups->cur_elt >= ups->alive->len) {
                        ups->cur_elt = 0;
@@ -2034,6 +2133,304 @@ void rspamd_upstreams_add_watch_callback(struct upstream_list *ups,
        DL_APPEND(ups->watchers, nw);
 }
 
+enum rspamd_upstream_rotation
+rspamd_upstreams_get_rotation(struct upstream_list *ups)
+{
+       if (ups == NULL) {
+               return RSPAMD_UPSTREAM_UNDEF;
+       }
+       return ups->rot_alg;
+}
+
+void rspamd_upstreams_set_token_bucket(struct upstream_list *ups,
+                                                                          gsize max_tokens,
+                                                                          gsize scale_factor,
+                                                                          gsize min_tokens,
+                                                                          gsize base_cost)
+{
+       struct upstream_limits *nlimits;
+       g_assert(ups != NULL);
+
+       /* Allocate new limits if we have a pool, otherwise modify in place */
+       if (ups->ctx && ups->ctx->pool) {
+               nlimits = rspamd_mempool_alloc(ups->ctx->pool, sizeof(*nlimits));
+               memcpy(nlimits, ups->limits, sizeof(*nlimits));
+       }
+       else {
+               /* No pool, we need to be careful here */
+               nlimits = g_malloc(sizeof(*nlimits));
+               memcpy(nlimits, ups->limits, sizeof(*nlimits));
+       }
+
+       if (max_tokens > 0) {
+               nlimits->token_bucket_max = max_tokens;
+       }
+       if (scale_factor > 0) {
+               nlimits->token_bucket_scale = scale_factor;
+       }
+       if (min_tokens > 0) {
+               nlimits->token_bucket_min = min_tokens;
+       }
+       if (base_cost > 0) {
+               nlimits->token_bucket_base_cost = base_cost;
+       }
+
+       ups->limits = nlimits;
+}
+
+/*
+ * Calculate token cost for a message of given size
+ */
+static inline gsize
+rspamd_upstream_calculate_tokens(const struct upstream_limits *limits,
+                                                                gsize message_size)
+{
+       return limits->token_bucket_base_cost +
+                  (message_size / limits->token_bucket_scale);
+}
+
+/*
+ * Initialize token bucket heap for an upstream list (lazy initialization)
+ */
+static gboolean
+rspamd_upstream_token_bucket_init(struct upstream_list *ups)
+{
+       unsigned int i;
+       struct upstream *up;
+       struct upstream_token_heap_entry entry;
+
+       if (ups->token_bucket_initialized) {
+               return TRUE;
+       }
+
+       rspamd_heap_init(upstream_token_heap, &ups->token_heap);
+
+       /* Add all alive upstreams to the heap */
+       for (i = 0; i < ups->alive->len; i++) {
+               up = g_ptr_array_index(ups->alive, i);
+
+               /* Initialize token bucket state for this upstream */
+               up->max_tokens = ups->limits->token_bucket_max;
+               up->available_tokens = up->max_tokens;
+               up->inflight_tokens = 0;
+
+               /* Add to heap with priority = inflight_tokens (0 initially) */
+               entry.pri = 0;
+               entry.idx = 0;
+               entry.up = up;
+
+               rspamd_heap_push_safe(upstream_token_heap, &ups->token_heap, &entry, init_error);
+               up->heap_idx = rspamd_heap_size(upstream_token_heap, &ups->token_heap) - 1;
+       }
+
+       ups->token_bucket_initialized = TRUE;
+       return TRUE;
+
+init_error:
+       /* Heap allocation failed, destroy what we have */
+       rspamd_heap_destroy(upstream_token_heap, &ups->token_heap);
+       return FALSE;
+}
+
+/*
+ * Update heap position after changing inflight_tokens
+ */
+static void
+rspamd_upstream_token_heap_update(struct upstream_list *ups, struct upstream *up)
+{
+       struct upstream_token_heap_entry *entry;
+
+       if (!ups->token_bucket_initialized || up->heap_idx == UINT_MAX) {
+               return;
+       }
+
+       if (up->heap_idx >= rspamd_heap_size(upstream_token_heap, &ups->token_heap)) {
+               return;
+       }
+
+       entry = rspamd_heap_index(upstream_token_heap, &ups->token_heap, up->heap_idx);
+       if (entry && entry->up == up) {
+               /* Use rspamd_heap_update to adjust position based on new priority */
+               unsigned int new_pri = (unsigned int) MIN(up->inflight_tokens, UINT_MAX);
+               rspamd_heap_update(upstream_token_heap, &ups->token_heap, entry, new_pri);
+       }
+}
+
+/*
+ * Find upstream in heap by pointer (for removal or update after finding mismatch)
+ */
+static struct upstream_token_heap_entry *
+rspamd_upstream_find_in_heap(struct upstream_list *ups, struct upstream *up)
+{
+       unsigned int i;
+       struct upstream_token_heap_entry *entry;
+
+       for (i = 0; i < rspamd_heap_size(upstream_token_heap, &ups->token_heap); i++) {
+               entry = rspamd_heap_index(upstream_token_heap, &ups->token_heap, i);
+               if (entry && entry->up == up) {
+                       up->heap_idx = i;
+                       return entry;
+               }
+       }
+       return NULL;
+}
+
+struct upstream *
+rspamd_upstream_get_token_bucket(struct upstream_list *ups,
+                                                                struct upstream *except,
+                                                                gsize message_size,
+                                                                gsize *reserved_tokens)
+{
+       struct upstream *selected = NULL;
+       struct upstream_token_heap_entry *entry;
+       gsize token_cost;
+       unsigned int i;
+       gsize min_inflight = G_MAXSIZE;
+       struct upstream *fallback = NULL;
+
+       if (ups == NULL || reserved_tokens == NULL) {
+               return NULL;
+       }
+
+       *reserved_tokens = 0;
+
+       RSPAMD_UPSTREAM_LOCK(ups);
+
+       /* Handle empty alive list same as other algorithms */
+       if (ups->alive->len == 0) {
+               RSPAMD_UPSTREAM_UNLOCK(ups);
+               return NULL;
+       }
+
+       /* Initialize token bucket if not done yet */
+       if (!ups->token_bucket_initialized) {
+               if (!rspamd_upstream_token_bucket_init(ups)) {
+                       /* Fall back to round robin on init failure */
+                       RSPAMD_UPSTREAM_UNLOCK(ups);
+                       return rspamd_upstream_get_round_robin(ups, except, TRUE);
+               }
+       }
+
+       /* Calculate token cost for this message */
+       token_cost = rspamd_upstream_calculate_tokens(ups->limits, message_size);
+
+       /*
+        * Use heap property: the root (index 0) has minimum inflight_tokens.
+        * Check a few candidates from the top of the heap rather than scanning all.
+        */
+       unsigned int heap_size = rspamd_heap_size(upstream_token_heap, &ups->token_heap);
+       unsigned int candidates_checked = 0;
+       const unsigned int max_candidates = 8; /* Check up to 8 lowest-loaded upstreams */
+
+       for (i = 0; i < heap_size && candidates_checked < max_candidates; i++) {
+               entry = rspamd_heap_index(upstream_token_heap, &ups->token_heap, i);
+
+               if (entry == NULL || entry->up == NULL) {
+                       continue;
+               }
+
+               struct upstream *up = entry->up;
+
+               /* Skip inactive upstreams */
+               if (up->active_idx < 0) {
+                       continue;
+               }
+
+               /* Skip excluded upstream */
+               if (except && up == except) {
+                       continue;
+               }
+
+               candidates_checked++;
+
+               /* Track upstream with minimum inflight for fallback */
+               if (up->inflight_tokens < min_inflight) {
+                       min_inflight = up->inflight_tokens;
+                       fallback = up;
+               }
+
+               /* Check if upstream has sufficient tokens */
+               if (up->available_tokens >= token_cost) {
+                       selected = up;
+                       break;
+               }
+       }
+
+       /* If no upstream has sufficient tokens, use the least loaded one */
+       if (selected == NULL && fallback != NULL) {
+               selected = fallback;
+       }
+
+       if (selected != NULL) {
+               /* Reserve tokens */
+               if (selected->available_tokens >= token_cost) {
+                       selected->available_tokens -= token_cost;
+               }
+               else {
+                       /* Clamp to 0 if we don't have enough */
+                       selected->available_tokens = 0;
+               }
+               selected->inflight_tokens += token_cost;
+               *reserved_tokens = token_cost;
+
+               /* Update heap position */
+               rspamd_upstream_token_heap_update(ups, selected);
+
+               selected->checked++;
+       }
+
+       RSPAMD_UPSTREAM_UNLOCK(ups);
+
+       return selected;
+}
+
+void rspamd_upstream_return_tokens(struct upstream *up, gsize tokens, gboolean success)
+{
+       struct upstream_list *ls;
+
+       if (up == NULL || tokens == 0) {
+               return;
+       }
+
+       ls = up->ls;
+
+       /*
+        * Lock ordering: always lock list before upstream to prevent deadlocks.
+        * This is consistent with rspamd_upstream_get_token_bucket.
+        */
+       if (ls) {
+               RSPAMD_UPSTREAM_LOCK(ls);
+       }
+       RSPAMD_UPSTREAM_LOCK(up);
+
+       /* Return tokens from inflight */
+       if (up->inflight_tokens >= tokens) {
+               up->inflight_tokens -= tokens;
+       }
+       else {
+               up->inflight_tokens = 0;
+       }
+
+       /* Only restore available tokens on success */
+       if (success) {
+               up->available_tokens += tokens;
+               /* Cap at max tokens */
+               if (up->available_tokens > up->max_tokens) {
+                       up->available_tokens = up->max_tokens;
+               }
+       }
+
+       /* Update heap position if we have a list */
+       if (ls && ls->token_bucket_initialized) {
+               rspamd_upstream_token_heap_update(ls, up);
+       }
+
+       RSPAMD_UPSTREAM_UNLOCK(up);
+       if (ls) {
+               RSPAMD_UPSTREAM_UNLOCK(ls);
+       }
+}
+
 struct upstream *
 rspamd_upstream_ref(struct upstream *up)
 {
index 80d4242f46969104e611301a9beeaa8f6e1d6bb7..81441f42199319f200e9a28a5ffd30394a67fb0b 100644 (file)
@@ -35,6 +35,7 @@ enum rspamd_upstream_rotation {
        RSPAMD_UPSTREAM_ROUND_ROBIN,
        RSPAMD_UPSTREAM_MASTER_SLAVE,
        RSPAMD_UPSTREAM_SEQUENTIAL,
+       RSPAMD_UPSTREAM_TOKEN_BUCKET, /* Token bucket weighted balancing */
        RSPAMD_UPSTREAM_UNDEF
 };
 
@@ -338,6 +339,50 @@ struct upstream *rspamd_upstream_ref(struct upstream *up);
  */
 void rspamd_upstream_unref(struct upstream *up);
 
+/**
+ * Get the current rotation algorithm for an upstream list
+ * @param ups upstream list
+ * @return rotation algorithm
+ */
+enum rspamd_upstream_rotation rspamd_upstreams_get_rotation(struct upstream_list *ups);
+
+/**
+ * Configure token bucket parameters for an upstream list
+ * @param ups upstream list
+ * @param max_tokens maximum tokens per upstream (default: 10000)
+ * @param scale_factor bytes per token (default: 1024)
+ * @param min_tokens minimum tokens for selection (default: 1)
+ * @param base_cost base cost per request (default: 10)
+ */
+void rspamd_upstreams_set_token_bucket(struct upstream_list *ups,
+                                                                          gsize max_tokens,
+                                                                          gsize scale_factor,
+                                                                          gsize min_tokens,
+                                                                          gsize base_cost);
+
+/**
+ * Get upstream using token bucket algorithm
+ * Selects upstream with lowest inflight tokens (weighted by message size)
+ * @param ups upstream list
+ * @param except upstream to exclude (for retries)
+ * @param message_size size of the message being processed
+ * @param reserved_tokens output: tokens reserved for this request (must be returned later)
+ * @return selected upstream or NULL if none available
+ */
+struct upstream *rspamd_upstream_get_token_bucket(struct upstream_list *ups,
+                                                                                                 struct upstream *except,
+                                                                                                 gsize message_size,
+                                                                                                 gsize *reserved_tokens);
+
+/**
+ * Return tokens to upstream after request completion
+ * Must be called when a request completes (success or failure)
+ * @param up upstream to return tokens to
+ * @param tokens number of tokens to return (from rspamd_upstream_get_token_bucket)
+ * @param success TRUE if request succeeded, FALSE if failed
+ */
+void rspamd_upstream_return_tokens(struct upstream *up, gsize tokens, gboolean success);
+
 #ifdef __cplusplus
 }
 #endif
index 683f35b079aab78db26111386b293034ae1c57fb..82b7a2ec02753462a9a9ce6d29840748559aaee4 100644 (file)
@@ -203,6 +203,7 @@ struct rspamd_proxy_backend_connection {
        int parser_from_ref;
        int parser_to_ref;
        struct rspamd_task *task;
+       gsize reserved_tokens; /* Tokens reserved for this request (token bucket) */
 };
 
 enum rspamd_proxy_legacy_support {
@@ -646,6 +647,53 @@ rspamd_proxy_parse_upstream(rspamd_mempool_t *pool,
                up->log_tag_type = rspamd_proxy_parse_log_tag_type(ucl_object_tostring(elt));
        }
 
+       /* Parse token_bucket configuration for weighted load balancing */
+       elt = ucl_object_lookup(obj, "token_bucket");
+       if (elt != NULL && ucl_object_type(elt) == UCL_OBJECT && up->u != NULL) {
+               gsize max_tokens = 10000, scale = 1024, min_tokens = 1, base_cost = 10;
+               const ucl_object_t *tb_elt;
+
+               if ((tb_elt = ucl_object_lookup(elt, "max_tokens")) != NULL) {
+                       max_tokens = ucl_object_toint(tb_elt);
+                       if (max_tokens == 0) {
+                               msg_warn_pool("token_bucket.max_tokens must be > 0, using default 10000");
+                               max_tokens = 10000;
+                       }
+               }
+               if ((tb_elt = ucl_object_lookup(elt, "scale")) != NULL) {
+                       scale = ucl_object_toint(tb_elt);
+                       if (scale == 0) {
+                               msg_warn_pool("token_bucket.scale cannot be 0 (division by zero), using default 1024");
+                               scale = 1024;
+                       }
+               }
+               if ((tb_elt = ucl_object_lookup(elt, "min_tokens")) != NULL) {
+                       min_tokens = ucl_object_toint(tb_elt);
+               }
+               if ((tb_elt = ucl_object_lookup(elt, "base_cost")) != NULL) {
+                       base_cost = ucl_object_toint(tb_elt);
+               }
+
+               /* Validate relationships */
+               if (min_tokens > max_tokens) {
+                       msg_warn_pool("token_bucket.min_tokens (%zu) > max_tokens (%zu), clamping",
+                                                 min_tokens, max_tokens);
+                       min_tokens = max_tokens;
+               }
+               if (base_cost >= max_tokens) {
+                       msg_warn_pool("token_bucket.base_cost (%zu) >= max_tokens (%zu), reducing to max/2",
+                                                 base_cost, max_tokens);
+                       base_cost = max_tokens / 2;
+               }
+
+               /* Enable token bucket rotation and configure parameters */
+               rspamd_upstreams_set_rotation(up->u, RSPAMD_UPSTREAM_TOKEN_BUCKET);
+               rspamd_upstreams_set_token_bucket(up->u, max_tokens, scale, min_tokens, base_cost);
+
+               msg_info_pool_check("upstream %s: token_bucket enabled (max=%zu, scale=%zu, min=%zu, base=%zu)",
+                                                       up->name, max_tokens, scale, min_tokens, base_cost);
+       }
+
        /*
         * Accept lua function here in form
         * fun :: String -> UCL
@@ -1119,6 +1167,12 @@ static void
 proxy_backend_close_connection(struct rspamd_proxy_backend_connection *conn)
 {
        if (conn && !(conn->flags & RSPAMD_BACKEND_CLOSED)) {
+               /* Return any reserved tokens if not already returned (safety net) */
+               if (conn->reserved_tokens > 0 && conn->up) {
+                       rspamd_upstream_return_tokens(conn->up, conn->reserved_tokens, FALSE);
+                       conn->reserved_tokens = 0;
+               }
+
                if (conn->backend_conn) {
                        rspamd_http_connection_reset(conn->backend_conn);
                        rspamd_http_connection_unref(conn->backend_conn);
@@ -2113,6 +2167,13 @@ proxy_backend_master_error_handler(struct rspamd_http_connection *conn, GError *
                                                                                          : "self-scan",
                                         err,
                                         session->ctx->max_retries - session->retries);
+
+       /* Return reserved tokens on error (token bucket load balancing) */
+       if (bk_conn->reserved_tokens > 0 && bk_conn->up) {
+               rspamd_upstream_return_tokens(bk_conn->up, bk_conn->reserved_tokens, FALSE);
+               bk_conn->reserved_tokens = 0;
+       }
+
        rspamd_upstream_fail(bk_conn->up, FALSE, err ? err->message : "unknown");
        proxy_backend_close_connection(session->master_conn);
 
@@ -2215,6 +2276,12 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn,
                }
        }
 
+       /* Return reserved tokens on success (token bucket load balancing) */
+       if (bk_conn->reserved_tokens > 0 && bk_conn->up) {
+               rspamd_upstream_return_tokens(bk_conn->up, bk_conn->reserved_tokens, TRUE);
+               bk_conn->reserved_tokens = 0;
+       }
+
        rspamd_upstream_ok(bk_conn->up);
 
        /* Handle keepalive for master connection */
@@ -2602,8 +2669,30 @@ proxy_send_master_message(struct rspamd_proxy_session *session)
                gpointer hash_key = rspamd_inet_address_get_hash_key(session->client_addr,
                                                                                                                         &hash_len);
 
-               if (session->ctx->max_retries > 1 &&
-                       session->retries == session->ctx->max_retries) {
+               /* Initialize reserved_tokens to 0 */
+               session->master_conn->reserved_tokens = 0;
+
+               /* Check if token bucket algorithm is configured */
+               if (rspamd_upstreams_get_rotation(backend->u) == RSPAMD_UPSTREAM_TOKEN_BUCKET) {
+                       /* Calculate message size for token bucket */
+                       gsize message_size = 0;
+
+                       if (session->map && session->map_len) {
+                               message_size = session->map_len;
+                       }
+                       else if (session->client_message && session->client_message->body_buf.len > 0) {
+                               message_size = session->client_message->body_buf.len;
+                       }
+
+                       /* Use token bucket selection */
+                       session->master_conn->up = rspamd_upstream_get_token_bucket(
+                               backend->u,
+                               (session->retries > 0) ? session->master_conn->up : NULL,
+                               message_size,
+                               &session->master_conn->reserved_tokens);
+               }
+               else if (session->ctx->max_retries > 1 &&
+                                session->retries == session->ctx->max_retries) {
 
                        session->master_conn->up = rspamd_upstream_get_except(backend->u,
                                                                                                                                  session->master_conn->up,