From: Vsevolod Stakhov
Date: Tue, 10 Feb 2026 14:52:30 +0000 (+0000)
Subject: [Rework] Replace broken Jump Hash with Ring Hash (Ketama) for consistent upstream...
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4ea7504663056c3a54476c47264c8df691087596;p=thirdparty%2Frspamd.git

[Rework] Replace broken Jump Hash with Ring Hash (Ketama) for consistent upstream hashing

Jump Consistent Hash (Lamping & Veach, 2014) only handles bucket addition and
removal at the end of the range. When an upstream in the middle failed, the
old code rehashed the key with mum_hash_step and retried up to 20 times,
which destroyed the consistency property: keys that mapped to the dead node
were redistributed randomly instead of deterministically, and did not return
when the node recovered.

Replace it with a Ketama-style ring hash:

- Each alive upstream gets MAX(weight,1)*100 virtual nodes on a sorted hash
  ring (keyed by upstream name, so placement is independent of configuration
  order).
- Lookup is a binary search: O(log(n*v)) instead of O(ln n) per retry.
- When an upstream fails, only its ~1/n fraction of keys slides to the next
  ring point: true minimal disruption.
- When it recovers, the same keys return: true consistency.
- The 'except' parameter walks forward on the ring instead of rehashing.
- The ring is rebuilt lazily (a dirty flag is set on active/inactive
  transitions).
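As a reading aid, the scheme boils down to the following standalone sketch.
This is an illustration, not the patched code: the names (ring_point,
ring_build, ring_lookup, VNODES) are invented for the example, and FNV-1a
stands in for the xxhash64 that the patch applies via
rspamd_cryptobox_fast_hash_specific. Like the patch, it keys each virtual
node as "name:index", so the ring depends only on the set of alive
upstreams, never on their order.

/* Standalone sketch of Ketama-style ring build and lookup (illustrative). */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h> /* strcmp, used by the demo driver below */

#define VNODES 100 /* virtual nodes per node, as in the patch */

struct ring_point {
	uint64_t hash;
	int node; /* index of the real upstream */
};

/* FNV-1a, a stand-in for the xxhash64 used by the real patch */
static uint64_t fnv1a(const char *s, size_t len)
{
	uint64_t h = 14695981039346656037ULL;
	for (size_t i = 0; i < len; i++) {
		h = (h ^ (unsigned char) s[i]) * 1099511628211ULL;
	}
	return h;
}

static int point_cmp(const void *a, const void *b)
{
	const struct ring_point *p1 = a, *p2 = b;
	return (p1->hash > p2->hash) - (p1->hash < p2->hash);
}

/* Place VNODES points per named node, then sort: the ring depends only
 * on the set of names, never on their order. */
static struct ring_point *ring_build(const char **names, int n, size_t *out_len)
{
	struct ring_point *ring = malloc((size_t) n * VNODES * sizeof(*ring));
	size_t k = 0;

	for (int i = 0; i < n; i++) {
		for (int j = 0; j < VNODES; j++) {
			char key[300];
			int len = snprintf(key, sizeof(key), "%s:%d", names[i], j);
			ring[k].hash = fnv1a(key, (size_t) len);
			ring[k].node = i;
			k++;
		}
	}

	qsort(ring, k, sizeof(*ring), point_cmp);
	*out_len = k;
	return ring;
}

/* Binary search for the first point with hash >= h, wrapping to 0. */
static int ring_lookup(const struct ring_point *ring, size_t len, uint64_t h)
{
	size_t lo = 0, hi = len;

	while (lo < hi) {
		size_t mid = lo + (hi - lo) / 2;
		if (ring[mid].hash < h) {
			lo = mid + 1;
		}
		else {
			hi = mid;
		}
	}

	return ring[lo == len ? 0 : lo].node;
}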
---

diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index 3725e80474..6ddf71e329 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -24,7 +24,6 @@
 #include "contrib/libev/ev.h"
 #include "logger.h"
 #include "contrib/librdns/rdns.h"
-#include "contrib/mumhash/mum.h"
 #include "heap.h"
 #include 
@@ -51,6 +50,12 @@ struct upstream_list_watcher {
 	struct upstream_list_watcher *next, *prev;
 };
 
+/* Ring hash point for consistent hashing (Ketama) */
+struct upstream_ring_point {
+	uint64_t hash;
+	struct upstream *up;
+};
+
 /* Heap element for token bucket selection */
 struct upstream_token_heap_entry {
 	unsigned int pri; /* Priority = inflight_tokens (lower = better) */
@@ -133,6 +138,11 @@ struct upstream_list {
 	unsigned int cur_elt;
 	enum rspamd_upstream_rotation rot_alg;
 
+	/* Ring hash for consistent hashing (Ketama) */
+	struct upstream_ring_point *ring;
+	unsigned int ring_len;
+	gboolean ring_dirty;
+
 	/* Token bucket heap for weighted selection */
 	upstream_token_heap_t token_heap;
 	gboolean token_bucket_initialized;
@@ -396,6 +406,9 @@ rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream)
 	g_ptr_array_add(ls->alive, upstream);
 	upstream->active_idx = ls->alive->len - 1;
 
+	/* Invalidate ring hash */
+	ls->ring_dirty = TRUE;
+
 	/* Initialize token bucket state */
 	upstream->heap_idx = UINT_MAX;
 	if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET) {
@@ -922,6 +935,9 @@ rspamd_upstream_set_inactive(struct upstream_list *ls, struct upstream *upstream
 	g_ptr_array_remove_index(ls->alive, upstream->active_idx);
 	upstream->active_idx = -1;
 
+	/* Invalidate ring hash */
+	ls->ring_dirty = TRUE;
+
 	/* Remove from token bucket heap if present */
 	if (ls->token_bucket_initialized && upstream->heap_idx != UINT_MAX) {
 		struct upstream_token_heap_entry *entry;
@@ -1679,6 +1695,11 @@ void rspamd_upstreams_destroy(struct upstream_list *ups)
 	struct upstream_list_watcher *w, *tmp;
 
 	if (ups != NULL) {
+		/* Clean up ring hash */
+		g_free(ups->ring);
+		ups->ring = NULL;
+		ups->ring_len = 0;
+
 		/* Clean up token bucket heap */
 		if (ups->token_bucket_initialized) {
 			rspamd_heap_destroy(upstream_token_heap, &ups->token_heap);
@@ -1726,6 +1747,7 @@ rspamd_upstream_restore_cb(gpointer elt, gpointer ls)
 	g_ptr_array_add(ups->alive, up);
 	up->active_idx = ups->alive->len - 1;
+	ups->ring_dirty = TRUE;
 
 	RSPAMD_UPSTREAM_UNLOCK(up);
 
 	DL_FOREACH(up->ls->watchers, w)
@@ -1830,24 +1852,79 @@ rspamd_upstream_get_round_robin(struct upstream_list *ups,
 }
 
 /*
- * The key idea of this function is obtained from the following paper:
- * A Fast, Minimal Memory, Consistent Hash Algorithm
- * John Lamping, Eric Veach
+ * Ring hash (Ketama-style) consistent hashing.
  *
- * http://arxiv.org/abs/1406.2294
+ * Each alive upstream gets a number of virtual nodes placed on a hash ring.
+ * Lookup hashes the key and binary-searches for the next ring point.
+ * When an upstream fails, only its virtual nodes disappear; keys that
+ * mapped to them naturally slide to the next point on the ring, giving
+ * minimal disruption (only ~1/n of keys move for each removed upstream).
  */
-static uint32_t
-rspamd_consistent_hash(uint64_t key, uint32_t nbuckets)
+
+/* Virtual nodes per unit of weight (weight 0 is treated as 1) */
+#define RSPAMD_RING_VNODES 100
+
+static int
+rspamd_upstream_ring_cmp(const void *a, const void *b)
+{
+	const struct upstream_ring_point *p1 = a, *p2 = b;
+
+	if (p1->hash < p2->hash) {
+		return -1;
+	}
+	if (p1->hash > p2->hash) {
+		return 1;
+	}
+
+	return 0;
+}
+
+static void
+rspamd_upstream_ring_build(struct upstream_list *ups)
 {
-	int64_t b = -1, j = 0;
+	unsigned int i, j;
+	struct upstream *up;
+	unsigned int total_vnodes = 0;
+
+	g_free(ups->ring);
+	ups->ring = NULL;
+	ups->ring_len = 0;
+
+	if (ups->alive->len == 0) {
+		ups->ring_dirty = FALSE;
+		return;
+	}
+
+	/* Calculate total ring points needed */
+	for (i = 0; i < ups->alive->len; i++) {
+		up = g_ptr_array_index(ups->alive, i);
+		total_vnodes += MAX(up->weight, 1) * RSPAMD_RING_VNODES;
+	}
+
+	ups->ring = g_malloc(total_vnodes * sizeof(struct upstream_ring_point));
+
+	for (i = 0; i < ups->alive->len; i++) {
+		up = g_ptr_array_index(ups->alive, i);
+		unsigned int nvnodes = MAX(up->weight, 1) * RSPAMD_RING_VNODES;
 
-	while (j < nbuckets) {
-		b = j;
-		key *= 2862933555777941757ULL + 1;
-		j = (b + 1) * (double) (1ULL << 31) / (double) ((key >> 33) + 1ULL);
+		for (j = 0; j < nvnodes; j++) {
+			char vnode_key[280]; /* upstream name (253 max) + : + digits */
+			int len = rspamd_snprintf(vnode_key, sizeof(vnode_key),
+									  "%s:%ud", up->name, j);
+			uint64_t h = rspamd_cryptobox_fast_hash_specific(
+				RSPAMD_CRYPTOBOX_XXHASH64,
+				vnode_key, len, ups->hash_seed);
+
+			ups->ring[ups->ring_len].hash = h;
+			ups->ring[ups->ring_len].up = up;
+			ups->ring_len++;
+		}
 	}
 
-	return b;
+	qsort(ups->ring, ups->ring_len, sizeof(struct upstream_ring_point),
+		  rspamd_upstream_ring_cmp);
+
+	ups->ring_dirty = FALSE;
 }
 
 static struct upstream *
@@ -1856,40 +1933,64 @@ rspamd_upstream_get_hashed(struct upstream_list *ups,
 								const uint8_t *key, unsigned int keylen)
 {
 	uint64_t k;
-	uint32_t idx;
-	static const unsigned int max_tries = 20;
-	struct upstream *up = NULL;
+	struct upstream *up;
+
+	RSPAMD_UPSTREAM_LOCK(ups);
+
+	/* Lazy ring rebuild */
+	if (ups->ring_dirty || ups->ring == NULL) {
+		rspamd_upstream_ring_build(ups);
+	}
+
+	if (ups->ring_len == 0) {
+		RSPAMD_UPSTREAM_UNLOCK(ups);
+		return NULL;
+	}
 
-	/* Generate 64 bits input key */
+	/* Hash the lookup key */
 	k = rspamd_cryptobox_fast_hash_specific(RSPAMD_CRYPTOBOX_XXHASH64,
 											key, keylen, ups->hash_seed);
 
-	RSPAMD_UPSTREAM_LOCK(ups);
-	/*
-	 * Select new upstream from all upstreams
-	 */
-	for (unsigned int i = 0; i < max_tries; i++) {
-		idx = rspamd_consistent_hash(k, ups->ups->len);
-		up = g_ptr_array_index(ups->ups, idx);
+	/* Binary search for first ring point >= k */
+	unsigned int lo = 0, hi = ups->ring_len;
 
-		if (up->active_idx < 0 || (except != NULL && up == except)) {
-			/* Found inactive or excluded upstream */
-			k = mum_hash_step(k, ups->hash_seed);
+	while (lo < hi) {
+		unsigned int mid = lo + (hi - lo) / 2;
+
+		if (ups->ring[mid].hash < k) {
+			lo = mid + 1;
 		}
 		else {
-			break;
+			hi = mid;
 		}
 	}
 
-	RSPAMD_UPSTREAM_UNLOCK(ups);
-
-	if (up->active_idx >= 0) {
-		return up;
+	/* Wrap around */
+	if (lo >= ups->ring_len) {
+		lo = 0;
+	}
+
+	up = ups->ring[lo].up;
+
+	/* Handle 'except': walk forward on ring to find a different upstream */
+	if (except != NULL && up == except) {
+		for (unsigned int i = 1; i < ups->ring_len; i++) {
+			unsigned int idx = (lo + i) % ups->ring_len;
+
+			if (ups->ring[idx].up != except) {
+				up = ups->ring[idx].up;
+				break;
+			}
+		}
+
+		if (up == except) {
+			/* All ring points belong to the excluded upstream */
+			RSPAMD_UPSTREAM_UNLOCK(ups);
+			return NULL;
+		}
 	}
 
-	/* We failed to find any active upstream */
-	up = rspamd_upstream_get_random(ups, except);
-	msg_info("failed to find hashed upstream for %s, fallback to random: %s",
-			 ups->ups_line, up->name);
+	RSPAMD_UPSTREAM_UNLOCK(ups);
 
 	return up;
 }
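For the record, the minimal-disruption claim can be checked with a toy
driver built on the hypothetical ring_build/ring_lookup/fnv1a helpers
sketched in the commit message above (compile the two fragments together;
the node names are made up for the example). Dropping one of three nodes
must not remap any key that sat on a surviving node, because the survivors'
ring points are identical in both rings; only keys that pointed at the
removed node slide to the next point.

/* Demo driver for the sketch above: drop one of three nodes and verify
 * that no key assigned to a surviving node is remapped. */
int main(void)
{
	const char *all[] = {"a.example", "b.example", "c.example"};
	const char *survivors[] = {"a.example", "c.example"};
	size_t len_all, len_sur;
	struct ring_point *r_all = ring_build(all, 3, &len_all);
	struct ring_point *r_sur = ring_build(survivors, 2, &len_sur);
	int moved = 0, total = 10000;

	for (int i = 0; i < total; i++) {
		char key[32];
		int len = snprintf(key, sizeof(key), "key-%d", i);
		uint64_t h = fnv1a(key, (size_t) len);
		const char *before = all[ring_lookup(r_all, len_all, h)];
		const char *after = survivors[ring_lookup(r_sur, len_sur, h)];

		/* Keys that were on b.example may move anywhere; keys on the
		 * survivors must stay put for the hash to be consistent. */
		if (strcmp(before, "b.example") != 0 && strcmp(before, after) != 0) {
			moved++;
		}
	}

	printf("survivor keys remapped: %d of %d (expected 0)\n", moved, total);
	free(r_all);
	free(r_sur);
	return 0;
}

The same argument run in reverse is the recovery property: re-adding
b.example restores exactly its old ring points, so precisely the keys that
left it come back.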