From: Vsevolod Stakhov Date: Sat, 9 May 2026 10:10:16 +0000 (+0100) Subject: [Feature] upstream: expand each SRV target into its own upstream X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=31cf7dfeb447e7c250711fc787300b38666ef1e2;p=thirdparty%2Frspamd.git [Feature] upstream: expand each SRV target into its own upstream The previous SRV path collapsed every target's A/AAAA records into a single struct upstream. SRV weight was dropped on the floor (see the "contradicts with upstreams logic" comment that has been there since forever), the 4-errors-in-10s budget was shared across the whole cluster, and modern selection algorithms (P2C, token bucket, ring hash, slow start, latency EWMA) had nothing to choose from since they operate at the upstream level. Refactor so each SRV reply entry materialises its own struct upstream member. Members are first-class participants in every rotation algorithm, with their own error budget, per-target weight, latency EWMA and address list. The `service=...` config syntax is unchanged. Lifecycle: - Parse-time: parent placeholder gets the SRV_RESOLVE flag and a pre-allocated GHashTable keyed by "fqdn:port". - DNS callback: convert reply entries to plain rspamd_upstream_srv_entry and call the new common rspamd_upstream_srv_apply, which diffs the snapshot against the parent's member set. - New target: create member in PENDING_RESOLVE state, kick off A/AAAA; the existing promote_pending machinery moves it into `alive` once addresses arrive. - Existing target: refresh weight/priority, re-resolve A/AAAA. - Dropped target: graceful drain — pull from `alive`, fire OFFLINE watcher, restore token bucket inflight, remove from ls->ups, then arm a one-shot revive_time timer (reusing revive_cb's is_draining short-circuit) as a grace window for inflight selectors. With no event loop the drain is synchronous. 
Bookkeeping: SRV parents are invisible to rspamd_upstreams_count, rspamd_upstreams_foreach, and the probe-mode iterator — they're not selectable upstreams. set_active and resolve_addrs short-circuit on the SRV_RESOLVE flag so the parent only owns the lazy-resolve timer. Out of scope (follow-ups): RFC 2782 priority-tier failover (we record srv_priority but don't filter selection by it) and adapting addr_next callers like fuzzy_check to retry across members via get_except. Internal API for tests lives in upstream_internal.h. --- diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index b240d90a60..075aa5b255 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -15,6 +15,7 @@ */ #include "config.h" #include "upstream.h" +#include "upstream_internal.h" #include "ottery.h" #include "ref.h" #include "cfg_file.h" @@ -120,6 +121,36 @@ struct upstream { gsize available_tokens; /* Current available tokens */ gsize inflight_tokens; /* Tokens reserved by in-flight requests */ double last_refill_at; /* Last lazy-refill timestamp (ev_now/ticks); 0 = uninit */ + + /* + * SRV-derived member fields. Set on members (FLAG_SRV_MEMBER); zero + * on non-SRV upstreams and on SRV parents. srv_priority and srv_weight + * mirror RFC 2782 fields from the originating SRV reply entry; they + * survive re-resolves until the corresponding target disappears. + * srv_parent is a weak back-pointer used to remove the member from the + * parent's hash table during drain. It does not hold a ref on the + * parent. + */ + struct upstream *srv_parent; + unsigned int srv_priority; + unsigned int srv_weight; + + /* + * Owned by SRV parents (FLAG_SRV_RESOLVE). Hash table from "fqdn:port" + * to struct upstream * (the member). Used by the re-resolve diff path + * to identify add/remove/update of members. NULL on members and on + * non-SRV upstreams. + */ + GHashTable *srv_members; + + /* + * Tombstone for graceful drain. 
Set when a member is removed from the + * parent's set (target disappeared from a re-resolve). Once true, the + * revive timer must not re-activate the upstream — it should silently + * release the timer ref and leave the upstream waiting for inflight + * selectors to release their refs, after which the dtor runs. + */ + gboolean is_draining; #ifdef UPSTREAMS_THREAD_SAFE rspamd_mutex_t *lock; #endif @@ -277,6 +308,12 @@ static const struct upstream_limits default_limits = { }; static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int); +static void rspamd_upstream_revive_cb(struct ev_loop *loop, ev_timer *w, int revents); +static void rspamd_upstream_dtor(struct upstream *up); +static void rspamd_upstream_resolve_addrs(const struct upstream_list *ls, + struct upstream *upstream); +static void rspamd_upstream_set_inactive(struct upstream_list *ls, + struct upstream *upstream); void rspamd_upstreams_library_config(struct rspamd_config *cfg, struct upstream_ctx *ctx, @@ -449,10 +486,21 @@ rspamd_upstream_addr_sort_func(gconstpointer a, gconstpointer b) static void rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream) { - gboolean is_pending; + gboolean is_pending = FALSE; RSPAMD_UPSTREAM_LOCK(ls); + /* + * SRV parents are placeholders that own member upstreams; they must + * never become selectable. Skip the alive-list bookkeeping but keep + * the lazy-resolve timer setup below — that timer is what drives the + * periodic SRV re-resolution. 
+ */ + if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) { + upstream->active_idx = -1; + goto schedule_resolve; + } + is_pending = (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) != 0; if (!is_pending) { @@ -473,6 +521,7 @@ rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream) upstream->active_idx = -1; } +schedule_resolve: if (upstream->ctx && upstream->ctx->configured && !((upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE) || (upstream->flags & RSPAMD_UPSTREAM_FLAG_DNS))) { @@ -662,7 +711,7 @@ rspamd_upstream_promote_pending(struct upstream *upstream) struct upstream_list *ls = upstream->ls; struct upstream_list_watcher *w; - if (ls == NULL) { + if (ls == NULL || upstream->is_draining) { return; } @@ -739,124 +788,442 @@ rspamd_upstream_dns_cb(struct rdns_reply *reply, void *arg) REF_RELEASE(up); } -struct rspamd_upstream_srv_dns_cb { - struct upstream *up; - unsigned int priority; - unsigned int port; - unsigned int requests_inflight; -}; +/* + * Build a stable "fqdn:port" key for indexing SRV members on the parent. + * Allocated with g_malloc; the parent's hash table owns the string and + * frees it via the key-destroy callback registered at hash creation. + */ +static char * +rspamd_upstream_srv_member_key(const char *target, uint16_t port) +{ + return g_strdup_printf("%s:%u", target, (unsigned int) port); +} + +/* + * Create a brand-new SRV member upstream, register it with the parent, + * push it into the upstream list and ctx queue, and kick off A/AAAA + * resolution. The member starts in PENDING_RESOLVE state and becomes + * selectable only after rspamd_upstream_promote_pending fires from + * rspamd_upstream_update_addrs once a non-empty A/AAAA reply arrives. + * + * Caller holds parent's lock. ls and ctx must be non-NULL (i.e. parent + * is still attached to a live list). 
+ */ +static struct upstream * +rspamd_upstream_srv_create_member(struct upstream *parent, + const char *target, + uint16_t port, + uint16_t srv_weight, + uint16_t srv_priority) +{ + struct upstream_list *ls = parent->ls; + struct upstream_ctx *ctx = parent->ctx; + struct upstream *member; + rspamd_mempool_t *pool = ctx ? ctx->pool : NULL; + unsigned int h; + char *key; + + g_assert(ls != NULL); + g_assert(ctx != NULL); + + member = g_malloc0(sizeof(*member)); + member->name = pool ? rspamd_mempool_strdup(pool, target) : g_strdup(target); + member->srv_parent = parent; + member->srv_priority = srv_priority; + member->srv_weight = srv_weight; + /* + * RFC 2782 weight 0 means "rarely used but selectable". We clamp to >=1 + * so the existing weighted-RR path doesn't treat the member as + * effectively disabled. True weight-0 semantics are a follow-up. + */ + member->weight = MAX((unsigned int) srv_weight, 1u); + member->cur_weight = member->weight; + member->deferred_port = port; + member->active_idx = -1; + /* + * Members inherit the list-level flags applied to the parent at + * creation time but never carry the SRV_RESOLVE marker themselves — + * that flag is the parent placeholder's identifying mark. 
+ */ + member->flags = (parent->flags & ~RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) | + RSPAMD_UPSTREAM_FLAG_SRV_MEMBER | + RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE; + + g_ptr_array_add(ls->ups, member); + member->ud = parent->ud; + member->ls = ls; + REF_INIT_RETAIN(member, rspamd_upstream_dtor); +#ifdef UPSTREAMS_THREAD_SAFE + member->lock = rspamd_mutex_new(); +#endif + member->ctx = ctx; + REF_RETAIN(ctx); + g_queue_push_tail(ctx->upstreams, member); + member->ctx_pos = g_queue_peek_tail_link(ctx->upstreams); + + h = rspamd_cryptobox_fast_hash(member->name, strlen(member->name), 0); + memset(member->uid, 0, sizeof(member->uid)); + rspamd_encode_base32_buf((const unsigned char *) &h, sizeof(h), + member->uid, sizeof(member->uid) - 1, + RSPAMD_BASE32_DEFAULT); + + key = rspamd_upstream_srv_member_key(target, port); + g_hash_table_insert(parent->srv_members, key, member); + + { + struct upstream *upstream = member; + msg_info_upstream("created SRV member %s for %s " + "(target=%s port=%ud weight=%ud priority=%ud)", + member->uid, parent->name, target, + (unsigned int) port, + (unsigned int) srv_weight, + (unsigned int) srv_priority); + } + + /* + * set_active arms the lazy-resolve timer; for PENDING_RESOLVE members + * it leaves them out of `alive`. The first successful A/AAAA reply + * promotes the member via rspamd_upstream_promote_pending. + */ + rspamd_upstream_set_active(ls, member); + /* + * Issue the A/AAAA query immediately rather than waiting for the lazy + * timer; users expect new SRV targets to start serving traffic + * promptly after a re-resolve. + */ + rspamd_upstream_resolve_addrs(ls, member); -/* Used when we have resolved SRV record and resolved addrs */ + return member; +} + +/* + * Graceful drain: remove a member from selection, unlink it from the + * parent's hash, drop its presence in `ls->ups`. 
A one-shot grace timer + * keeps the upstream pinned in memory until it fires — + * by that point any in-flight selectors will have called fail/ok/release + * and the dtor can run safely. + * + * The is_draining flag tells revive_cb to skip set_active when the + * timer fires, so the drained member stays out of selection forever. + */ static void -rspamd_upstream_dns_srv_phase2_cb(struct rdns_reply *reply, void *arg) +rspamd_upstream_srv_drain_member(struct upstream *member) { - struct rspamd_upstream_srv_dns_cb *cbdata = - (struct rspamd_upstream_srv_dns_cb *) arg; - struct upstream *up; - struct rdns_reply_entry *entry; - struct upstream_inet_addr_entry *up_ent; + struct upstream *parent = member->srv_parent; + struct upstream_list *ls = member->ls; + struct upstream *upstream = member; /* for the logging macro */ - up = cbdata->up; + if (!(member->flags & RSPAMD_UPSTREAM_FLAG_SRV_MEMBER)) { + return; + } - if (reply->code == RDNS_RC_NOERROR) { - entry = reply->entries; + msg_info_upstream("drain SRV member %s (%s)", + member->uid, member->name); - RSPAMD_UPSTREAM_LOCK(up); - while (entry) { + member->is_draining = TRUE; - if (entry->type == RDNS_REQUEST_A) { - up_ent = g_malloc0(sizeof(*up_ent)); - up_ent->addr = rspamd_inet_address_new(AF_INET, - &entry->content.a.addr); - up_ent->priority = cbdata->priority; - rspamd_inet_address_set_port(up_ent->addr, cbdata->port); - LL_PREPEND(up->new_addrs, up_ent); + /* Stop any timer that might re-activate the member. */ + if (member->ctx && member->ctx->event_loop && ev_can_stop(&member->ev)) { + ev_timer_stop(member->ctx->event_loop, &member->ev); + } + + /* Unlink from the parent's index. 
*/ + if (parent && parent->srv_members) { + GHashTableIter it; + gpointer k, v; + + g_hash_table_iter_init(&it, parent->srv_members); + while (g_hash_table_iter_next(&it, &k, &v)) { + if (v == member) { + g_hash_table_iter_remove(&it); + break; } - else if (entry->type == RDNS_REQUEST_AAAA) { - up_ent = g_malloc0(sizeof(*up_ent)); - up_ent->addr = rspamd_inet_address_new(AF_INET6, - &entry->content.aaa.addr); - up_ent->priority = cbdata->priority; - rspamd_inet_address_set_port(up_ent->addr, cbdata->port); - LL_PREPEND(up->new_addrs, up_ent); + } + } + + /* + * Pull from the alive list directly, mirroring the relevant subset + * of set_inactive. We deliberately do NOT go through set_inactive + * itself: it pins the upstream with REF_RETAIN+revive timer + * expecting the timer to release later, but for a drained member + * there is no "later" — we want the dtor to run as soon as inflight + * selectors release their refs. + */ + if (ls != NULL && member->active_idx != -1) { + struct upstream_list_watcher *w; + + RSPAMD_UPSTREAM_LOCK(ls); + g_ptr_array_remove_index(ls->alive, member->active_idx); + member->active_idx = -1; + ls->ring_dirty = TRUE; + + if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET && + member->inflight_tokens > 0) { + member->available_tokens += member->inflight_tokens; + if (member->available_tokens > member->max_tokens) { + member->available_tokens = member->max_tokens; } - entry = entry->next; + member->inflight_tokens = 0; } - RSPAMD_UPSTREAM_UNLOCK(up); + /* Reindex */ + for (unsigned int i = 0; i < ls->alive->len; i++) { + struct upstream *cur = g_ptr_array_index(ls->alive, i); + cur->active_idx = i; + } + + DL_FOREACH(ls->watchers, w) + { + if (w->events_mask & RSPAMD_UPSTREAM_WATCH_OFFLINE) { + w->func(member, RSPAMD_UPSTREAM_WATCH_OFFLINE, + member->errors, w->ud); + } + } + + RSPAMD_UPSTREAM_UNLOCK(ls); } - up->dns_requests--; - cbdata->requests_inflight--; + /* Remove from ls->ups so traversal/probe paths stop seeing it. 
*/ + if (ls != NULL) { + for (unsigned int i = 0; i < ls->ups->len; i++) { + if (g_ptr_array_index(ls->ups, i) == member) { + g_ptr_array_remove_index(ls->ups, i); + break; + } + } + } + + /* Sever the parent link so the dtor doesn't re-touch parent state. */ + member->srv_parent = NULL; - if (cbdata->requests_inflight == 0) { - g_free(cbdata); + /* + * Production grace window: a caller may have just received this + * member from rspamd_upstream_get_* and not yet called fail/ok. + * Arm a one-shot timer (reusing revive_cb, which checks is_draining + * and bails to REF_RELEASE on fire) to keep the upstream alive long + * enough for inflight selectors to drain naturally. We use + * revive_time as the grace period — same TTL the rest of the system + * already assumes for inactive upstreams. + * + * Without an event loop (tests, early startup), no inflight is + * possible; we skip the timer and let REF_RELEASE below run the + * dtor synchronously. + */ + if (member->ctx && member->ctx->event_loop && ls != NULL) { + double ntim = rspamd_time_jitter(ls->limits->revive_time, + ls->limits->revive_time * + ls->limits->revive_jitter); + REF_RETAIN(member); + ev_timer_init(&member->ev, rspamd_upstream_revive_cb, ntim, 0); + member->ev.data = member; + if (member->ctx->configured) { + ev_timer_start(member->ctx->event_loop, &member->ev); + } } - if (up->dns_requests == 0) { - rspamd_upstream_update_addrs(up); + /* Release the original creation ref. */ + REF_RELEASE(member); +} + +/* + * Apply a snapshot of SRV targets to a parent. Public-internal entry + * point: takes a plain-data array decoupled from the DNS client struct + * layout, so tests can drive expansion without DNS. + * + * Caller must ensure parent has srv_members allocated (true for any + * upstream created via "service=..."). Acquires/releases the parent + * lock internally. 
+ */ +void rspamd_upstream_srv_apply(struct upstream *parent, + const struct rspamd_upstream_srv_entry *entries, + size_t n) +{ + GHashTable *seen; + GList *to_drain = NULL, *cur; + GHashTableIter iter; + gpointer k, v; + size_t i; + struct upstream *upstream = parent; /* for the logging macro */ + + if (parent == NULL || parent->srv_members == NULL) { + return; } - REF_RELEASE(up); + RSPAMD_UPSTREAM_LOCK(parent); + + seen = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL); + + for (i = 0; i < n; i++) { + const struct rspamd_upstream_srv_entry *e = &entries[i]; + struct upstream *existing; + char *key; + + msg_debug_upstream("apply SRV target for %s: %s " + "(weight=%ud priority=%ud port=%ud)", + parent->name, e->target, + (unsigned int) e->weight, + (unsigned int) e->priority, + (unsigned int) e->port); + + key = rspamd_upstream_srv_member_key(e->target, e->port); + existing = g_hash_table_lookup(parent->srv_members, key); + + if (existing != NULL) { + gboolean topology_changed = FALSE; + + if (existing->srv_weight != e->weight) { + existing->srv_weight = e->weight; + existing->weight = MAX((unsigned int) e->weight, 1u); + if (existing->cur_weight == 0) { + existing->cur_weight = existing->weight; + } + topology_changed = TRUE; + } + if (existing->srv_priority != e->priority) { + existing->srv_priority = e->priority; + topology_changed = TRUE; + } + if (topology_changed && parent->ls != NULL) { + parent->ls->ring_dirty = TRUE; + } + /* Refresh A/AAAA so address changes propagate. */ + if (parent->ls != NULL) { + rspamd_upstream_resolve_addrs(parent->ls, existing); + } + } + else { + rspamd_upstream_srv_create_member(parent, e->target, e->port, + e->weight, e->priority); + } + + g_hash_table_insert(seen, key, NULL); + } + + /* Anything in srv_members not in `seen` is gone — drain it. 
*/ + g_hash_table_iter_init(&iter, parent->srv_members); + while (g_hash_table_iter_next(&iter, &k, &v)) { + if (!g_hash_table_contains(seen, k)) { + to_drain = g_list_prepend(to_drain, v); + } + } + + for (cur = to_drain; cur != NULL; cur = cur->next) { + rspamd_upstream_srv_drain_member((struct upstream *) cur->data); + } + g_list_free(to_drain); + g_hash_table_unref(seen); + + RSPAMD_UPSTREAM_UNLOCK(parent); +} + +struct upstream * +rspamd_upstream_srv_test_get_parent(struct upstream_list *ups) +{ + unsigned int i; + + if (ups == NULL) { + return NULL; + } + + for (i = 0; i < ups->ups->len; i++) { + struct upstream *up = g_ptr_array_index(ups->ups, i); + if (up->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) { + return up; + } + } + + return NULL; +} + +void rspamd_upstream_member_force_alive_for_test(struct upstream *member, + const char *ip_str) +{ + rspamd_inet_addr_t *addr = NULL; + + g_assert(member != NULL); + g_assert(member->flags & RSPAMD_UPSTREAM_FLAG_SRV_MEMBER); + + if (!rspamd_parse_inet_address(&addr, ip_str, strlen(ip_str), + RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) { + g_assert_not_reached(); + } + + rspamd_inet_address_set_port(addr, member->deferred_port); + rspamd_upstream_add_addr(member, addr); + + /* + * Drop the PENDING_RESOLVE flag and place the member into the alive + * list directly. set_active would re-arm the lazy-resolve timer; in + * tests the runtime has no event loop, so we do the bookkeeping by + * hand to keep the test deterministic. + */ + RSPAMD_UPSTREAM_LOCK(member->ls); + member->flags &= ~RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE; + g_ptr_array_add(member->ls->alive, member); + member->active_idx = member->ls->alive->len - 1; + member->ls->ring_dirty = TRUE; + RSPAMD_UPSTREAM_UNLOCK(member->ls); } +/* + * SRV reply handler: convert the rdns reply into the plain-data entry + * vector and hand it to rspamd_upstream_srv_apply. 
Each SRV target lives + * as its own struct upstream — own error budget, latency EWMA, + * addresses, and full first-class participation in every selection + * algorithm. + * + * On reply errors (NXDOMAIN, timeout) we deliberately do nothing: the + * existing member set remains in place and the next re-resolve gets to + * try again. Otherwise one bad query would tear down the whole cluster. + */ static void rspamd_upstream_dns_srv_cb(struct rdns_reply *reply, void *arg) { - struct upstream *upstream = (struct upstream *) arg; + struct upstream *parent = (struct upstream *) arg; + struct upstream *upstream = parent; /* for the logging macro */ struct rdns_reply_entry *entry; - struct rspamd_upstream_srv_dns_cb *ncbdata; - if (reply->code == RDNS_RC_NOERROR) { - entry = reply->entries; + if (parent->ls == NULL || parent->is_draining) { + /* Parent destroyed or drained mid-flight; just release the ref. */ + parent->dns_requests--; + REF_RELEASE(parent); + return; + } - RSPAMD_UPSTREAM_LOCK(upstream); - while (entry) { - /* XXX: we ignore weight as it contradicts with upstreams logic */ - if (entry->type == RDNS_REQUEST_SRV) { - msg_debug_upstream("got srv reply for %s: %s " - "(weight=%d, priority=%d, port=%d)", - upstream->name, entry->content.srv.target, - entry->content.srv.weight, entry->content.srv.priority, - entry->content.srv.port); - ncbdata = g_malloc0(sizeof(*ncbdata)); - ncbdata->priority = entry->content.srv.weight; - ncbdata->port = entry->content.srv.port; - /* XXX: for all entries? 
*/ - upstream->ttl = entry->ttl; + if (reply->code == RDNS_RC_NOERROR) { + GArray *flat = g_array_new(FALSE, FALSE, + sizeof(struct rspamd_upstream_srv_entry)); - if (rdns_make_request_full(upstream->ctx->res, - rspamd_upstream_dns_srv_phase2_cb, ncbdata, - upstream->ls->limits->dns_timeout, - upstream->ls->limits->dns_retransmits, - 1, entry->content.srv.target, RDNS_REQUEST_A) != NULL) { - upstream->dns_requests++; - REF_RETAIN(upstream); - ncbdata->requests_inflight++; - } + for (entry = reply->entries; entry != NULL; entry = entry->next) { + if (entry->type != RDNS_REQUEST_SRV) { + continue; + } - if (rdns_make_request_full(upstream->ctx->res, - rspamd_upstream_dns_srv_phase2_cb, ncbdata, - upstream->ls->limits->dns_timeout, - upstream->ls->limits->dns_retransmits, - 1, entry->content.srv.target, RDNS_REQUEST_AAAA) != NULL) { - upstream->dns_requests++; - REF_RETAIN(upstream); - ncbdata->requests_inflight++; - } + parent->ttl = entry->ttl; - if (ncbdata->requests_inflight == 0) { - g_free(ncbdata); - } - } - entry = entry->next; + struct rspamd_upstream_srv_entry e = { + .target = entry->content.srv.target, + .port = entry->content.srv.port, + .weight = entry->content.srv.weight, + .priority = entry->content.srv.priority, + }; + g_array_append_val(flat, e); } - RSPAMD_UPSTREAM_UNLOCK(upstream); + rspamd_upstream_srv_apply(parent, + (const struct rspamd_upstream_srv_entry *) flat->data, + flat->len); + g_array_free(flat, TRUE); + } + else { + msg_info_upstream("SRV resolution for %s returned %s; keeping " + "existing %ud member(s)", + parent->name, rdns_strerror(reply->code), + parent->srv_members ? 
(unsigned int) g_hash_table_size(parent->srv_members) : 0u); } - upstream->dns_requests--; - REF_RELEASE(upstream); + parent->dns_requests--; + REF_RELEASE(parent); } static void @@ -867,6 +1234,20 @@ rspamd_upstream_revive_cb(struct ev_loop *loop, ev_timer *w, int revents) RSPAMD_UPSTREAM_LOCK(upstream); ev_timer_stop(loop, w); + /* + * Drained SRV members must not re-enter the alive list. The drain + * helper unlinks them from `ls->ups` and `srv_members` and flips this + * flag; the only thing left is to release the timer ref so the dtor + * runs once inflight selectors release theirs. + */ + if (upstream->is_draining) { + msg_debug_upstream("skip revive for drained upstream %s", + upstream->name); + RSPAMD_UPSTREAM_UNLOCK(upstream); + REF_RELEASE(upstream); + return; + } + msg_debug_upstream("revive upstream %s", upstream->name); if (upstream->ls) { @@ -884,6 +1265,15 @@ static void rspamd_upstream_resolve_addrs(const struct upstream_list *ls, struct upstream *upstream) { + /* + * Drained SRV members and SRV parents must never resolve. Parents + * resolve SRV through the dedicated path below; the early bail here + * matters when set_inactive is invoked on a draining member: it would + * otherwise re-issue A/AAAA and pollute the just-released state. + */ + if (upstream->is_draining) { + return; + } if ((upstream->flags & RSPAMD_UPSTREAM_FLAG_DNS)) { /* For DNS upstreams: resolve synchronously using getaddrinfo if name */ @@ -1398,7 +1788,26 @@ rspamd_upstreams_create(struct upstream_ctx *ctx) gsize rspamd_upstreams_count(struct upstream_list *ups) { - return ups != NULL ? ups->ups->len : 0; + gsize n = 0; + unsigned int i; + struct upstream *up; + + if (ups == NULL) { + return 0; + } + + /* + * SRV parents are placeholders, not selectable upstreams. Count only + * first-class entries so callers see the real cluster size. 
+ */ + for (i = 0; i < ups->ups->len; i++) { + up = g_ptr_array_index(ups->ups, i); + if (!(up->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE)) { + n++; + } + } + + return n; } gsize rspamd_upstreams_alive(struct upstream_list *ups) @@ -1424,6 +1833,18 @@ rspamd_upstream_dtor(struct upstream *up) g_ptr_array_free(up->addrs.addr, TRUE); } + /* + * SRV parents own a hash table of "fqdn:port" → member upstream. The + * members themselves carry their own refs and are freed independently + * once their drain ref counts reach zero, so we only release the hash + * itself here. Drain on rspamd_upstreams_destroy already cleared the + * entries by the time the parent's dtor runs. + */ + if (up->srv_members != NULL) { + g_hash_table_unref(up->srv_members); + up->srv_members = NULL; + } + #ifdef UPSTREAMS_THREAD_SAFE rspamd_mutex_free(up->lock); #endif @@ -1650,6 +2071,15 @@ rspamd_upstreams_add_upstream(struct upstream_list *ups, const char *str, (int) (plus_pos - service_pos), service_pos, (int) (semicolon_pos - (plus_pos + 1)), plus_pos + 1); upstream->flags |= RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE; + /* + * Pre-allocate the member index. The hash owns the keys + * (g_free destructor) and not the values; member upstreams + * are released through their own ref counts during drain + * or list teardown. + */ + upstream->srv_members = g_hash_table_new_full(g_str_hash, + g_str_equal, + g_free, NULL); ret = RSPAMD_PARSE_ADDR_RESOLVED; if (ups->ctx) { @@ -2497,6 +2927,13 @@ rspamd_upstream_get_common(struct upstream_list *ups, if (cur->active_idx >= 0 || (except && cur == except)) { continue; } + /* + * SRV parents never enter selection; they only own member + * upstreams. Skip them so probe mode considers real backends. + */ + if (cur->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) { + continue; + } /* * Pending-resolve upstreams have no addresses yet — they can't * be probed. 
The lazy resolver will move them into `alive` once @@ -2662,12 +3099,22 @@ void rspamd_upstreams_foreach(struct upstream_list *ups, rspamd_upstream_traverse_func cb, void *ud) { struct upstream *up; - unsigned int i; + unsigned int i, idx = 0; for (i = 0; i < ups->ups->len; i++) { up = g_ptr_array_index(ups->ups, i); - cb(up, i, ud); + /* + * Skip SRV parent placeholders — they aren't selectable + * upstreams and exposing them to consumers (foreach is the + * public iteration API) would surprise callers that expect + * to enumerate real backends. + */ + if (up->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) { + continue; + } + + cb(up, idx++, ud); } } diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h index d8b58b6ee1..7f61a2de8a 100644 --- a/src/libutil/upstream.h +++ b/src/libutil/upstream.h @@ -48,6 +48,12 @@ enum rspamd_upstream_rotation { enum rspamd_upstream_flag { RSPAMD_UPSTREAM_FLAG_NORESOLVE = (1 << 0), + /* + * Upstream is an SRV "parent": a placeholder that owns SRV-derived + * member upstreams. Never put into the alive list, never selected + * directly; its only job is to host the lazy SRV re-resolve timer + * and a hash table of members keyed by "fqdn:port". + */ RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE = (1 << 1), RSPAMD_UPSTREAM_FLAG_DNS = (1 << 2), /* @@ -57,6 +63,16 @@ enum rspamd_upstream_flag { * and the upstream becomes selectable. */ RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE = (1 << 3), + /* + * SRV-derived member: a first-class upstream created by expanding a + * single SRV target into its own struct upstream. Holds its own error + * budget, latency EWMA, weight, addresses, and is selectable via the + * normal rotation algorithms. Lifetime is bound to the parent's SRV + * member table; on graceful drain (target gone from re-resolve), the + * member is removed from selection and released after its inflight + * refs drop. 
+ */ + RSPAMD_UPSTREAM_FLAG_SRV_MEMBER = (1 << 4), }; struct rspamd_config; diff --git a/src/libutil/upstream_internal.h b/src/libutil/upstream_internal.h new file mode 100644 index 0000000000..d9ff21826b --- /dev/null +++ b/src/libutil/upstream_internal.h @@ -0,0 +1,81 @@ +/* + * Copyright 2026 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Internal hooks into the upstream subsystem. NOT a public API — the + * stability story is "tests and same-tree consumers only". The public + * surface lives in upstream.h. + */ +#ifndef RSPAMD_UPSTREAM_INTERNAL_H +#define RSPAMD_UPSTREAM_INTERNAL_H + +#include "config.h" +#include "upstream.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Plain-data view of one SRV target. Used both by the real DNS callback + * (built from rdns_reply_entry) and the test entry point — keeping the + * diff logic agnostic of the DNS client struct layout. + */ +struct rspamd_upstream_srv_entry { + const char *target; + uint16_t port; + uint16_t weight; + uint16_t priority; +}; + +/* + * Apply a snapshot of SRV targets to a parent upstream: + * - new keys → create member upstream + * - existing keys → refresh weight/priority and re-resolve A/AAAA + * - keys present on parent but missing in `entries` → graceful drain + * + * Caller must own `parent` (refcount keeps it alive); the parent must + * have been created with the SRV_RESOLVE flag (e.g. 
via + * `rspamd_upstreams_add_upstream` with a "service=..." string). + * + * Tests use this to drive SRV expansion deterministically without DNS. + */ +void rspamd_upstream_srv_apply(struct upstream *parent, + const struct rspamd_upstream_srv_entry *entries, + size_t n); + +/* + * Force a freshly-created SRV member out of PENDING_RESOLVE into the + * alive list with a single synthetic loopback address. Test-only: + * production code never bypasses DNS like this. `ip_str` must be a + * numeric IPv4 / IPv6 literal (parsed by rspamd_parse_inet_address). + */ +void rspamd_upstream_member_force_alive_for_test(struct upstream *member, + const char *ip_str); + +/* + * Return the first SRV parent placeholder in the list, or NULL if none. + * Test-only: SRV parents are deliberately invisible to the public + * iteration APIs (foreach, count) since they aren't selectable + * upstreams. Tests need a way to reach the parent for srv_apply. + */ +struct upstream *rspamd_upstream_srv_test_get_parent(struct upstream_list *ups); + +#ifdef __cplusplus +} +#endif + +#endif /* RSPAMD_UPSTREAM_INTERNAL_H */