#include "contrib/libev/ev.h"
#include "logger.h"
#include "contrib/librdns/rdns.h"
-#include "contrib/mumhash/mum.h"
#include "heap.h"
#include <math.h>
struct upstream_list_watcher *next, *prev;
};
+/* Ring hash point for consistent hashing (Ketama) */
+struct upstream_ring_point {
+ uint64_t hash;
+ struct upstream *up;
+};
+
/* Heap element for token bucket selection */
struct upstream_token_heap_entry {
unsigned int pri; /* Priority = inflight_tokens (lower = better) */
unsigned int cur_elt;
enum rspamd_upstream_rotation rot_alg;
+ /* Ring hash for consistent hashing (Ketama) */
+ struct upstream_ring_point *ring;
+ unsigned int ring_len;
+ gboolean ring_dirty;
+
/* Token bucket heap for weighted selection */
upstream_token_heap_t token_heap;
gboolean token_bucket_initialized;
g_ptr_array_add(ls->alive, upstream);
upstream->active_idx = ls->alive->len - 1;
+ /* Invalidate ring hash */
+ ls->ring_dirty = TRUE;
+
/* Initialize token bucket state */
upstream->heap_idx = UINT_MAX;
if (ls->rot_alg == RSPAMD_UPSTREAM_TOKEN_BUCKET) {
g_ptr_array_remove_index(ls->alive, upstream->active_idx);
upstream->active_idx = -1;
+ /* Invalidate ring hash */
+ ls->ring_dirty = TRUE;
+
/* Remove from token bucket heap if present */
if (ls->token_bucket_initialized && upstream->heap_idx != UINT_MAX) {
struct upstream_token_heap_entry *entry;
struct upstream_list_watcher *w, *tmp;
if (ups != NULL) {
+ /* Clean up ring hash */
+ g_free(ups->ring);
+ ups->ring = NULL;
+ ups->ring_len = 0;
+
/* Clean up token bucket heap */
if (ups->token_bucket_initialized) {
rspamd_heap_destroy(upstream_token_heap, &ups->token_heap);
g_ptr_array_add(ups->alive, up);
up->active_idx = ups->alive->len - 1;
+ ups->ring_dirty = TRUE;
RSPAMD_UPSTREAM_UNLOCK(up);
DL_FOREACH(up->ls->watchers, w)
}
/*
- * The key idea of this function is obtained from the following paper:
- * A Fast, Minimal Memory, Consistent Hash Algorithm
- * John Lamping, Eric Veach
+ * Ring hash (Ketama-style) consistent hashing.
*
- * http://arxiv.org/abs/1406.2294
+ * Each alive upstream gets a number of virtual nodes placed on a hash ring.
+ * Lookup hashes the key and binary-searches for the next ring point.
+ * When an upstream fails, only its virtual nodes disappear; keys that
+ * mapped to them naturally slide to the next point on the ring, giving
+ * minimal disruption (only ~1/n of keys move for each removed upstream).
*/
-static uint32_t
-rspamd_consistent_hash(uint64_t key, uint32_t nbuckets)
+
+/* Virtual nodes per unit of weight (weight 0 is treated as 1) */
+#define RSPAMD_RING_VNODES 100
+
+static int
+rspamd_upstream_ring_cmp(const void *a, const void *b)
+{
+ const struct upstream_ring_point *p1 = a, *p2 = b;
+
+ if (p1->hash < p2->hash) {
+ return -1;
+ }
+ if (p1->hash > p2->hash) {
+ return 1;
+ }
+
+ return 0;
+}
+
+static void
+rspamd_upstream_ring_build(struct upstream_list *ups)
{
- int64_t b = -1, j = 0;
+ unsigned int i, j;
+ struct upstream *up;
+ unsigned int total_vnodes = 0;
+
+ g_free(ups->ring);
+ ups->ring = NULL;
+ ups->ring_len = 0;
+
+ if (ups->alive->len == 0) {
+ ups->ring_dirty = FALSE;
+ return;
+ }
+
+ /* Calculate total ring points needed */
+ for (i = 0; i < ups->alive->len; i++) {
+ up = g_ptr_array_index(ups->alive, i);
+ total_vnodes += MAX(up->weight, 1) * RSPAMD_RING_VNODES;
+ }
+
+ ups->ring = g_malloc(total_vnodes * sizeof(struct upstream_ring_point));
+
+ for (i = 0; i < ups->alive->len; i++) {
+ up = g_ptr_array_index(ups->alive, i);
+ unsigned int nvnodes = MAX(up->weight, 1) * RSPAMD_RING_VNODES;
- while (j < nbuckets) {
- b = j;
- key *= 2862933555777941757ULL + 1;
- j = (b + 1) * (double) (1ULL << 31) / (double) ((key >> 33) + 1ULL);
+ for (j = 0; j < nvnodes; j++) {
+ char vnode_key[280]; /* upstream name (253 max) + : + digits */
+ int len = rspamd_snprintf(vnode_key, sizeof(vnode_key),
+ "%s:%ud", up->name, j);
+ uint64_t h = rspamd_cryptobox_fast_hash_specific(
+ RSPAMD_CRYPTOBOX_XXHASH64,
+ vnode_key, len, ups->hash_seed);
+
+ ups->ring[ups->ring_len].hash = h;
+ ups->ring[ups->ring_len].up = up;
+ ups->ring_len++;
+ }
}
- return b;
+ qsort(ups->ring, ups->ring_len, sizeof(struct upstream_ring_point),
+ rspamd_upstream_ring_cmp);
+
+ ups->ring_dirty = FALSE;
}
static struct upstream *
const uint8_t *key, unsigned int keylen)
{
uint64_t k;
- uint32_t idx;
- static const unsigned int max_tries = 20;
- struct upstream *up = NULL;
+ struct upstream *up;
+
+ RSPAMD_UPSTREAM_LOCK(ups);
+
+ /* Lazy ring rebuild */
+ if (ups->ring_dirty || ups->ring == NULL) {
+ rspamd_upstream_ring_build(ups);
+ }
+
+ if (ups->ring_len == 0) {
+ RSPAMD_UPSTREAM_UNLOCK(ups);
+ return NULL;
+ }
- /* Generate 64 bits input key */
+ /* Hash the lookup key */
k = rspamd_cryptobox_fast_hash_specific(RSPAMD_CRYPTOBOX_XXHASH64,
key, keylen, ups->hash_seed);
- RSPAMD_UPSTREAM_LOCK(ups);
- /*
- * Select new upstream from all upstreams
- */
- for (unsigned int i = 0; i < max_tries; i++) {
- idx = rspamd_consistent_hash(k, ups->ups->len);
- up = g_ptr_array_index(ups->ups, idx);
+ /* Binary search for first ring point >= k */
+ unsigned int lo = 0, hi = ups->ring_len;
- if (up->active_idx < 0 || (except != NULL && up == except)) {
- /* Found inactive or excluded upstream */
- k = mum_hash_step(k, ups->hash_seed);
+ while (lo < hi) {
+ unsigned int mid = lo + (hi - lo) / 2;
+
+ if (ups->ring[mid].hash < k) {
+ lo = mid + 1;
}
else {
- break;
+ hi = mid;
}
}
- RSPAMD_UPSTREAM_UNLOCK(ups);
- if (up->active_idx >= 0) {
- return up;
+ /* Wrap around */
+ if (lo >= ups->ring_len) {
+ lo = 0;
+ }
+
+ up = ups->ring[lo].up;
+
+ /* Handle 'except': walk forward on ring to find a different upstream */
+ if (except != NULL && up == except) {
+ for (unsigned int i = 1; i < ups->ring_len; i++) {
+ unsigned int idx = (lo + i) % ups->ring_len;
+
+ if (ups->ring[idx].up != except) {
+ up = ups->ring[idx].up;
+ break;
+ }
+ }
+
+ if (up == except) {
+ /* All ring points belong to the excluded upstream */
+ RSPAMD_UPSTREAM_UNLOCK(ups);
+ return NULL;
+ }
}
- /* We failed to find any active upstream */
- up = rspamd_upstream_get_random(ups, except);
- msg_info("failed to find hashed upstream for %s, fallback to random: %s",
- ups->ups_line, up->name);
+ RSPAMD_UPSTREAM_UNLOCK(ups);
return up;
}