ev_timer ev;
double last_fail;
double last_resolve;
+ /* Probe/half-open state */
+ double next_probe_at;
+ double probe_backoff;
+ unsigned int half_open_inflight;
gpointer ud;
enum rspamd_upstream_flag flags;
struct upstream_list *ls;
double dns_timeout;
double lazy_resolve_time;
double resolve_min_interval;
+ double probe_max_backoff;
+ double probe_jitter;
unsigned int max_errors;
unsigned int dns_retransmits;
};
static const double default_lazy_resolve_time = DEFAULT_LAZY_RESOLVE_TIME;
#define DEFAULT_RESOLVE_MIN_INTERVAL 60.0
static const double default_resolve_min_interval = DEFAULT_RESOLVE_MIN_INTERVAL;
+#define DEFAULT_PROBE_MAX_BACKOFF 600.0
+static const double default_probe_max_backoff = DEFAULT_PROBE_MAX_BACKOFF;
+#define DEFAULT_PROBE_JITTER 0.3
+static const double default_probe_jitter = DEFAULT_PROBE_JITTER;
static const struct upstream_limits default_limits = {
.revive_time = DEFAULT_REVIVE_TIME,
.max_errors = DEFAULT_MAX_ERRORS,
.lazy_resolve_time = DEFAULT_LAZY_RESOLVE_TIME,
.resolve_min_interval = DEFAULT_RESOLVE_MIN_INTERVAL,
+ .probe_max_backoff = DEFAULT_PROBE_MAX_BACKOFF,
+ .probe_jitter = DEFAULT_PROBE_JITTER,
};
static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int);
if (cfg->upstream_resolve_min_interval) {
ctx->limits.resolve_min_interval = cfg->upstream_resolve_min_interval;
}
+ if (cfg->upstream_probe_max_backoff) {
+ ctx->limits.probe_max_backoff = cfg->upstream_probe_max_backoff;
+ }
+ if (cfg->upstream_probe_jitter) {
+ ctx->limits.probe_jitter = cfg->upstream_probe_jitter;
+ }
/* Some sanity checks */
if (ctx->limits.resolve_min_interval > ctx->limits.revive_time) {
msg_debug_upstream("mark upstream %s inactive; revive in %.0f seconds",
upstream->name, ntim);
+ /* Initialize probe scheduling */
+ if (upstream->probe_backoff <= 0) {
+ upstream->probe_backoff = ls->limits->revive_time;
+ }
+ if (upstream->ctx && upstream->ctx->event_loop) {
+ upstream->next_probe_at = ev_now(upstream->ctx->event_loop) +
+ rspamd_time_jitter(upstream->probe_backoff,
+ upstream->probe_backoff * ls->limits->probe_jitter);
+ }
+ else {
+ double now = rspamd_get_ticks(FALSE);
+ upstream->next_probe_at = now + rspamd_time_jitter(upstream->probe_backoff,
+ upstream->probe_backoff * ls->limits->probe_jitter);
+ }
ev_timer_init(&upstream->ev, rspamd_upstream_revive_cb, ntim, 0);
upstream->ev.data = upstream;
}
}
+ /* If this was a half-open probe, schedule next probe with backoff */
+ if (upstream->half_open_inflight > 0) {
+ double now = upstream->ctx && upstream->ctx->event_loop ? ev_now(upstream->ctx->event_loop) : rspamd_get_ticks(FALSE);
+ if (upstream->probe_backoff <= 0) {
+ upstream->probe_backoff = upstream->ls->limits->revive_time;
+ }
+ upstream->probe_backoff = MIN(upstream->probe_backoff * 2.0, upstream->ls->limits->probe_max_backoff);
+ upstream->next_probe_at = now + rspamd_time_jitter(upstream->probe_backoff,
+ upstream->probe_backoff * upstream->ls->limits->probe_jitter);
+ upstream->half_open_inflight = 0;
+ }
+
RSPAMD_UPSTREAM_UNLOCK(upstream);
}
}
struct upstream_list_watcher *w;
RSPAMD_UPSTREAM_LOCK(upstream);
+ /* Success handling */
+ if (upstream->half_open_inflight > 0) {
+ /* Successful probe: mark alive and reset backoff */
+ upstream->half_open_inflight = 0;
+ upstream->probe_backoff = upstream->ls ? upstream->ls->limits->revive_time : default_revive_time;
+ upstream->next_probe_at = 0;
+ if (upstream->ls && upstream->active_idx == -1) {
+ /* Activate this upstream */
+ rspamd_upstream_set_active(upstream->ls, upstream);
+ }
+ }
+
if (upstream->errors > 0 && upstream->active_idx != -1 && upstream->ls) {
/* We touch upstream if and only if it is active */
msg_debug_upstream("reset errors on upstream %s (was %ud)", upstream->name, upstream->errors);
RSPAMD_UPSTREAM_LOCK(ups);
if (ups->alive->len == 0) {
- /* No alive upstreams; do not force-revive to avoid tight retry loops */
+ /* Probe mode: find the earliest probe-ready upstream and allow one inflight */
+ double now = ups->ctx && ups->ctx->event_loop ? ev_now(ups->ctx->event_loop) : rspamd_get_ticks(FALSE);
+ struct upstream *candidate = NULL;
+ double min_probe = HUGE_VAL;
+
+ for (unsigned int i = 0; i < ups->ups->len; i++) {
+ struct upstream *cur = g_ptr_array_index(ups->ups, i);
+ if (cur->active_idx >= 0 || (except && cur == except)) {
+ continue;
+ }
+
+ if (cur->next_probe_at == 0) {
+ /* Initialize probe schedule based on revive_time */
+ cur->probe_backoff = cur->probe_backoff > 0 ? cur->probe_backoff : ups->limits->revive_time;
+ cur->next_probe_at = now + rspamd_time_jitter(cur->probe_backoff,
+ cur->probe_backoff * ups->limits->probe_jitter);
+ }
+
+ if (cur->next_probe_at <= now && cur->half_open_inflight == 0) {
+ candidate = cur;
+ break;
+ }
+
+ if (cur->next_probe_at < min_probe) {
+ min_probe = cur->next_probe_at;
+ }
+ }
+
+ if (candidate) {
+ candidate->half_open_inflight = 1; /* allow one request */
+ up = candidate;
+ }
+
RSPAMD_UPSTREAM_UNLOCK(ups);
- return NULL;
+
+ return up; /* can be NULL if not ready yet */
}
RSPAMD_UPSTREAM_UNLOCK(ups);