From 7984ea11efc5318ef4f587726cdc7c0886d9f5a5 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sat, 8 Dec 2018 14:44:52 +0000 Subject: [PATCH] [Feature] Fuzzy_storage: add preliminary support of rate limits --- src/fuzzy_storage.c | 222 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 221 insertions(+), 1 deletion(-) diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index a3486635f0..7fa821e9b6 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -17,8 +17,8 @@ * Rspamd fuzzy storage server */ -#include #include "config.h" +#include "libserver/fuzzy_wire.h" #include "util.h" #include "rspamd.h" #include "map.h" @@ -37,14 +37,20 @@ #include "libutil/map_private.h" #include "libutil/hash.h" #include "libutil/http_private.h" +#include "libutil/hash.h" #include "unix-std.h" +#include + /* Resync value in seconds */ #define DEFAULT_SYNC_TIMEOUT 60.0 #define DEFAULT_KEYPAIR_CACHE_SIZE 512 #define DEFAULT_MASTER_TIMEOUT 10.0 #define DEFAULT_UPDATES_MAXFAIL 3 #define COOKIE_SIZE 128 +#define DEFAULT_MAX_BUCKETS 2000 +#define DEFAULT_BUCKET_TTL 3600 +#define DEFAULT_BUCKET_MASK 24 static const gchar *local_db_name = "local"; @@ -115,6 +121,12 @@ struct rspamd_fuzzy_mirror { struct rspamd_cryptobox_pubkey *key; }; +struct rspamd_leaky_bucket_elt { + rspamd_inet_addr_t *addr; + gdouble last; + gdouble cur; +}; + static const guint64 rspamd_fuzzy_storage_magic = 0x291a3253eb1b3ea5ULL; struct rspamd_fuzzy_storage_ctx { @@ -132,6 +144,7 @@ struct rspamd_fuzzy_storage_ctx { struct rspamd_radix_map_helper *update_ips; struct rspamd_radix_map_helper *master_ips; struct rspamd_radix_map_helper *blocked_ips; + struct rspamd_radix_map_helper *ratelimit_whitelist; struct rspamd_cryptobox_keypair *sync_keypair; struct rspamd_cryptobox_pubkey *master_key; @@ -141,6 +154,7 @@ struct rspamd_fuzzy_storage_ctx { const ucl_object_t *update_map; const ucl_object_t *masters_map; const ucl_object_t *blocked_map; + const ucl_object_t *ratelimit_whitelist_map; GHashTable *master_flags; guint keypair_cache_size; @@ -148,6 +162,7 @@ struct rspamd_fuzzy_storage_ctx { struct event peer_ev; struct event stat_ev; struct timeval stat_tv; + /* Local keypair */ struct rspamd_cryptobox_keypair *default_keypair; /* Bad clash, need for parse keypair */ struct fuzzy_key *default_key; @@ -160,11 +175,21 @@ struct rspamd_fuzzy_storage_ctx { gchar *collection_id_file; struct rspamd_keypair_cache *keypair_cache; rspamd_lru_hash_t *errors_ips; + rspamd_lru_hash_t *ratelimit_buckets; struct rspamd_fuzzy_backend *backend; GArray *updates_pending; guint updates_failed; guint updates_maxfail; guint32 collection_id; + + /* Ratelimits */ + guint leaky_bucket_ttl; + guint leaky_bucket_mask; + guint max_buckets; + gboolean ratelimit_log_only; + gdouble leaky_bucket_burst; + gdouble leaky_bucket_rate; + struct rspamd_worker *worker; struct rspamd_http_connection_router *collection_rt; const ucl_object_t *skip_map; @@ -230,6 +255,105 @@ struct fuzzy_master_update_session { static void rspamd_fuzzy_write_reply (struct fuzzy_session *session); +static gboolean +rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session) +{ + rspamd_inet_addr_t *masked; + struct rspamd_leaky_bucket_elt *elt; + struct timeval tv; + gdouble now; + + if (session->ctx->ratelimit_whitelist != NULL) { + if (rspamd_match_radix_map_addr (session->ctx->ratelimit_whitelist, + session->addr) != NULL) { + return TRUE; + } + } + + /* + if (rspamd_inet_address_is_local (session->addr, TRUE)) { + return TRUE; + } + */ + + masked = rspamd_inet_address_copy (session->addr); + + if (rspamd_inet_address_get_af (masked) == AF_INET) { + rspamd_inet_address_apply_mask (masked, + MIN (session->ctx->leaky_bucket_mask, 32)); + } + else { + /* Must be at least /64 */ + rspamd_inet_address_apply_mask (masked, + MIN (MAX (session->ctx->leaky_bucket_mask * 4, 64), 128)); + } + +#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC + event_base_gettimeofday_cached (session->ctx->ev_base, &tv); +#else + gettimeofday (&tv, NULL); +#endif + + now = tv_to_double (&tv); + elt = rspamd_lru_hash_lookup (session->ctx->ratelimit_buckets, masked, + tv.tv_sec); + + if (elt) { + gboolean ratelimited = FALSE; + + if (isnan (elt->cur)) { + /* Ratelimit exceeded, preserve it for the whole ttl */ + ratelimited = TRUE; + } + else { + /* Update bucket */ + if (elt->last < now) { + elt->cur -= session->ctx->leaky_bucket_rate * (now - elt->last); + elt->last = now; + + if (elt->cur < 0) { + elt->cur = 0; + } + } + else { + elt->last = now; + } + + /* Check bucket */ + if (elt->cur >= session->ctx->leaky_bucket_burst) { + + msg_info ("ratelimiting %s (%s), %.1f max elts", + rspamd_inet_address_to_string (session->addr), + rspamd_inet_address_to_string (masked), + session->ctx->leaky_bucket_burst); + elt->cur = NAN; + } + else { + elt->cur ++; /* Allow one more request */ + } + } + + rspamd_inet_address_free (masked); + + return !ratelimited; + } + else { + /* New bucket */ + elt = g_malloc (sizeof (*elt)); + elt->addr = masked; /* transfer ownership */ + elt->cur = 1; + elt->last = now; + + rspamd_lru_hash_insert (session->ctx->ratelimit_buckets, + masked, + elt, + tv.tv_sec, + session->ctx->leaky_bucket_ttl); + } + + return TRUE; +} + static gboolean rspamd_fuzzy_check_client (struct fuzzy_session *session, gboolean is_write) { @@ -259,6 +383,15 @@ rspamd_fuzzy_check_client (struct fuzzy_session *session, gboolean is_write) } /* Non write */ + if (session->ctx->ratelimit_buckets) { + if (session->ctx->ratelimit_log_only) { + (void)rspamd_fuzzy_check_ratelimit (session); /* Check but ignore */ + } + else { + return rspamd_fuzzy_check_ratelimit (session); + } + } + return TRUE; } @@ -299,6 +432,15 @@ struct fuzzy_slave_connection { gint sock; }; +static void +fuzzy_rl_bucket_free (gpointer p) +{ + struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *)p; + + rspamd_inet_address_free (elt->addr); + g_free (elt); +} + static void fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn) { @@ -2492,6 +2634,11 @@ init_fuzzy (struct rspamd_config *cfg) (rspamd_mempool_destruct_t)rspamd_ptr_array_free_hard, ctx->mirrors); ctx->updates_maxfail = DEFAULT_UPDATES_MAXFAIL; ctx->collection_id_file = RSPAMD_DBDIR "/fuzzy_collection.id"; + ctx->leaky_bucket_mask = DEFAULT_BUCKET_MASK; + ctx->leaky_bucket_ttl = DEFAULT_BUCKET_TTL; + ctx->max_buckets = DEFAULT_MAX_BUCKETS; + ctx->leaky_bucket_burst = NAN; + ctx->leaky_bucket_rate = NAN; rspamd_rcl_register_worker_option (cfg, type, @@ -2682,6 +2829,65 @@ init_fuzzy (struct rspamd_config *cfg) 0, "Skip specific hashes from the map"); + /* Ratelimits */ + rspamd_rcl_register_worker_option (cfg, + type, + "ratelimit_whitelist", + rspamd_rcl_parse_struct_ucl, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, ratelimit_whitelist_map), + 0, + "Skip specific addresses from rate limiting"); + rspamd_rcl_register_worker_option (cfg, + type, + "ratelimit_max_buckets", + rspamd_rcl_parse_struct_integer, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, max_buckets), + RSPAMD_CL_FLAG_UINT, + "Maximum number of leaky buckets (default: " G_STRINGIFY(DEFAULT_MAX_BUCKETS) ")"); + rspamd_rcl_register_worker_option (cfg, + type, + "ratelimit_network_mask", + rspamd_rcl_parse_struct_integer, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_mask), + RSPAMD_CL_FLAG_UINT, + "Network mask to apply for IPv4 rate addresses (default: " G_STRINGIFY(DEFAULT_BUCKET_MASK) ")"); + rspamd_rcl_register_worker_option (cfg, + type, + "ratelimit_bucket_ttl", + rspamd_rcl_parse_struct_time, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_ttl), + RSPAMD_CL_FLAG_TIME_INTEGER, + "Time to live for ratelimit element (default: " G_STRINGIFY(DEFAULT_BUCKET_TTL) ")"); + rspamd_rcl_register_worker_option (cfg, + type, + "ratelimit_rate", + rspamd_rcl_parse_struct_double, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_rate), + 0, + "Leak rate in requests per second"); + rspamd_rcl_register_worker_option (cfg, + type, + "ratelimit_burst", + rspamd_rcl_parse_struct_double, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_burst), + 0, + "Peak value for ratelimit bucket"); + rspamd_rcl_register_worker_option (cfg, + type, + "ratelimit_log_only", + rspamd_rcl_parse_struct_boolean, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, ratelimit_log_only), + 0, + "Don't really ban on ratelimit reaching, just log"); + + return ctx; } @@ -2959,6 +3165,20 @@ start_fuzzy (struct rspamd_worker *worker) &ctx->blocked_ips, NULL); } + /* Create radix trees */ + if (ctx->ratelimit_whitelist_map != NULL) { + rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->ratelimit_whitelist_map, + "Skip ratelimits from specific ip addresses/networks", + &ctx->ratelimit_whitelist, NULL); + } + + /* Ratelimits */ + if (!isnan (ctx->leaky_bucket_rate) && !isnan (ctx->leaky_bucket_burst)) { + ctx->ratelimit_buckets = rspamd_lru_hash_new_full (ctx->max_buckets, + NULL, fuzzy_rl_bucket_free, + rspamd_inet_address_hash, rspamd_inet_address_equal); + } + /* Maps events */ ctx->resolver = dns_resolver_init (worker->srv->logger, ctx->ev_base, -- 2.47.3