]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] upstream: lazy time-based refill for token bucket
authorVsevolod Stakhov <vsevolod@rspamd.com>
Fri, 1 May 2026 08:12:45 +0000 (09:12 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Fri, 1 May 2026 08:12:45 +0000 (09:12 +0100)
return_tokens with success=false decremented inflight but never
returned tokens to available_tokens, so a flapping upstream's bucket
drained monotonically toward zero and never recovered. Selection
then permanently fell into the least-inflight fallback path,
defeating the cost signal.

Add a real refill rate (token_bucket_refill_per_s, default = max/60
so a quiet bucket fully regenerates in 60s of wall time). Call lazy
refill from get_token_bucket and return_tokens; failure no longer
permanently penalises the bucket. Within-tick test workloads see dt
small enough that floor(dt * rate) == 0, so existing assertions are
unaffected.

src/libutil/upstream.c

index 6c0673da5e5327dbc17bb5014f7a438de259e6b8..a43e026497b0b5251f89c37f2d2f4d14c160a16c 100644 (file)
@@ -97,6 +97,7 @@ struct upstream {
        gsize max_tokens;       /* Maximum token capacity */
        gsize available_tokens; /* Current available tokens */
        gsize inflight_tokens;  /* Tokens reserved by in-flight requests */
+       double last_refill_at;  /* Last lazy-refill timestamp (ev_now/ticks); 0 = uninit */
 #ifdef UPSTREAMS_THREAD_SAFE
        rspamd_mutex_t *lock;
 #endif
@@ -115,10 +116,11 @@ struct upstream_limits {
        unsigned int dns_retransmits;
 
        /* Token bucket configuration */
-       gsize token_bucket_max;       /* Max tokens per upstream (default: 10000) */
-       gsize token_bucket_scale;     /* Bytes per token (default: 1024) */
-       gsize token_bucket_min;       /* Min tokens for selection (default: 1) */
-       gsize token_bucket_base_cost; /* Base cost per request (default: 10) */
+       gsize token_bucket_max;          /* Max tokens per upstream (default: 10000) */
+       gsize token_bucket_scale;        /* Bytes per token (default: 1024) */
+       gsize token_bucket_min;          /* Min tokens for selection (default: 1) */
+       gsize token_bucket_base_cost;    /* Base cost per request (default: 10) */
+       gsize token_bucket_refill_per_s; /* Lazy refill rate (default: max/60) */
 };
 
 struct upstream_list {
@@ -206,6 +208,8 @@ static const double default_probe_jitter = DEFAULT_PROBE_JITTER;
 #define DEFAULT_TOKEN_BUCKET_SCALE 1024
 #define DEFAULT_TOKEN_BUCKET_MIN 1
 #define DEFAULT_TOKEN_BUCKET_BASE_COST 10
+/* Default refill rate: full bucket regenerates in 60s of wall time. */
+#define DEFAULT_TOKEN_BUCKET_REFILL_PER_S (DEFAULT_TOKEN_BUCKET_MAX / 60)
 
 /*
  * Initial delay before retrying DNS for a PENDING_RESOLVE upstream, and the
@@ -229,6 +233,7 @@ static const struct upstream_limits default_limits = {
        .token_bucket_scale = DEFAULT_TOKEN_BUCKET_SCALE,
        .token_bucket_min = DEFAULT_TOKEN_BUCKET_MIN,
        .token_bucket_base_cost = DEFAULT_TOKEN_BUCKET_BASE_COST,
+       .token_bucket_refill_per_s = DEFAULT_TOKEN_BUCKET_REFILL_PER_S,
 };
 
 static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int);
@@ -2571,9 +2576,67 @@ rspamd_upstream_ensure_tokens(struct upstream_list *ups, struct upstream *up)
                up->max_tokens = ups->limits->token_bucket_max;
                up->available_tokens = up->max_tokens;
                up->inflight_tokens = 0;
+               up->last_refill_at = 0;
        }
 }
 
+/*
+ * Lazy time-based refill. Adds floor(dt * refill_per_s) tokens to
+ * available_tokens, capped at max_tokens. Called from selection and return
+ * paths so that an upstream that has been quiet (or that lost tokens to a
+ * failure) gradually regains capacity without any timer fan-out.
+ */
+static inline void
+rspamd_upstream_refill_tokens(struct upstream *up,
+                                                         const struct upstream_limits *limits,
+                                                         double now)
+{
+       gsize add;
+       double dt;
+
+       if (limits->token_bucket_refill_per_s == 0 || up->max_tokens == 0) {
+               up->last_refill_at = now;
+               return;
+       }
+
+       if (up->last_refill_at <= 0) {
+               up->last_refill_at = now;
+               return;
+       }
+
+       dt = now - up->last_refill_at;
+       if (dt <= 0) {
+               return;
+       }
+
+       add = (gsize) (dt * (double) limits->token_bucket_refill_per_s);
+       if (add == 0) {
+               /* Don't update last_refill_at; let small increments accumulate. */
+               return;
+       }
+
+       if (up->available_tokens + add < up->available_tokens) {
+               /* Overflow guard */
+               up->available_tokens = up->max_tokens;
+       }
+       else {
+               up->available_tokens += add;
+               if (up->available_tokens > up->max_tokens) {
+                       up->available_tokens = up->max_tokens;
+               }
+       }
+       up->last_refill_at = now;
+}
+
+static inline double
+rspamd_upstream_now(const struct upstream *up)
+{
+       if (up->ctx && up->ctx->event_loop) {
+               return ev_now(up->ctx->event_loop);
+       }
+       return rspamd_get_ticks(FALSE);
+}
+
 struct upstream *
 rspamd_upstream_get_token_bucket(struct upstream_list *ups,
                                                                 struct upstream *except,
@@ -2603,6 +2666,14 @@ rspamd_upstream_get_token_bucket(struct upstream_list *ups,
 
        token_cost = rspamd_upstream_calculate_tokens(ups->limits, message_size);
 
+       double now;
+       if (ups->ctx && ups->ctx->event_loop) {
+               now = ev_now(ups->ctx->event_loop);
+       }
+       else {
+               now = rspamd_get_ticks(FALSE);
+       }
+
        /*
         * Linear scan over alive[]: prefer the lowest-inflight upstream that has
         * sufficient available tokens. If no upstream is eligible, fall back to
@@ -2619,6 +2690,7 @@ rspamd_upstream_get_token_bucket(struct upstream_list *ups,
                }
 
                rspamd_upstream_ensure_tokens(ups, up);
+               rspamd_upstream_refill_tokens(up, ups->limits, now);
 
                if (up->inflight_tokens < least_loaded_inflight) {
                        least_loaded_inflight = up->inflight_tokens;
@@ -2678,7 +2750,8 @@ void rspamd_upstream_return_tokens(struct upstream *up, gsize tokens, gboolean s
                up->inflight_tokens = 0;
        }
 
-       /* Only restore available tokens on success */
+       /* Only restore available tokens on success; failure relies on lazy
+        * refill below to gradually restore capacity. */
        if (success) {
                up->available_tokens += tokens;
                /* Cap at max tokens */
@@ -2687,6 +2760,13 @@ void rspamd_upstream_return_tokens(struct upstream *up, gsize tokens, gboolean s
                }
        }
 
+       /* Lazy refill makes failure non-permanent: a flapping upstream that
+        * loses tokens to failures regains them over time when the bucket is
+        * touched. */
+       if (ls != NULL) {
+               rspamd_upstream_refill_tokens(up, ls->limits, rspamd_upstream_now(up));
+       }
+
        RSPAMD_UPSTREAM_UNLOCK(up);
        if (ls) {
                RSPAMD_UPSTREAM_UNLOCK(ls);