gsize max_tokens; /* Maximum token capacity */
gsize available_tokens; /* Current available tokens */
gsize inflight_tokens; /* Tokens reserved by in-flight requests */
+ double last_refill_at; /* Last lazy-refill timestamp (ev_now/ticks); 0 = uninit */
#ifdef UPSTREAMS_THREAD_SAFE
rspamd_mutex_t *lock;
#endif
unsigned int dns_retransmits;
/* Token bucket configuration */
- gsize token_bucket_max; /* Max tokens per upstream (default: 10000) */
- gsize token_bucket_scale; /* Bytes per token (default: 1024) */
- gsize token_bucket_min; /* Min tokens for selection (default: 1) */
- gsize token_bucket_base_cost; /* Base cost per request (default: 10) */
+ gsize token_bucket_max; /* Max tokens per upstream (default: 10000) */
+ gsize token_bucket_scale; /* Bytes per token (default: 1024) */
+ gsize token_bucket_min; /* Min tokens for selection (default: 1) */
+ gsize token_bucket_base_cost; /* Base cost per request (default: 10) */
+ gsize token_bucket_refill_per_s; /* Lazy refill rate (default: max/60) */
};
struct upstream_list {
#define DEFAULT_TOKEN_BUCKET_SCALE 1024
#define DEFAULT_TOKEN_BUCKET_MIN 1
#define DEFAULT_TOKEN_BUCKET_BASE_COST 10
+/* Default refill rate: full bucket regenerates in 60s of wall time. */
+#define DEFAULT_TOKEN_BUCKET_REFILL_PER_S (DEFAULT_TOKEN_BUCKET_MAX / 60)
/*
* Initial delay before retrying DNS for a PENDING_RESOLVE upstream, and the
.token_bucket_scale = DEFAULT_TOKEN_BUCKET_SCALE,
.token_bucket_min = DEFAULT_TOKEN_BUCKET_MIN,
.token_bucket_base_cost = DEFAULT_TOKEN_BUCKET_BASE_COST,
+ .token_bucket_refill_per_s = DEFAULT_TOKEN_BUCKET_REFILL_PER_S,
};
static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int);
up->max_tokens = ups->limits->token_bucket_max;
up->available_tokens = up->max_tokens;
up->inflight_tokens = 0;
+ up->last_refill_at = 0;
}
}
+/*
+ * Lazy time-based refill. Adds floor(dt * refill_per_s) tokens to
+ * available_tokens, capped at max_tokens. Called from selection and return
+ * paths so that an upstream that has been quiet (or that lost tokens to a
+ * failure) gradually regains capacity without any timer fan-out.
+ */
+static inline void
+rspamd_upstream_refill_tokens(struct upstream *up,
+ const struct upstream_limits *limits,
+ double now)
+{
+ gsize add;
+ double dt;
+
+ if (limits->token_bucket_refill_per_s == 0 || up->max_tokens == 0) {
+ up->last_refill_at = now;
+ return;
+ }
+
+ if (up->last_refill_at <= 0) {
+ up->last_refill_at = now;
+ return;
+ }
+
+ dt = now - up->last_refill_at;
+ if (dt <= 0) {
+ return;
+ }
+
+ add = (gsize) (dt * (double) limits->token_bucket_refill_per_s);
+ if (add == 0) {
+ /* Don't update last_refill_at; let small increments accumulate. */
+ return;
+ }
+
+ if (up->available_tokens + add < up->available_tokens) {
+ /* Overflow guard */
+ up->available_tokens = up->max_tokens;
+ }
+ else {
+ up->available_tokens += add;
+ if (up->available_tokens > up->max_tokens) {
+ up->available_tokens = up->max_tokens;
+ }
+ }
+ up->last_refill_at = now;
+}
+
+static inline double
+rspamd_upstream_now(const struct upstream *up)
+{
+ if (up->ctx && up->ctx->event_loop) {
+ return ev_now(up->ctx->event_loop);
+ }
+ return rspamd_get_ticks(FALSE);
+}
+
struct upstream *
rspamd_upstream_get_token_bucket(struct upstream_list *ups,
struct upstream *except,
token_cost = rspamd_upstream_calculate_tokens(ups->limits, message_size);
+ double now;
+ if (ups->ctx && ups->ctx->event_loop) {
+ now = ev_now(ups->ctx->event_loop);
+ }
+ else {
+ now = rspamd_get_ticks(FALSE);
+ }
+
/*
* Linear scan over alive[]: prefer the lowest-inflight upstream that has
* sufficient available tokens. If no upstream is eligible, fall back to
}
rspamd_upstream_ensure_tokens(ups, up);
+ rspamd_upstream_refill_tokens(up, ups->limits, now);
if (up->inflight_tokens < least_loaded_inflight) {
least_loaded_inflight = up->inflight_tokens;
up->inflight_tokens = 0;
}
- /* Only restore available tokens on success */
+ /* Only restore available tokens on success; failure relies on lazy
+ * refill below to gradually restore capacity. */
if (success) {
up->available_tokens += tokens;
/* Cap at max tokens */
}
}
+ /* Lazy refill makes failure non-permanent: a flapping upstream that
+ * loses tokens to failures regains them over time when the bucket is
+ * touched. */
+ if (ls != NULL) {
+ rspamd_upstream_refill_tokens(up, ls->limits, rspamd_upstream_now(up));
+ }
+
RSPAMD_UPSTREAM_UNLOCK(up);
if (ls) {
RSPAMD_UPSTREAM_UNLOCK(ls);