*/
#include "config.h"
#include "upstream.h"
+#include "upstream_internal.h"
#include "ottery.h"
#include "ref.h"
#include "cfg_file.h"
gsize available_tokens; /* Current available tokens */
gsize inflight_tokens; /* Tokens reserved by in-flight requests */
double last_refill_at; /* Last lazy-refill timestamp (ev_now/ticks); 0 = uninit */
+
+ /*
+ * SRV-derived member fields. Set on members (FLAG_SRV_MEMBER); zero
+ * on non-SRV upstreams and on SRV parents. srv_priority and srv_weight
+ * mirror RFC 2782 fields from the originating SRV reply entry; they
+ * survive re-resolves until the corresponding target disappears.
+ * srv_parent is a weak back-pointer used to remove the member from the
+ * parent's hash table during drain. It does not hold a ref on the
+ * parent.
+ */
+ struct upstream *srv_parent;
+ unsigned int srv_priority;
+ unsigned int srv_weight;
+
+ /*
+ * Owned by SRV parents (FLAG_SRV_RESOLVE). Hash table from "fqdn:port"
+ * to struct upstream * (the member). Used by the re-resolve diff path
+ * to identify add/remove/update of members. NULL on members and on
+ * non-SRV upstreams.
+ */
+ GHashTable *srv_members;
+
+ /*
+ * Tombstone for graceful drain. Set when a member is removed from the
+ * parent's set (target disappeared from a re-resolve). Once true, the
+ * revive timer must not re-activate the upstream — it should silently
+ * release the timer ref and leave the upstream waiting for inflight
+ * selectors to release their refs, after which the dtor runs.
+ */
+ gboolean is_draining;
#ifdef UPSTREAMS_THREAD_SAFE
rspamd_mutex_t *lock;
#endif
};
static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int);
+static void rspamd_upstream_revive_cb(struct ev_loop *loop, ev_timer *w, int revents);
+static void rspamd_upstream_dtor(struct upstream *up);
+static void rspamd_upstream_resolve_addrs(const struct upstream_list *ls,
+ struct upstream *upstream);
+static void rspamd_upstream_set_inactive(struct upstream_list *ls,
+ struct upstream *upstream);
void rspamd_upstreams_library_config(struct rspamd_config *cfg,
struct upstream_ctx *ctx,
static void
rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream)
{
- gboolean is_pending;
+ gboolean is_pending = FALSE;
RSPAMD_UPSTREAM_LOCK(ls);
+ /*
+ * SRV parents are placeholders that own member upstreams; they must
+ * never become selectable. Skip the alive-list bookkeeping but keep
+ * the lazy-resolve timer setup below — that timer is what drives the
+ * periodic SRV re-resolution.
+ */
+ if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
+ upstream->active_idx = -1;
+ goto schedule_resolve;
+ }
+
is_pending = (upstream->flags & RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE) != 0;
if (!is_pending) {
upstream->active_idx = -1;
}
+schedule_resolve:
if (upstream->ctx && upstream->ctx->configured &&
!((upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE) ||
(upstream->flags & RSPAMD_UPSTREAM_FLAG_DNS))) {
struct upstream_list *ls = upstream->ls;
struct upstream_list_watcher *w;
- if (ls == NULL) {
+ if (ls == NULL || upstream->is_draining) {
return;
}
REF_RELEASE(up);
}
-struct rspamd_upstream_srv_dns_cb {
- struct upstream *up;
- unsigned int priority;
- unsigned int port;
- unsigned int requests_inflight;
-};
+/*
+ * Build a stable "fqdn:port" key for indexing SRV members on the parent.
+ * Allocated with g_malloc; the parent's hash table owns the string and
+ * frees it via the value-destroy callback registered at hash creation.
+ */
+static char *
+rspamd_upstream_srv_member_key(const char *target, uint16_t port)
+{
+ return g_strdup_printf("%s:%u", target, (unsigned int) port);
+}
+
+/*
+ * Create a brand-new SRV member upstream, register it with the parent,
+ * push it into the upstream list and ctx queue, and kick off A/AAAA
+ * resolution. The member starts in PENDING_RESOLVE state and becomes
+ * selectable only after rspamd_upstream_promote_pending fires from
+ * rspamd_upstream_update_addrs once a non-empty A/AAAA reply arrives.
+ *
+ * Caller holds parent's lock. ls and ctx must be non-NULL (i.e. parent
+ * is still attached to a live list).
+ */
+static struct upstream *
+rspamd_upstream_srv_create_member(struct upstream *parent,
+ const char *target,
+ uint16_t port,
+ uint16_t srv_weight,
+ uint16_t srv_priority)
+{
+ struct upstream_list *ls = parent->ls;
+ struct upstream_ctx *ctx = parent->ctx;
+ struct upstream *member;
+ rspamd_mempool_t *pool = ctx ? ctx->pool : NULL;
+ unsigned int h;
+ char *key;
+
+ g_assert(ls != NULL);
+ g_assert(ctx != NULL);
+
+ member = g_malloc0(sizeof(*member));
+ member->name = pool ? rspamd_mempool_strdup(pool, target) : g_strdup(target);
+ member->srv_parent = parent;
+ member->srv_priority = srv_priority;
+ member->srv_weight = srv_weight;
+ /*
+ * RFC 2782 weight 0 means "rarely used but selectable". We clamp to >=1
+ * so the existing weighted-RR path doesn't treat the member as
+ * effectively disabled. True weight-0 semantics are a follow-up.
+ */
+ member->weight = MAX((unsigned int) srv_weight, 1u);
+ member->cur_weight = member->weight;
+ member->deferred_port = port;
+ member->active_idx = -1;
+ /*
+ * Members inherit the list-level flags applied to the parent at
+ * creation time but never carry the SRV_RESOLVE marker themselves —
+ * that flag is the parent placeholder's identifying mark.
+ */
+ member->flags = (parent->flags & ~RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) |
+ RSPAMD_UPSTREAM_FLAG_SRV_MEMBER |
+ RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE;
+
+ g_ptr_array_add(ls->ups, member);
+ member->ud = parent->ud;
+ member->ls = ls;
+ REF_INIT_RETAIN(member, rspamd_upstream_dtor);
+#ifdef UPSTREAMS_THREAD_SAFE
+ member->lock = rspamd_mutex_new();
+#endif
+ member->ctx = ctx;
+ REF_RETAIN(ctx);
+ g_queue_push_tail(ctx->upstreams, member);
+ member->ctx_pos = g_queue_peek_tail_link(ctx->upstreams);
+
+ h = rspamd_cryptobox_fast_hash(member->name, strlen(member->name), 0);
+ memset(member->uid, 0, sizeof(member->uid));
+ rspamd_encode_base32_buf((const unsigned char *) &h, sizeof(h),
+ member->uid, sizeof(member->uid) - 1,
+ RSPAMD_BASE32_DEFAULT);
+
+ key = rspamd_upstream_srv_member_key(target, port);
+ g_hash_table_insert(parent->srv_members, key, member);
+
+ {
+ struct upstream *upstream = member;
+ msg_info_upstream("created SRV member %s for %s "
+ "(target=%s port=%ud weight=%ud priority=%ud)",
+ member->uid, parent->name, target,
+ (unsigned int) port,
+ (unsigned int) srv_weight,
+ (unsigned int) srv_priority);
+ }
+
+ /*
+ * set_active arms the lazy-resolve timer; for PENDING_RESOLVE members
+ * it leaves them out of `alive`. The first successful A/AAAA reply
+ * promotes the member via rspamd_upstream_promote_pending.
+ */
+ rspamd_upstream_set_active(ls, member);
+ /*
+ * Issue the A/AAAA query immediately rather than waiting for the lazy
+ * timer; users expect new SRV targets to start serving traffic
+ * promptly after a re-resolve.
+ */
+ rspamd_upstream_resolve_addrs(ls, member);
-/* Used when we have resolved SRV record and resolved addrs */
+ return member;
+}
+
+/*
+ * Graceful drain: remove a member from selection, unlink it from the
+ * parent's hash, drop its presence in `ls->ups`. The set_inactive call
+ * keeps the upstream pinned in memory until its revive timer fires —
+ * by that point any in-flight selectors will have called fail/ok/release
+ * and the dtor can run safely.
+ *
+ * The is_draining flag tells revive_cb to skip set_active when the
+ * timer fires, so the drained member stays out of selection forever.
+ */
static void
-rspamd_upstream_dns_srv_phase2_cb(struct rdns_reply *reply, void *arg)
+rspamd_upstream_srv_drain_member(struct upstream *member)
{
- struct rspamd_upstream_srv_dns_cb *cbdata =
- (struct rspamd_upstream_srv_dns_cb *) arg;
- struct upstream *up;
- struct rdns_reply_entry *entry;
- struct upstream_inet_addr_entry *up_ent;
+ struct upstream *parent = member->srv_parent;
+ struct upstream_list *ls = member->ls;
+ struct upstream *upstream = member; /* for the logging macro */
- up = cbdata->up;
+ if (!(member->flags & RSPAMD_UPSTREAM_FLAG_SRV_MEMBER)) {
+ return;
+ }
- if (reply->code == RDNS_RC_NOERROR) {
- entry = reply->entries;
+ msg_info_upstream("drain SRV member %s (%s)",
+ member->uid, member->name);
- RSPAMD_UPSTREAM_LOCK(up);
- while (entry) {
+ member->is_draining = TRUE;
- if (entry->type == RDNS_REQUEST_A) {
- up_ent = g_malloc0(sizeof(*up_ent));
- up_ent->addr = rspamd_inet_address_new(AF_INET,
- &entry->content.a.addr);
- up_ent->priority = cbdata->priority;
- rspamd_inet_address_set_port(up_ent->addr, cbdata->port);
- LL_PREPEND(up->new_addrs, up_ent);
+ /* Stop any timer that might re-activate the member. */
+ if (member->ctx && member->ctx->event_loop && ev_can_stop(&member->ev)) {
+ ev_timer_stop(member->ctx->event_loop, &member->ev);
+ }
+
+ /* Unlink from the parent's index. */
+ if (parent && parent->srv_members) {
+ GHashTableIter it;
+ gpointer k, v;
+
+ g_hash_table_iter_init(&it, parent->srv_members);
+ while (g_hash_table_iter_next(&it, &k, &v)) {
+ if (v == member) {
+ g_hash_table_iter_remove(&it);
+ break;
}
- else if (entry->type == RDNS_REQUEST_AAAA) {
- up_ent = g_malloc0(sizeof(*up_ent));
- up_ent->addr = rspamd_inet_address_new(AF_INET6,
- &entry->content.aaa.addr);
- up_ent->priority = cbdata->priority;
- rspamd_inet_address_set_port(up_ent->addr, cbdata->port);
- LL_PREPEND(up->new_addrs, up_ent);
+ }
+ }
+
+ /*
+ * Pull from the alive list directly, mirroring the relevant subset
+ * of set_inactive. We deliberately do NOT go through set_inactive
+ * itself: it pins the upstream with REF_RETAIN+revive timer
+ * expecting the timer to release later, but for a drained member
+ * there is no "later" — we want the dtor to run as soon as inflight
+ * selectors release their refs.
+ */
+ if (ls != NULL && member->active_idx != -1) {
+ struct upstream_list_watcher *w;
+
+ RSPAMD_UPSTREAM_LOCK(ls);
+ g_ptr_array_remove_index(ls->alive, member->active_idx);
+ member->active_idx = -1;
+ ls->ring_dirty = TRUE;
+
+ if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET &&
+ member->inflight_tokens > 0) {
+ member->available_tokens += member->inflight_tokens;
+ if (member->available_tokens > member->max_tokens) {
+ member->available_tokens = member->max_tokens;
}
- entry = entry->next;
+ member->inflight_tokens = 0;
}
- RSPAMD_UPSTREAM_UNLOCK(up);
+ /* Reindex */
+ for (unsigned int i = 0; i < ls->alive->len; i++) {
+ struct upstream *cur = g_ptr_array_index(ls->alive, i);
+ cur->active_idx = i;
+ }
+
+ DL_FOREACH(ls->watchers, w)
+ {
+ if (w->events_mask & RSPAMD_UPSTREAM_WATCH_OFFLINE) {
+ w->func(member, RSPAMD_UPSTREAM_WATCH_OFFLINE,
+ member->errors, w->ud);
+ }
+ }
+
+ RSPAMD_UPSTREAM_UNLOCK(ls);
}
- up->dns_requests--;
- cbdata->requests_inflight--;
+ /* Remove from ls->ups so traversal/probe paths stop seeing it. */
+ if (ls != NULL) {
+ for (unsigned int i = 0; i < ls->ups->len; i++) {
+ if (g_ptr_array_index(ls->ups, i) == member) {
+ g_ptr_array_remove_index(ls->ups, i);
+ break;
+ }
+ }
+ }
+
+ /* Sever the parent link so the dtor doesn't re-touch parent state. */
+ member->srv_parent = NULL;
- if (cbdata->requests_inflight == 0) {
- g_free(cbdata);
+ /*
+ * Production grace window: a caller may have just received this
+ * member from rspamd_upstream_get_* and not yet called fail/ok.
+ * Arm a one-shot timer (reusing revive_cb, which checks is_draining
+ * and bails to REF_RELEASE on fire) to keep the upstream alive long
+ * enough for inflight selectors to drain naturally. We use
+ * revive_time as the grace period — same TTL the rest of the system
+ * already assumes for inactive upstreams.
+ *
+ * Without an event loop (tests, early startup), no inflight is
+ * possible; we skip the timer and let REF_RELEASE below run the
+ * dtor synchronously.
+ */
+ if (member->ctx && member->ctx->event_loop && ls != NULL) {
+ double ntim = rspamd_time_jitter(ls->limits->revive_time,
+ ls->limits->revive_time *
+ ls->limits->revive_jitter);
+ REF_RETAIN(member);
+ ev_timer_init(&member->ev, rspamd_upstream_revive_cb, ntim, 0);
+ member->ev.data = member;
+ if (member->ctx->configured) {
+ ev_timer_start(member->ctx->event_loop, &member->ev);
+ }
}
- if (up->dns_requests == 0) {
- rspamd_upstream_update_addrs(up);
+ /* Release the original creation ref. */
+ REF_RELEASE(member);
+}
+
+/*
+ * Apply a snapshot of SRV targets to a parent. Public-internal entry
+ * point: takes a plain-data array decoupled from the DNS client struct
+ * layout, so tests can drive expansion without DNS.
+ *
+ * Caller must ensure parent has srv_members allocated (true for any
+ * upstream created via "service=..."). Acquires/releases the parent
+ * lock internally.
+ */
+void rspamd_upstream_srv_apply(struct upstream *parent,
+ const struct rspamd_upstream_srv_entry *entries,
+ size_t n)
+{
+ GHashTable *seen;
+ GList *to_drain = NULL, *cur;
+ GHashTableIter iter;
+ gpointer k, v;
+ size_t i;
+ struct upstream *upstream = parent; /* for the logging macro */
+
+ if (parent == NULL || parent->srv_members == NULL) {
+ return;
}
- REF_RELEASE(up);
+ RSPAMD_UPSTREAM_LOCK(parent);
+
+ seen = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL);
+
+ for (i = 0; i < n; i++) {
+ const struct rspamd_upstream_srv_entry *e = &entries[i];
+ struct upstream *existing;
+ char *key;
+
+ msg_debug_upstream("apply SRV target for %s: %s "
+ "(weight=%ud priority=%ud port=%ud)",
+ parent->name, e->target,
+ (unsigned int) e->weight,
+ (unsigned int) e->priority,
+ (unsigned int) e->port);
+
+ key = rspamd_upstream_srv_member_key(e->target, e->port);
+ existing = g_hash_table_lookup(parent->srv_members, key);
+
+ if (existing != NULL) {
+ gboolean topology_changed = FALSE;
+
+ if (existing->srv_weight != e->weight) {
+ existing->srv_weight = e->weight;
+ existing->weight = MAX((unsigned int) e->weight, 1u);
+ if (existing->cur_weight == 0) {
+ existing->cur_weight = existing->weight;
+ }
+ topology_changed = TRUE;
+ }
+ if (existing->srv_priority != e->priority) {
+ existing->srv_priority = e->priority;
+ topology_changed = TRUE;
+ }
+ if (topology_changed && parent->ls != NULL) {
+ parent->ls->ring_dirty = TRUE;
+ }
+ /* Refresh A/AAAA so address changes propagate. */
+ if (parent->ls != NULL) {
+ rspamd_upstream_resolve_addrs(parent->ls, existing);
+ }
+ }
+ else {
+ rspamd_upstream_srv_create_member(parent, e->target, e->port,
+ e->weight, e->priority);
+ }
+
+ g_hash_table_insert(seen, key, NULL);
+ }
+
+ /* Anything in srv_members not in `seen` is gone — drain it. */
+ g_hash_table_iter_init(&iter, parent->srv_members);
+ while (g_hash_table_iter_next(&iter, &k, &v)) {
+ if (!g_hash_table_contains(seen, k)) {
+ to_drain = g_list_prepend(to_drain, v);
+ }
+ }
+
+ for (cur = to_drain; cur != NULL; cur = cur->next) {
+ rspamd_upstream_srv_drain_member((struct upstream *) cur->data);
+ }
+ g_list_free(to_drain);
+ g_hash_table_unref(seen);
+
+ RSPAMD_UPSTREAM_UNLOCK(parent);
+}
+
+struct upstream *
+rspamd_upstream_srv_test_get_parent(struct upstream_list *ups)
+{
+ unsigned int i;
+
+ if (ups == NULL) {
+ return NULL;
+ }
+
+ for (i = 0; i < ups->ups->len; i++) {
+ struct upstream *up = g_ptr_array_index(ups->ups, i);
+ if (up->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
+ return up;
+ }
+ }
+
+ return NULL;
+}
+
+void rspamd_upstream_member_force_alive_for_test(struct upstream *member,
+ const char *ip_str)
+{
+ rspamd_inet_addr_t *addr = NULL;
+
+ g_assert(member != NULL);
+ g_assert(member->flags & RSPAMD_UPSTREAM_FLAG_SRV_MEMBER);
+
+ if (!rspamd_parse_inet_address(&addr, ip_str, strlen(ip_str),
+ RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
+ g_assert_not_reached();
+ }
+
+ rspamd_inet_address_set_port(addr, member->deferred_port);
+ rspamd_upstream_add_addr(member, addr);
+
+ /*
+ * Drop the PENDING_RESOLVE flag and place the member into the alive
+ * list directly. set_active would re-arm the lazy-resolve timer; in
+ * tests the runtime has no event loop, so we do the bookkeeping by
+ * hand to keep the test deterministic.
+ */
+ RSPAMD_UPSTREAM_LOCK(member->ls);
+ member->flags &= ~RSPAMD_UPSTREAM_FLAG_PENDING_RESOLVE;
+ g_ptr_array_add(member->ls->alive, member);
+ member->active_idx = member->ls->alive->len - 1;
+ member->ls->ring_dirty = TRUE;
+ RSPAMD_UPSTREAM_UNLOCK(member->ls);
}
+/*
+ * SRV reply handler: convert the rdns reply into the plain-data entry
+ * vector and hand it to rspamd_upstream_srv_apply. Each SRV target lives
+ * as its own struct upstream — own error budget, latency EWMA,
+ * addresses, and full first-class participation in every selection
+ * algorithm.
+ *
+ * On reply errors (NXDOMAIN, timeout) we deliberately do nothing: the
+ * existing member set remains in place and the next re-resolve gets to
+ * try again. Otherwise one bad query would tear down the whole cluster.
+ */
static void
rspamd_upstream_dns_srv_cb(struct rdns_reply *reply, void *arg)
{
- struct upstream *upstream = (struct upstream *) arg;
+ struct upstream *parent = (struct upstream *) arg;
+ struct upstream *upstream = parent; /* for the logging macro */
struct rdns_reply_entry *entry;
- struct rspamd_upstream_srv_dns_cb *ncbdata;
- if (reply->code == RDNS_RC_NOERROR) {
- entry = reply->entries;
+ if (parent->ls == NULL || parent->is_draining) {
+ /* Parent destroyed or drained mid-flight; just release the ref. */
+ parent->dns_requests--;
+ REF_RELEASE(parent);
+ return;
+ }
- RSPAMD_UPSTREAM_LOCK(upstream);
- while (entry) {
- /* XXX: we ignore weight as it contradicts with upstreams logic */
- if (entry->type == RDNS_REQUEST_SRV) {
- msg_debug_upstream("got srv reply for %s: %s "
- "(weight=%d, priority=%d, port=%d)",
- upstream->name, entry->content.srv.target,
- entry->content.srv.weight, entry->content.srv.priority,
- entry->content.srv.port);
- ncbdata = g_malloc0(sizeof(*ncbdata));
- ncbdata->priority = entry->content.srv.weight;
- ncbdata->port = entry->content.srv.port;
- /* XXX: for all entries? */
- upstream->ttl = entry->ttl;
+ if (reply->code == RDNS_RC_NOERROR) {
+ GArray *flat = g_array_new(FALSE, FALSE,
+ sizeof(struct rspamd_upstream_srv_entry));
- if (rdns_make_request_full(upstream->ctx->res,
- rspamd_upstream_dns_srv_phase2_cb, ncbdata,
- upstream->ls->limits->dns_timeout,
- upstream->ls->limits->dns_retransmits,
- 1, entry->content.srv.target, RDNS_REQUEST_A) != NULL) {
- upstream->dns_requests++;
- REF_RETAIN(upstream);
- ncbdata->requests_inflight++;
- }
+ for (entry = reply->entries; entry != NULL; entry = entry->next) {
+ if (entry->type != RDNS_REQUEST_SRV) {
+ continue;
+ }
- if (rdns_make_request_full(upstream->ctx->res,
- rspamd_upstream_dns_srv_phase2_cb, ncbdata,
- upstream->ls->limits->dns_timeout,
- upstream->ls->limits->dns_retransmits,
- 1, entry->content.srv.target, RDNS_REQUEST_AAAA) != NULL) {
- upstream->dns_requests++;
- REF_RETAIN(upstream);
- ncbdata->requests_inflight++;
- }
+ parent->ttl = entry->ttl;
- if (ncbdata->requests_inflight == 0) {
- g_free(ncbdata);
- }
- }
- entry = entry->next;
+ struct rspamd_upstream_srv_entry e = {
+ .target = entry->content.srv.target,
+ .port = entry->content.srv.port,
+ .weight = entry->content.srv.weight,
+ .priority = entry->content.srv.priority,
+ };
+ g_array_append_val(flat, e);
}
- RSPAMD_UPSTREAM_UNLOCK(upstream);
+ rspamd_upstream_srv_apply(parent,
+ (const struct rspamd_upstream_srv_entry *) flat->data,
+ flat->len);
+ g_array_free(flat, TRUE);
+ }
+ else {
+ msg_info_upstream("SRV resolution for %s returned %s; keeping "
+ "existing %ud member(s)",
+ parent->name, rdns_strerror(reply->code),
+ parent->srv_members ? (unsigned int) g_hash_table_size(parent->srv_members) : 0u);
}
- upstream->dns_requests--;
- REF_RELEASE(upstream);
+ parent->dns_requests--;
+ REF_RELEASE(parent);
}
static void
RSPAMD_UPSTREAM_LOCK(upstream);
ev_timer_stop(loop, w);
+ /*
+ * Drained SRV members must not re-enter the alive list. The drain
+ * helper unlinks them from `ls->ups` and `srv_members` and flips this
+ * flag; the only thing left is to release the timer ref so the dtor
+ * runs once inflight selectors release theirs.
+ */
+ if (upstream->is_draining) {
+ msg_debug_upstream("skip revive for drained upstream %s",
+ upstream->name);
+ RSPAMD_UPSTREAM_UNLOCK(upstream);
+ REF_RELEASE(upstream);
+ return;
+ }
+
msg_debug_upstream("revive upstream %s", upstream->name);
if (upstream->ls) {
rspamd_upstream_resolve_addrs(const struct upstream_list *ls,
struct upstream *upstream)
{
+ /*
+ * Drained SRV members and SRV parents must never resolve. Parents
+ * resolve SRV through the dedicated path below; the early bail here
+ * matters when set_inactive is invoked on a draining member: it would
+ * otherwise re-issue A/AAAA and pollute the just-released state.
+ */
+ if (upstream->is_draining) {
+ return;
+ }
if ((upstream->flags & RSPAMD_UPSTREAM_FLAG_DNS)) {
/* For DNS upstreams: resolve synchronously using getaddrinfo if name */
gsize rspamd_upstreams_count(struct upstream_list *ups)
{
- return ups != NULL ? ups->ups->len : 0;
+ gsize n = 0;
+ unsigned int i;
+ struct upstream *up;
+
+ if (ups == NULL) {
+ return 0;
+ }
+
+ /*
+ * SRV parents are placeholders, not selectable upstreams. Count only
+ * first-class entries so callers see the real cluster size.
+ */
+ for (i = 0; i < ups->ups->len; i++) {
+ up = g_ptr_array_index(ups->ups, i);
+ if (!(up->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE)) {
+ n++;
+ }
+ }
+
+ return n;
}
gsize rspamd_upstreams_alive(struct upstream_list *ups)
g_ptr_array_free(up->addrs.addr, TRUE);
}
+ /*
+ * SRV parents own a hash table of "fqdn:port" → member upstream. The
+ * members themselves carry their own refs and are freed independently
+ * once their drain ref counts reach zero, so we only release the hash
+ * itself here. Drain on rspamd_upstreams_destroy already cleared the
+ * entries by the time the parent's dtor runs.
+ */
+ if (up->srv_members != NULL) {
+ g_hash_table_unref(up->srv_members);
+ up->srv_members = NULL;
+ }
+
#ifdef UPSTREAMS_THREAD_SAFE
rspamd_mutex_free(up->lock);
#endif
(int) (plus_pos - service_pos), service_pos,
(int) (semicolon_pos - (plus_pos + 1)), plus_pos + 1);
upstream->flags |= RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE;
+ /*
+ * Pre-allocate the member index. The hash owns the keys
+ * (g_free destructor) and not the values; member upstreams
+ * are released through their own ref counts during drain
+ * or list teardown.
+ */
+ upstream->srv_members = g_hash_table_new_full(g_str_hash,
+ g_str_equal,
+ g_free, NULL);
ret = RSPAMD_PARSE_ADDR_RESOLVED;
if (ups->ctx) {
if (cur->active_idx >= 0 || (except && cur == except)) {
continue;
}
+ /*
+ * SRV parents never enter selection; they only own member
+ * upstreams. Skip them so probe mode considers real backends.
+ */
+ if (cur->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
+ continue;
+ }
/*
* Pending-resolve upstreams have no addresses yet — they can't
* be probed. The lazy resolver will move them into `alive` once
rspamd_upstream_traverse_func cb, void *ud)
{
struct upstream *up;
- unsigned int i;
+ unsigned int i, idx = 0;
for (i = 0; i < ups->ups->len; i++) {
up = g_ptr_array_index(ups->ups, i);
- cb(up, i, ud);
+ /*
+ * Skip SRV parent placeholders — they aren't selectable
+ * upstreams and exposing them to consumers (foreach is the
+ * public iteration API) would surprise callers that expect
+ * to enumerate real backends.
+ */
+ if (up->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
+ continue;
+ }
+
+ cb(up, idx++, ud);
}
}