From: Vsevolod Stakhov Date: Fri, 1 May 2026 08:12:45 +0000 (+0100) Subject: [Fix] upstream: lazy time-based refill for token bucket X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=cdbe58aec9981186961fd55070be703358d125ab;p=thirdparty%2Frspamd.git [Fix] upstream: lazy time-based refill for token bucket return_tokens with success=false decremented inflight but never returned tokens to available_tokens, so a flapping upstream's bucket drained monotonically toward zero and never recovered. Selection then permanently fell into the least-inflight fallback path, defeating the cost signal. Add a real refill rate (token_bucket_refill_per_s, default = max/60 so a quiet bucket fully regenerates in 60s of wall time). Call lazy refill from get_token_bucket and return_tokens; failure no longer permanently penalises the bucket. Within-tick test workloads see dt small enough that floor(dt * rate) == 0, so existing assertions are unaffected. --- diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index 6c0673da5e..a43e026497 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -97,6 +97,7 @@ struct upstream { gsize max_tokens; /* Maximum token capacity */ gsize available_tokens; /* Current available tokens */ gsize inflight_tokens; /* Tokens reserved by in-flight requests */ + double last_refill_at; /* Last lazy-refill timestamp (ev_now/ticks); 0 = uninit */ #ifdef UPSTREAMS_THREAD_SAFE rspamd_mutex_t *lock; #endif @@ -115,10 +116,11 @@ struct upstream_limits { unsigned int dns_retransmits; /* Token bucket configuration */ - gsize token_bucket_max; /* Max tokens per upstream (default: 10000) */ - gsize token_bucket_scale; /* Bytes per token (default: 1024) */ - gsize token_bucket_min; /* Min tokens for selection (default: 1) */ - gsize token_bucket_base_cost; /* Base cost per request (default: 10) */ + gsize token_bucket_max; /* Max tokens per upstream (default: 10000) */ + gsize token_bucket_scale; /* Bytes per token (default: 1024) */ + gsize token_bucket_min; /* Min tokens for selection (default: 1) */ + gsize token_bucket_base_cost; /* Base cost per request (default: 10) */ + gsize token_bucket_refill_per_s; /* Lazy refill rate (default: max/60) */ }; struct upstream_list { @@ -206,6 +208,8 @@ static const double default_probe_jitter = DEFAULT_PROBE_JITTER; #define DEFAULT_TOKEN_BUCKET_SCALE 1024 #define DEFAULT_TOKEN_BUCKET_MIN 1 #define DEFAULT_TOKEN_BUCKET_BASE_COST 10 +/* Default refill rate: full bucket regenerates in 60s of wall time. */ +#define DEFAULT_TOKEN_BUCKET_REFILL_PER_S (DEFAULT_TOKEN_BUCKET_MAX / 60) /* * Initial delay before retrying DNS for a PENDING_RESOLVE upstream, and the @@ -229,6 +233,7 @@ static const struct upstream_limits default_limits = { .token_bucket_scale = DEFAULT_TOKEN_BUCKET_SCALE, .token_bucket_min = DEFAULT_TOKEN_BUCKET_MIN, .token_bucket_base_cost = DEFAULT_TOKEN_BUCKET_BASE_COST, + .token_bucket_refill_per_s = DEFAULT_TOKEN_BUCKET_REFILL_PER_S, }; static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int); @@ -2571,9 +2576,67 @@ rspamd_upstream_ensure_tokens(struct upstream_list *ups, struct upstream *up) up->max_tokens = ups->limits->token_bucket_max; up->available_tokens = up->max_tokens; up->inflight_tokens = 0; + up->last_refill_at = 0; } } +/* + * Lazy time-based refill. Adds floor(dt * refill_per_s) tokens to + * available_tokens, capped at max_tokens. Called from selection and return + * paths so that an upstream that has been quiet (or that lost tokens to a + * failure) gradually regains capacity without any timer fan-out. + */ +static inline void +rspamd_upstream_refill_tokens(struct upstream *up, + const struct upstream_limits *limits, + double now) +{ + gsize add; + double dt; + + if (limits->token_bucket_refill_per_s == 0 || up->max_tokens == 0) { + up->last_refill_at = now; + return; + } + + if (up->last_refill_at <= 0) { + up->last_refill_at = now; + return; + } + + dt = now - up->last_refill_at; + if (dt <= 0) { + return; + } + + add = (gsize) (dt * (double) limits->token_bucket_refill_per_s); + if (add == 0) { + /* Don't update last_refill_at; let small increments accumulate. */ + return; + } + + if (up->available_tokens + add < up->available_tokens) { + /* Overflow guard */ + up->available_tokens = up->max_tokens; + } + else { + up->available_tokens += add; + if (up->available_tokens > up->max_tokens) { + up->available_tokens = up->max_tokens; + } + } + up->last_refill_at = now; +} + +static inline double +rspamd_upstream_now(const struct upstream *up) +{ + if (up->ctx && up->ctx->event_loop) { + return ev_now(up->ctx->event_loop); + } + return rspamd_get_ticks(FALSE); +} + struct upstream * rspamd_upstream_get_token_bucket(struct upstream_list *ups, struct upstream *except, @@ -2603,6 +2666,14 @@ rspamd_upstream_get_token_bucket(struct upstream_list *ups, token_cost = rspamd_upstream_calculate_tokens(ups->limits, message_size); + double now; + if (ups->ctx && ups->ctx->event_loop) { + now = ev_now(ups->ctx->event_loop); + } + else { + now = rspamd_get_ticks(FALSE); + } + /* * Linear scan over alive[]: prefer the lowest-inflight upstream that has * sufficient available tokens. If no upstream is eligible, fall back to @@ -2619,6 +2690,7 @@ rspamd_upstream_get_token_bucket(struct upstream_list *ups, } rspamd_upstream_ensure_tokens(ups, up); + rspamd_upstream_refill_tokens(up, ups->limits, now); if (up->inflight_tokens < least_loaded_inflight) { least_loaded_inflight = up->inflight_tokens; @@ -2678,7 +2750,8 @@ void rspamd_upstream_return_tokens(struct upstream *up, gsize tokens, gboolean s up->inflight_tokens = 0; } - /* Only restore available tokens on success */ + /* Only restore available tokens on success; failure relies on lazy + * refill below to gradually restore capacity. */ if (success) { up->available_tokens += tokens; /* Cap at max tokens */ @@ -2687,6 +2760,13 @@ void rspamd_upstream_return_tokens(struct upstream *up, gsize tokens, gboolean s } } + /* Lazy refill makes failure non-permanent: a flapping upstream that + * loses tokens to failures regains them over time when the bucket is + * touched. */ + if (ls != NULL) { + rspamd_upstream_refill_tokens(up, ls->limits, rspamd_upstream_now(up)); + } + RSPAMD_UPSTREAM_UNLOCK(up); if (ls) { RSPAMD_UPSTREAM_UNLOCK(ls);