]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Rework] Replace broken Jump Hash with Ring Hash (Ketama) for consistent upstream...
authorVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 10 Feb 2026 14:52:30 +0000 (14:52 +0000)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 10 Feb 2026 14:52:30 +0000 (14:52 +0000)
Jump Consistent Hash (Lamping & Veach 2014) only handles bucket
addition/removal at the end of the range.  When an upstream in the
middle failed, the old code rehashed with mum_hash_step and retried
up to 20 times, which destroyed the consistency property: keys that
mapped to the dead node were redistributed randomly instead of
deterministically, and didn't return when the node recovered.

Replace with a Ketama-style ring hash:
- Each alive upstream gets MAX(weight,1)*100 virtual nodes on a
  sorted hash ring (keyed by name, order-independent).
- Lookup is a binary search: O(log(n*v)) instead of O(ln n) * retries.
- When an upstream fails, only its ~1/n fraction of keys slide to the
  next ring point — true minimal disruption.
- When it recovers, the same keys return — true consistency.
- The 'except' parameter walks forward on the ring instead of rehashing.
- Ring is rebuilt lazily (dirty flag set on active/inactive transitions).

src/libutil/upstream.c

index 3725e80474c8d232586abe53b2b918af06302e3e..6ddf71e32971226898c0018ee27b556a491b6fa3 100644 (file)
@@ -24,7 +24,6 @@
 #include "contrib/libev/ev.h"
 #include "logger.h"
 #include "contrib/librdns/rdns.h"
-#include "contrib/mumhash/mum.h"
 #include "heap.h"
 
 #include <math.h>
@@ -51,6 +50,12 @@ struct upstream_list_watcher {
        struct upstream_list_watcher *next, *prev;
 };
 
+/* Ring hash point for consistent hashing (Ketama) */
+struct upstream_ring_point {
+       uint64_t hash;
+       struct upstream *up;
+};
+
 /* Heap element for token bucket selection */
 struct upstream_token_heap_entry {
        unsigned int pri;    /* Priority = inflight_tokens (lower = better) */
@@ -133,6 +138,11 @@ struct upstream_list {
        unsigned int cur_elt;
        enum rspamd_upstream_rotation rot_alg;
 
+       /* Ring hash for consistent hashing (Ketama) */
+       struct upstream_ring_point *ring;
+       unsigned int ring_len;
+       gboolean ring_dirty;
+
        /* Token bucket heap for weighted selection */
        upstream_token_heap_t token_heap;
        gboolean token_bucket_initialized;
@@ -396,6 +406,9 @@ rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream)
        g_ptr_array_add(ls->alive, upstream);
        upstream->active_idx = ls->alive->len - 1;
 
+       /* Invalidate ring hash */
+       ls->ring_dirty = TRUE;
+
        /* Initialize token bucket state */
        upstream->heap_idx = UINT_MAX;
        if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET) {
@@ -922,6 +935,9 @@ rspamd_upstream_set_inactive(struct upstream_list *ls, struct upstream *upstream
        g_ptr_array_remove_index(ls->alive, upstream->active_idx);
        upstream->active_idx = -1;
 
+       /* Invalidate ring hash */
+       ls->ring_dirty = TRUE;
+
        /* Remove from token bucket heap if present */
        if (ls->token_bucket_initialized && upstream->heap_idx != UINT_MAX) {
                struct upstream_token_heap_entry *entry;
@@ -1679,6 +1695,11 @@ void rspamd_upstreams_destroy(struct upstream_list *ups)
        struct upstream_list_watcher *w, *tmp;
 
        if (ups != NULL) {
+               /* Clean up ring hash */
+               g_free(ups->ring);
+               ups->ring = NULL;
+               ups->ring_len = 0;
+
                /* Clean up token bucket heap */
                if (ups->token_bucket_initialized) {
                        rspamd_heap_destroy(upstream_token_heap, &ups->token_heap);
@@ -1726,6 +1747,7 @@ rspamd_upstream_restore_cb(gpointer elt, gpointer ls)
 
        g_ptr_array_add(ups->alive, up);
        up->active_idx = ups->alive->len - 1;
+       ups->ring_dirty = TRUE;
        RSPAMD_UPSTREAM_UNLOCK(up);
 
        DL_FOREACH(up->ls->watchers, w)
@@ -1830,24 +1852,79 @@ rspamd_upstream_get_round_robin(struct upstream_list *ups,
 }
 
 /*
- * The key idea of this function is obtained from the following paper:
- * A Fast, Minimal Memory, Consistent Hash Algorithm
- * John Lamping, Eric Veach
+ * Ring hash (Ketama-style) consistent hashing.
  *
- * http://arxiv.org/abs/1406.2294
+ * Each alive upstream gets a number of virtual nodes placed on a hash ring.
+ * Lookup hashes the key and binary-searches for the next ring point.
+ * When an upstream fails, only its virtual nodes disappear; keys that
+ * mapped to them naturally slide to the next point on the ring, giving
+ * minimal disruption (only ~1/n of keys move for each removed upstream).
  */
-static uint32_t
-rspamd_consistent_hash(uint64_t key, uint32_t nbuckets)
+
+/* Virtual nodes per unit of weight (weight 0 is treated as 1) */
+#define RSPAMD_RING_VNODES 100
+
+static int
+rspamd_upstream_ring_cmp(const void *a, const void *b)
+{
+       const struct upstream_ring_point *p1 = a, *p2 = b;
+
+       if (p1->hash < p2->hash) {
+               return -1;
+       }
+       if (p1->hash > p2->hash) {
+               return 1;
+       }
+
+       return 0;
+}
+
+static void
+rspamd_upstream_ring_build(struct upstream_list *ups)
 {
-       int64_t b = -1, j = 0;
+       unsigned int i, j;
+       struct upstream *up;
+       unsigned int total_vnodes = 0;
+
+       g_free(ups->ring);
+       ups->ring = NULL;
+       ups->ring_len = 0;
+
+       if (ups->alive->len == 0) {
+               ups->ring_dirty = FALSE;
+               return;
+       }
+
+       /* Calculate total ring points needed */
+       for (i = 0; i < ups->alive->len; i++) {
+               up = g_ptr_array_index(ups->alive, i);
+               total_vnodes += MAX(up->weight, 1) * RSPAMD_RING_VNODES;
+       }
+
+       ups->ring = g_malloc(total_vnodes * sizeof(struct upstream_ring_point));
+
+       for (i = 0; i < ups->alive->len; i++) {
+               up = g_ptr_array_index(ups->alive, i);
+               unsigned int nvnodes = MAX(up->weight, 1) * RSPAMD_RING_VNODES;
 
-       while (j < nbuckets) {
-               b = j;
-               key *= 2862933555777941757ULL + 1;
-               j = (b + 1) * (double) (1ULL << 31) / (double) ((key >> 33) + 1ULL);
+               for (j = 0; j < nvnodes; j++) {
+                       char vnode_key[280]; /* upstream name (253 max) + : + digits */
+                       int len = rspamd_snprintf(vnode_key, sizeof(vnode_key),
+                                                                         "%s:%ud", up->name, j);
+                       uint64_t h = rspamd_cryptobox_fast_hash_specific(
+                               RSPAMD_CRYPTOBOX_XXHASH64,
+                               vnode_key, len, ups->hash_seed);
+
+                       ups->ring[ups->ring_len].hash = h;
+                       ups->ring[ups->ring_len].up = up;
+                       ups->ring_len++;
+               }
        }
 
-       return b;
+       qsort(ups->ring, ups->ring_len, sizeof(struct upstream_ring_point),
+                 rspamd_upstream_ring_cmp);
+
+       ups->ring_dirty = FALSE;
 }
 
 static struct upstream *
@@ -1856,40 +1933,64 @@ rspamd_upstream_get_hashed(struct upstream_list *ups,
                                                   const uint8_t *key, unsigned int keylen)
 {
        uint64_t k;
-       uint32_t idx;
-       static const unsigned int max_tries = 20;
-       struct upstream *up = NULL;
+       struct upstream *up;
+
+       RSPAMD_UPSTREAM_LOCK(ups);
+
+       /* Lazy ring rebuild */
+       if (ups->ring_dirty || ups->ring == NULL) {
+               rspamd_upstream_ring_build(ups);
+       }
+
+       if (ups->ring_len == 0) {
+               RSPAMD_UPSTREAM_UNLOCK(ups);
+               return NULL;
+       }
 
-       /* Generate 64 bits input key */
+       /* Hash the lookup key */
        k = rspamd_cryptobox_fast_hash_specific(RSPAMD_CRYPTOBOX_XXHASH64,
                                                                                        key, keylen, ups->hash_seed);
 
-       RSPAMD_UPSTREAM_LOCK(ups);
-       /*
-        * Select new upstream from all upstreams
-        */
-       for (unsigned int i = 0; i < max_tries; i++) {
-               idx = rspamd_consistent_hash(k, ups->ups->len);
-               up = g_ptr_array_index(ups->ups, idx);
+       /* Binary search for first ring point >= k */
+       unsigned int lo = 0, hi = ups->ring_len;
 
-               if (up->active_idx < 0 || (except != NULL && up == except)) {
-                       /* Found inactive or excluded upstream */
-                       k = mum_hash_step(k, ups->hash_seed);
+       while (lo < hi) {
+               unsigned int mid = lo + (hi - lo) / 2;
+
+               if (ups->ring[mid].hash < k) {
+                       lo = mid + 1;
                }
                else {
-                       break;
+                       hi = mid;
                }
        }
-       RSPAMD_UPSTREAM_UNLOCK(ups);
 
-       if (up->active_idx >= 0) {
-               return up;
+       /* Wrap around */
+       if (lo >= ups->ring_len) {
+               lo = 0;
+       }
+
+       up = ups->ring[lo].up;
+
+       /* Handle 'except': walk forward on ring to find a different upstream */
+       if (except != NULL && up == except) {
+               for (unsigned int i = 1; i < ups->ring_len; i++) {
+                       unsigned int idx = (lo + i) % ups->ring_len;
+
+                       if (ups->ring[idx].up != except) {
+                               up = ups->ring[idx].up;
+                               break;
+                       }
+               }
+
+               if (up == except) {
+                       /* All ring points belong to the excluded upstream */
+                       RSPAMD_UPSTREAM_UNLOCK(ups);
+                       return NULL;
+               }
        }
 
-       /* We failed to find any active upstream */
-       up = rspamd_upstream_get_random(ups, except);
-       msg_info("failed to find hashed upstream for %s, fallback to random: %s",
-                        ups->ups_line, up->name);
+       RSPAMD_UPSTREAM_UNLOCK(ups);
 
        return up;
 }