]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Refactor] fuzzy storage: split helper code (#5875) master
authorVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 5 Feb 2026 15:38:07 +0000 (15:38 +0000)
committerGitHub <noreply@github.com>
Thu, 5 Feb 2026 15:38:07 +0000 (15:38 +0000)
src/fuzzy_storage.c
src/libserver/CMakeLists.txt
src/libserver/fuzzy_storage_internal.h [new file with mode: 0644]
src/libserver/fuzzy_storage_keys.c [new file with mode: 0644]
src/libserver/fuzzy_storage_ratelimit.c [new file with mode: 0644]
src/libserver/fuzzy_storage_stat.c [new file with mode: 0644]

index 0aab908e5870716fb4e9b7bf0203b19c05de15e6..1b649de3c6bd42f33342ccc233fc27fb18fe47ed 100644 (file)
@@ -19,6 +19,7 @@
 
 #include "config.h"
 #include "libserver/fuzzy_wire.h"
 
 #include "config.h"
 #include "libserver/fuzzy_wire.h"
+#include "libserver/fuzzy_storage_internal.h"
 #include "util.h"
 #include "rspamd.h"
 #include "libserver/maps/map.h"
 #include "util.h"
 #include "rspamd.h"
 #include "libserver/maps/map.h"
@@ -75,176 +76,8 @@ worker_t fuzzy_worker = {
        RSPAMD_WORKER_VER                                    /* Version info */
 };
 
        RSPAMD_WORKER_VER                                    /* Version info */
 };
 
-struct fuzzy_global_stat {
-       uint64_t fuzzy_hashes;
-       /**< number of fuzzy hashes stored                                      */
-       uint64_t fuzzy_hashes_expired;
-       /**< number of fuzzy hashes expired                                     */
-       uint64_t fuzzy_hashes_checked[RSPAMD_FUZZY_EPOCH_MAX];
-       /**< amount of check requests for each epoch            */
-       uint64_t fuzzy_shingles_checked[RSPAMD_FUZZY_EPOCH_MAX];
-       /**< amount of shingle check requests for each epoch    */
-       uint64_t fuzzy_hashes_found[RSPAMD_FUZZY_EPOCH_MAX];
-       /**< amount of invalid requests                         */
-       uint64_t invalid_requests;
-       /**< amount of delayed hashes found                             */
-       uint64_t delayed_hashes;
-};
-
-struct fuzzy_key_stat {
-       uint64_t checked;
-       uint64_t matched;
-       uint64_t added;
-       uint64_t deleted;
-       uint64_t errors;
-       /* Store averages for checked/matched per minute */
-       struct rspamd_counter_data checked_ctr;
-       struct rspamd_counter_data matched_ctr;
-       double last_checked_time;
-       uint64_t last_checked_count;
-       uint64_t last_matched_count;
-       struct rspamd_cryptobox_keypair *keypair;
-       rspamd_lru_hash_t *last_ips;
-
-       ref_entry_t ref;
-};
-
-struct rspamd_leaky_bucket_elt {
-       rspamd_inet_addr_t *addr;
-       double last;
-       double cur;
-};
-
 static const uint64_t rspamd_fuzzy_storage_magic = 0x291a3253eb1b3ea5ULL;
 
 static const uint64_t rspamd_fuzzy_storage_magic = 0x291a3253eb1b3ea5ULL;
 
-static int64_t
-fuzzy_kp_hash(const unsigned char *p)
-{
-       int64_t res;
-
-       memcpy(&res, p, sizeof(res));
-       return res;
-}
-static bool
-fuzzy_kp_equal(gconstpointer a, gconstpointer b)
-{
-       const unsigned char *pa = a, *pb = b;
-
-       return (memcmp(pa, pb, RSPAMD_FUZZY_KEYLEN) == 0);
-}
-
-enum fuzzy_key_op {
-       FUZZY_KEY_READ = 0x1u << 0,
-       FUZZY_KEY_WRITE = 0x1u << 1,
-       FUZZY_KEY_DELETE = 0x1u << 2,
-};
-KHASH_SET_INIT_INT(fuzzy_key_ids_set);
-KHASH_INIT(fuzzy_key_flag_stat, int, struct fuzzy_key_stat, 1, kh_int_hash_func,
-                  kh_int_hash_equal);
-struct fuzzy_key {
-       char *name;
-       struct rspamd_cryptobox_keypair *key;
-       struct rspamd_cryptobox_pubkey *pk;
-       struct fuzzy_key_stat *stat;
-       khash_t(fuzzy_key_flag_stat) * flags_stat;
-       khash_t(fuzzy_key_ids_set) * forbidden_ids;
-       struct rspamd_leaky_bucket_elt *rl_bucket;
-       ucl_object_t *extensions;
-       double burst;
-       double rate;
-       ev_tstamp expire;
-       bool expired;
-       int flags; /* enum fuzzy_key_op */
-       ref_entry_t ref;
-};
-
-KHASH_INIT(rspamd_fuzzy_keys_hash,
-                  const unsigned char *, struct fuzzy_key *, 1,
-                  fuzzy_kp_hash, fuzzy_kp_equal);
-
-struct rspamd_lua_fuzzy_script {
-       int cbref;
-       struct rspamd_lua_fuzzy_script *next;
-};
-
-struct rspamd_fuzzy_storage_ctx {
-       uint64_t magic;
-       /* Events base */
-       struct ev_loop *event_loop;
-       /* DNS resolver */
-       struct rspamd_dns_resolver *resolver;
-       /* Config */
-       struct rspamd_config *cfg;
-       /* END OF COMMON PART */
-       struct fuzzy_global_stat stat;
-       double expire;
-       double sync_timeout;
-       double delay;
-       double tcp_timeout;
-       struct rspamd_radix_map_helper *update_ips;
-       struct rspamd_hash_map_helper *update_keys;
-       struct rspamd_radix_map_helper *blocked_ips;
-       struct rspamd_radix_map_helper *ratelimit_whitelist;
-       struct rspamd_radix_map_helper *delay_whitelist;
-
-       const ucl_object_t *update_map;
-       const ucl_object_t *update_keys_map;
-       const ucl_object_t *delay_whitelist_map;
-       const ucl_object_t *blocked_map;
-       const ucl_object_t *ratelimit_whitelist_map;
-       const ucl_object_t *dynamic_keys_map;
-
-       unsigned int keypair_cache_size;
-       ev_timer stat_ev;
-       ev_io peer_ev;
-
-       /* Local keypair */
-       struct rspamd_cryptobox_keypair *default_keypair; /* Bad clash, need for parse keypair */
-       struct fuzzy_key *default_key;
-       khash_t(rspamd_fuzzy_keys_hash) * keys;
-       /* Those are loaded via map */
-       khash_t(rspamd_fuzzy_keys_hash) * dynamic_keys;
-
-       gboolean encrypted_only;
-       gboolean read_only;
-       gboolean dedicated_update_worker;
-       struct rspamd_keypair_cache *keypair_cache;
-       struct rspamd_http_context *http_ctx;
-       rspamd_lru_hash_t *errors_ips;
-       rspamd_lru_hash_t *ratelimit_buckets;
-       struct rspamd_fuzzy_backend *backend;
-       GArray *updates_pending;
-       unsigned int updates_failed;
-       unsigned int updates_maxfail;
-       /* Used to send data between workers */
-       int peer_fd;
-
-       /* Ratelimits */
-       unsigned int leaky_bucket_ttl;
-       unsigned int leaky_bucket_mask;
-       unsigned int max_buckets;
-       gboolean ratelimit_log_only;
-       double leaky_bucket_burst;
-       double leaky_bucket_rate;
-
-       struct rspamd_worker *worker;
-       const ucl_object_t *skip_map;
-       struct rspamd_hash_map_helper *skip_hashes;
-       struct rspamd_lua_fuzzy_script *lua_pre_handlers;
-       struct rspamd_lua_fuzzy_script *lua_post_handlers;
-       struct rspamd_lua_fuzzy_script *lua_blacklist_handlers;
-       khash_t(fuzzy_key_ids_set) * default_forbidden_ids;
-       /* Ids that should not override other ids */
-       khash_t(fuzzy_key_ids_set) * weak_ids;
-};
-
-enum fuzzy_cmd_type {
-       CMD_NORMAL,
-       CMD_SHINGLE,
-       CMD_ENCRYPTED_NORMAL,
-       CMD_ENCRYPTED_SHINGLE
-};
-
 struct rspamd_fuzzy_tcp_frame {
        uint16_t size_hdr;                           /* We have to write this as well */
        struct rspamd_fuzzy_encrypted_reply payload; /* Payload */
 struct rspamd_fuzzy_tcp_frame {
        uint16_t size_hdr;                           /* We have to write this as well */
        struct rspamd_fuzzy_encrypted_reply payload; /* Payload */
@@ -301,30 +134,6 @@ struct fuzzy_udp_session {
        ref_entry_t ref;
 };
 
        ref_entry_t ref;
 };
 
-/* Legacy structure name for compatibility during refactoring */
-struct fuzzy_session {
-       struct rspamd_worker *worker;
-       rspamd_inet_addr_t *addr;
-       struct rspamd_fuzzy_storage_ctx *ctx;
-
-       struct rspamd_fuzzy_shingle_cmd cmd;       /* Can handle both shingles and non-shingles */
-       struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */
-       struct fuzzy_key_stat *ip_stat;
-
-       enum rspamd_fuzzy_epoch epoch;
-       enum fuzzy_cmd_type cmd_type;
-       int fd;
-       ev_tstamp timestamp;
-       struct ev_io io;
-       ref_entry_t ref;
-       struct fuzzy_key *key;
-       struct rspamd_fuzzy_cmd_extension *extensions;
-       unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
-
-       /* If this is a TCP session, this pointer will be set */
-       struct fuzzy_tcp_session *tcp_session;
-};
-
 struct fuzzy_peer_request {
        ev_io io_ev;
        struct fuzzy_peer_cmd cmd;
 struct fuzzy_peer_request {
        ev_io io_ev;
        struct fuzzy_peer_cmd cmd;
@@ -337,598 +146,15 @@ struct rspamd_updates_cbdata {
        gboolean final;
 };
 
        gboolean final;
 };
 
-enum rspamd_ratelimit_event_type {
-       RATELIMIT_EVENT_NEW,
-       RATELIMIT_EVENT_EXISTING,
-       RATELIMIT_EVENT_BLACKLIST,
-};
-
-struct rspamd_ratelimit_callback_ctx {
-       rspamd_inet_addr_t *addr;              /* Client IP */
-       const char *reason;                    /* "ratelimit" or "blacklisted" */
-       enum rspamd_ratelimit_event_type type; /* new, existing, blacklist */
-
-       /* Rate limit bucket state (optional) */
-       struct rspamd_leaky_bucket_elt *bucket;
-       double max_burst;
-       double max_rate;
-
-       /* Session context (optional - only for per-key limits) */
-       struct fuzzy_session *session;
-};
-
 static void rspamd_fuzzy_write_reply(struct fuzzy_session *session);
 static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session);
 static bool rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session,
                                                                                 struct fuzzy_tcp_reply_queue_elt *reply);
 static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx,
                                                                                                   const char *source, gboolean final);
 static void rspamd_fuzzy_write_reply(struct fuzzy_session *session);
 static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session);
 static bool rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session,
                                                                                 struct fuzzy_tcp_reply_queue_elt *reply);
 static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx,
                                                                                                   const char *source, gboolean final);
-static gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
-                                                                                 rspamd_inet_addr_t *addr);
-static void rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx *ctx,
-                                                                                               rspamd_inet_addr_t *addr,
-                                                                                               const char *reason);
-static void rspamd_fuzzy_call_ratelimit_handlers(struct rspamd_fuzzy_storage_ctx *ctx,
-                                                                                                const struct rspamd_ratelimit_callback_ctx *cb_ctx);
-static struct fuzzy_key *fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg,
-                                                                                                       const ucl_object_t *obj,
-                                                                                                       khash_t(rspamd_fuzzy_keys_hash) * target);
 static void rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents);
 static void accept_tcp_socket(EV_P_ ev_io *w, int revents);
 
 static void rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents);
 static void accept_tcp_socket(EV_P_ ev_io *w, int revents);
 
-static ucl_object_t *rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt);
-struct fuzzy_keymap_ucl_buf {
-       rspamd_fstring_t *buf;
-       struct rspamd_fuzzy_storage_ctx *ctx;
-};
-
-/* Callbacks for reading json dynamic rules */
-static char *
-ucl_keymap_read_cb(char *chunk,
-                                  int len,
-                                  struct map_cb_data *data,
-                                  gboolean final)
-{
-       struct fuzzy_keymap_ucl_buf *jb, *pd;
-
-       pd = data->prev_data;
-
-       g_assert(pd != NULL);
-
-       if (data->cur_data == NULL) {
-               jb = g_malloc0(sizeof(*jb));
-               jb->ctx = pd->ctx;
-               data->cur_data = jb;
-       }
-       else {
-               jb = data->cur_data;
-       }
-
-       if (jb->buf == NULL) {
-               /* Allocate memory for buffer */
-               jb->buf = rspamd_fstring_sized_new(MAX(len, 4096));
-       }
-
-       jb->buf = rspamd_fstring_append(jb->buf, chunk, len);
-
-       return NULL;
-}
-
-static void
-ucl_keymap_fin_cb(struct map_cb_data *data, void **target)
-{
-       struct fuzzy_keymap_ucl_buf *jb;
-       ucl_object_t *top;
-       struct ucl_parser *parser;
-       struct rspamd_config *cfg;
-
-       /* Now parse ucl */
-       if (data->cur_data) {
-               jb = data->cur_data;
-               cfg = jb->ctx->cfg;
-       }
-       else {
-               msg_err("no cur data in the map! might be a bug");
-               return;
-       }
-
-       if (jb->buf->len == 0) {
-               msg_err_config("no data read");
-
-               return;
-       }
-
-       parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS);
-
-       if (!ucl_parser_add_chunk(parser, jb->buf->str, jb->buf->len)) {
-               msg_err_config("cannot load ucl data: parse error %s",
-                                          ucl_parser_get_error(parser));
-               ucl_parser_free(parser);
-               return;
-       }
-
-       top = ucl_parser_get_object(parser);
-       ucl_parser_free(parser);
-
-       if (ucl_object_type(top) != UCL_ARRAY) {
-               ucl_object_unref(top);
-               msg_err_config("loaded ucl is not an array");
-               return;
-       }
-
-       if (target) {
-               *target = data->cur_data;
-       }
-
-       if (data->prev_data) {
-               jb = data->prev_data;
-               /* Clean prev data */
-               if (jb->buf) {
-                       rspamd_fstring_free(jb->buf);
-               }
-
-               /* Clean the existing keys */
-               struct fuzzy_key *key;
-               kh_foreach_value(jb->ctx->dynamic_keys, key, {
-                       REF_RELEASE(key);
-               });
-               kh_clear(rspamd_fuzzy_keys_hash, jb->ctx->dynamic_keys);
-
-               /* Insert new keys */
-               const ucl_object_t *cur;
-               ucl_object_iter_t it = NULL;
-               int success = 0;
-
-               while ((cur = ucl_object_iterate(top, &it, true)) != NULL) {
-                       struct fuzzy_key *nk;
-
-                       nk = fuzzy_add_keypair_from_ucl(cfg, cur, jb->ctx->dynamic_keys);
-
-                       if (nk == NULL) {
-                               msg_warn_config("cannot add dynamic keypair");
-                       }
-                       success++;
-               }
-
-               msg_info_config("loaded %d dynamic keypairs", success);
-
-               g_free(jb);
-       }
-
-       ucl_object_unref(top);
-}
-
-static void
-ucl_keymap_dtor_cb(struct map_cb_data *data)
-{
-       struct fuzzy_keymap_ucl_buf *jb;
-
-       if (data->cur_data) {
-               jb = data->cur_data;
-               /* Clean prev data */
-               if (jb->buf) {
-                       rspamd_fstring_free(jb->buf);
-               }
-
-               struct fuzzy_key *key;
-               kh_foreach_value(jb->ctx->dynamic_keys, key, {
-                       REF_RELEASE(key);
-               });
-               /* Clear hash content but don't destroy - mempool destructor will handle it */
-               kh_clear(rspamd_fuzzy_keys_hash, jb->ctx->dynamic_keys);
-
-               g_free(jb);
-       }
-}
-
-enum rspamd_ratelimit_check_result {
-       ratelimit_pass,
-       ratelimit_new,
-       ratelimit_existing,
-};
-
-enum rspamd_ratelimit_check_policy {
-       ratelimit_policy_permanent,
-       ratelimit_policy_normal,
-};
-
-static enum rspamd_ratelimit_check_result
-rspamd_fuzzy_check_ratelimit_bucket(struct rspamd_fuzzy_storage_ctx *ctx,
-                                                                       rspamd_inet_addr_t *addr,
-                                                                       ev_tstamp timestamp,
-                                                                       struct rspamd_leaky_bucket_elt *elt,
-                                                                       enum rspamd_ratelimit_check_policy policy,
-                                                                       double max_burst, double max_rate)
-{
-       gboolean ratelimited = FALSE, new_ratelimit = FALSE;
-
-       /* Nothing to check */
-       if (isnan(max_burst) || isnan(max_rate)) {
-               return ratelimit_pass;
-       }
-
-       if (isnan(elt->cur)) {
-               /* There is an issue with the previous logic: the TTL is updated each time
-                * we see that new bucket. Hence, we need to check the `last` and act accordingly
-                */
-               if (elt->last < timestamp && timestamp - elt->last >= ctx->leaky_bucket_ttl) {
-                       /*
-                                * We reset bucket to it's 90% capacity to allow some requests
-                                * This should cope with the issue when we block an IP network for some burst and never unblock it
-                                */
-                       elt->cur = max_burst * 0.9;
-                       elt->last = timestamp;
-               }
-               else {
-                       ratelimited = TRUE;
-               }
-       }
-       else {
-               /* Update bucket: leak some elements */
-               if (elt->last < timestamp) {
-                       elt->cur -= max_rate * (timestamp - elt->last);
-                       elt->last = timestamp;
-
-                       if (elt->cur < 0) {
-                               elt->cur = 0;
-                       }
-               }
-               else {
-                       elt->last = timestamp;
-               }
-
-               /* Check the bucket */
-               if (elt->cur >= max_burst) {
-
-                       if (policy == ratelimit_policy_permanent) {
-                               elt->cur = NAN;
-                       }
-                       new_ratelimit = TRUE;
-                       ratelimited = TRUE;
-               }
-               else {
-                       elt->cur++; /* Allow one more request */
-               }
-       }
-
-       /* Note: Caller is responsible for calling the ratelimit handlers with
-        * proper context (new vs existing, bucket info, session info, etc.)
-        */
-
-       if (new_ratelimit) {
-               return ratelimit_new;
-       }
-
-       return ratelimited ? ratelimit_existing : ratelimit_pass;
-}
-
-static gboolean
-rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx,
-                                                        rspamd_inet_addr_t *addr,
-                                                        struct rspamd_worker *worker,
-                                                        ev_tstamp timestamp)
-{
-       rspamd_inet_addr_t *masked;
-       struct rspamd_leaky_bucket_elt *elt;
-
-       if (!addr) {
-               return TRUE;
-       }
-
-       if (ctx->ratelimit_whitelist != NULL) {
-               if (rspamd_match_radix_map_addr(ctx->ratelimit_whitelist,
-                                                                               addr) != NULL) {
-                       return TRUE;
-               }
-       }
-
-       /* Skip ratelimit for local addresses */
-       if (rspamd_inet_address_is_local(addr)) {
-               return TRUE;
-       }
-
-       masked = rspamd_inet_address_copy(addr, NULL);
-
-       if (rspamd_inet_address_get_af(masked) == AF_INET) {
-               rspamd_inet_address_apply_mask(masked,
-                                                                          MIN(ctx->leaky_bucket_mask, 32));
-       }
-       else {
-               /* Must be at least /64 */
-               rspamd_inet_address_apply_mask(masked,
-                                                                          MIN(MAX(ctx->leaky_bucket_mask * 4, 64), 128));
-       }
-
-       elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, masked,
-                                                                (time_t) timestamp);
-
-       if (elt) {
-               enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(ctx, addr,
-                                                                                                                                                                        timestamp, elt,
-                                                                                                                                                                        ratelimit_policy_permanent,
-                                                                                                                                                                        ctx->leaky_bucket_burst,
-                                                                                                                                                                        ctx->leaky_bucket_rate);
-
-               if (res == ratelimit_new) {
-                       msg_info("ratelimiting %s (%s), %.1f max elts",
-                                        rspamd_inet_address_to_string(addr),
-                                        rspamd_inet_address_to_string(masked),
-                                        ctx->leaky_bucket_burst);
-
-                       struct rspamd_srv_command srv_cmd;
-
-                       srv_cmd.type = RSPAMD_SRV_FUZZY_BLOCKED;
-                       srv_cmd.cmd.fuzzy_blocked.af = rspamd_inet_address_get_af(masked);
-
-                       if (srv_cmd.cmd.fuzzy_blocked.af == AF_INET || srv_cmd.cmd.fuzzy_blocked.af == AF_INET6) {
-                               socklen_t slen;
-                               struct sockaddr *sa = rspamd_inet_address_get_sa(masked, &slen);
-
-                               if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
-                                       memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
-                                       msg_debug("propagating blocked address to other workers");
-                                       rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
-                               }
-                               else {
-                                       msg_err("bad address length: %d, expected to be %d",
-                                                       (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
-                               }
-                       }
-
-                       if (ctx->lua_blacklist_handlers) {
-                               struct rspamd_ratelimit_callback_ctx cb_ctx = {
-                                       .addr = addr,
-                                       .reason = "ratelimit",
-                                       .type = RATELIMIT_EVENT_NEW,
-                                       .bucket = elt,
-                                       .max_burst = ctx->leaky_bucket_burst,
-                                       .max_rate = ctx->leaky_bucket_rate,
-                                       .session = NULL,
-                               };
-                               rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx);
-                       }
-               }
-               else if (res == ratelimit_existing) {
-                       if (ctx->lua_blacklist_handlers) {
-                               struct rspamd_ratelimit_callback_ctx cb_ctx = {
-                                       .addr = addr,
-                                       .reason = "ratelimit",
-                                       .type = RATELIMIT_EVENT_EXISTING,
-                                       .bucket = elt,
-                                       .max_burst = ctx->leaky_bucket_burst,
-                                       .max_rate = ctx->leaky_bucket_rate,
-                                       .session = NULL,
-                               };
-                               rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx);
-                       }
-               }
-
-               rspamd_inet_address_free(masked);
-
-               return res == ratelimit_pass;
-       }
-       else {
-               /* New bucket */
-               elt = g_malloc(sizeof(*elt));
-               elt->addr = masked; /* transfer ownership */
-               elt->cur = 1;
-               elt->last = timestamp;
-
-               rspamd_lru_hash_insert(ctx->ratelimit_buckets,
-                                                          masked,
-                                                          elt,
-                                                          timestamp,
-                                                          ctx->leaky_bucket_ttl);
-       }
-
-       return TRUE;
-}
-
-/*
- * Push bucket info as a Lua table
- */
-static void
-rspamd_fuzzy_bucket_info_tolua(lua_State *L,
-                                                          const struct rspamd_ratelimit_callback_ctx *cb_ctx)
-{
-       if (!cb_ctx->bucket) {
-               lua_pushnil(L);
-               return;
-       }
-
-       lua_createtable(L, 0, 6);
-
-       /* bucket_level - current fill level (nil if permanently blocked) */
-       if (isnan(cb_ctx->bucket->cur)) {
-               lua_pushnil(L);
-               lua_setfield(L, -2, "bucket_level");
-               lua_pushboolean(L, TRUE);
-               lua_setfield(L, -2, "is_permanent");
-       }
-       else {
-               lua_pushnumber(L, cb_ctx->bucket->cur);
-               lua_setfield(L, -2, "bucket_level");
-               lua_pushboolean(L, FALSE);
-               lua_setfield(L, -2, "is_permanent");
-       }
-
-       /* max_burst */
-       if (!isnan(cb_ctx->max_burst)) {
-               lua_pushnumber(L, cb_ctx->max_burst);
-               lua_setfield(L, -2, "max_burst");
-       }
-
-       /* max_rate */
-       if (!isnan(cb_ctx->max_rate)) {
-               lua_pushnumber(L, cb_ctx->max_rate);
-               lua_setfield(L, -2, "max_rate");
-       }
-
-       /* exceeded_by - how much over the limit */
-       if (!isnan(cb_ctx->bucket->cur) && !isnan(cb_ctx->max_burst) &&
-               cb_ctx->bucket->cur > cb_ctx->max_burst) {
-               lua_pushnumber(L, cb_ctx->bucket->cur - cb_ctx->max_burst);
-               lua_setfield(L, -2, "exceeded_by");
-       }
-
-       /* last_seen */
-       lua_pushnumber(L, cb_ctx->bucket->last);
-       lua_setfield(L, -2, "last_seen");
-}
-
-/*
- * Push extensions from session or callback context as a Lua table
- */
-static void
-rspamd_fuzzy_ratelimit_extensions_tolua(lua_State *L,
-                                                                               const struct rspamd_ratelimit_callback_ctx *cb_ctx)
-{
-       struct rspamd_fuzzy_cmd_extension *ext;
-       rspamd_inet_addr_t *addr;
-
-       lua_createtable(L, 0, 2);
-
-       if (!cb_ctx->session || !cb_ctx->session->extensions) {
-               return;
-       }
-
-       LL_FOREACH(cb_ctx->session->extensions, ext)
-       {
-               switch (ext->ext) {
-               case RSPAMD_FUZZY_EXT_SOURCE_DOMAIN:
-                       lua_pushlstring(L, (const char *) ext->payload, ext->length);
-                       lua_setfield(L, -2, "domain");
-                       break;
-               case RSPAMD_FUZZY_EXT_SOURCE_IP4:
-                       addr = rspamd_inet_address_new(AF_INET, ext->payload);
-                       rspamd_lua_ip_push(L, addr);
-                       rspamd_inet_address_free(addr);
-                       lua_setfield(L, -2, "source_ip");
-                       break;
-               case RSPAMD_FUZZY_EXT_SOURCE_IP6:
-                       addr = rspamd_inet_address_new(AF_INET6, ext->payload);
-                       rspamd_lua_ip_push(L, addr);
-                       rspamd_inet_address_free(addr);
-                       lua_setfield(L, -2, "source_ip");
-                       break;
-               }
-       }
-}
-
-/*
- * Enhanced Lua callback for ratelimit/blacklist events
- * Passes 7 arguments to Lua:
- * 1. ip (rspamd_ip) - Client IP address
- * 2. reason (string) - "ratelimit" or "blacklisted"
- * 3. event_type (string) - "new", "existing", or "blacklist"
- * 4. ratelimit_info (table/nil) - Bucket state details
- * 5. digest (rspamd_text/nil) - Hash if session available
- * 6. extensions (table) - Domain, source IP from extensions
- */
-static void
-rspamd_fuzzy_call_ratelimit_handlers(struct rspamd_fuzzy_storage_ctx *ctx,
-                                                                        const struct rspamd_ratelimit_callback_ctx *cb_ctx)
-{
-       if (ctx->lua_blacklist_handlers == NULL) {
-               return;
-       }
-
-       struct rspamd_lua_fuzzy_script *cur;
-       LL_FOREACH(ctx->lua_blacklist_handlers, cur)
-       {
-               lua_State *L = ctx->cfg->lua_state;
-               int err_idx, ret;
-               const int nargs = 6;
-
-               lua_pushcfunction(L, &rspamd_lua_traceback);
-               err_idx = lua_gettop(L);
-               lua_checkstack(L, err_idx + nargs + 2);
-               lua_rawgeti(L, LUA_REGISTRYINDEX, cur->cbref);
-
-               /* Arg 1: client IP */
-               rspamd_lua_ip_push(L, cb_ctx->addr);
-
-               /* Arg 2: block reason */
-               lua_pushstring(L, cb_ctx->reason);
-
-               /* Arg 3: event type */
-               switch (cb_ctx->type) {
-               case RATELIMIT_EVENT_NEW:
-                       lua_pushliteral(L, "new");
-                       break;
-               case RATELIMIT_EVENT_EXISTING:
-                       lua_pushliteral(L, "existing");
-                       break;
-               case RATELIMIT_EVENT_BLACKLIST:
-                       lua_pushliteral(L, "blacklist");
-                       break;
-               }
-
-               /* Arg 4: ratelimit_info table (or nil) */
-               rspamd_fuzzy_bucket_info_tolua(L, cb_ctx);
-
-               /* Arg 5: digest (or nil) */
-               if (cb_ctx->session) {
-                       (void) lua_new_text(L, (const char *) cb_ctx->session->cmd.basic.digest,
-                                                               sizeof(cb_ctx->session->cmd.basic.digest), FALSE);
-               }
-               else {
-                       lua_pushnil(L);
-               }
-
-               /* Arg 6: extensions table */
-               rspamd_fuzzy_ratelimit_extensions_tolua(L, cb_ctx);
-
-               if ((ret = lua_pcall(L, nargs, 0, err_idx)) != 0) {
-                       msg_err("call to lua_blacklist_cbref "
-                                       "script failed (%d): %s",
-                                       ret, lua_tostring(L, -1));
-               }
-
-               lua_settop(L, 0);
-       }
-}
-
-/*
- * Backwards-compatible wrapper that calls the enhanced handler
- */
-static void
-rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx *ctx,
-                                                                       rspamd_inet_addr_t *addr,
-                                                                       const char *reason)
-{
-       if (ctx->lua_blacklist_handlers == NULL) {
-               return;
-       }
-
-       struct rspamd_ratelimit_callback_ctx cb_ctx = {
-               .addr = addr,
-               .reason = reason,
-               .type = g_strcmp0(reason, "blacklisted") == 0 ? RATELIMIT_EVENT_BLACKLIST : RATELIMIT_EVENT_EXISTING,
-               .bucket = NULL,
-               .max_burst = NAN,
-               .max_rate = NAN,
-               .session = NULL,
-       };
-       rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx);
-}
-
-static gboolean
-rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
-                                                 rspamd_inet_addr_t *addr)
-{
-       if (ctx->blocked_ips != NULL) {
-               if (rspamd_match_radix_map_addr(ctx->blocked_ips,
-                                                                               addr) != NULL) {
-
-                       rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "blacklisted");
-                       return FALSE;
-               }
-       }
-
-       return TRUE;
-}
-
 static gboolean
 rspamd_fuzzy_check_write(struct rspamd_fuzzy_storage_ctx *ctx,
                                                 rspamd_inet_addr_t *addr,
 static gboolean
 rspamd_fuzzy_check_write(struct rspamd_fuzzy_storage_ctx *ctx,
                                                 rspamd_inet_addr_t *addr,
@@ -973,88 +199,15 @@ rspamd_fuzzy_check_write(struct rspamd_fuzzy_storage_ctx *ctx,
        }
 
        if (key) {
        }
 
        if (key) {
-               if (cmd == FUZZY_WRITE && key->flags & FUZZY_KEY_WRITE) {
-                       return TRUE;
-               }
-               else if (cmd == FUZZY_DEL && key->flags & FUZZY_KEY_DELETE) {
-                       return TRUE;
-               }
-       }
-
-       return FALSE;
-}
-
-static void
-fuzzy_key_stat_dtor(gpointer p)
-{
-       struct fuzzy_key_stat *st = p;
-
-       if (st->last_ips) {
-               rspamd_lru_hash_destroy(st->last_ips);
-       }
-
-       if (st->keypair) {
-               rspamd_keypair_unref(st->keypair);
-       }
-
-       g_free(st);
-}
-
-static void
-fuzzy_key_stat_unref(gpointer p)
-{
-       struct fuzzy_key_stat *st = p;
-
-       REF_RELEASE(st);
-}
-
-static void
-fuzzy_key_dtor(gpointer p)
-{
-       struct fuzzy_key *key = p;
-
-       if (key) {
-               if (key->key) {
-                       rspamd_keypair_unref(key->key);
-               }
-
-               if (key->stat) {
-                       REF_RELEASE(key->stat);
-               }
-
-               if (key->flags_stat) {
-                       kh_destroy(fuzzy_key_flag_stat, key->flags_stat);
-               }
-
-               if (key->forbidden_ids) {
-                       kh_destroy(fuzzy_key_ids_set, key->forbidden_ids);
-               }
-
-               if (key->rl_bucket) {
-                       /* TODO: save bucket stats */
-                       g_free(key->rl_bucket);
-               }
-
-               if (key->name) {
-                       g_free(key->name);
+               if (cmd == FUZZY_WRITE && key->flags & FUZZY_KEY_WRITE) {
+                       return TRUE;
                }
                }
-
-               if (key->extensions) {
-                       ucl_object_unref(key->extensions);
+               else if (cmd == FUZZY_DEL && key->flags & FUZZY_KEY_DELETE) {
+                       return TRUE;
                }
                }
-
-               g_free(key);
        }
        }
-}
 
 
-static void
-fuzzy_hash_table_dtor(khash_t(rspamd_fuzzy_keys_hash) * hash)
-{
-       struct fuzzy_key *key;
-       kh_foreach_value(hash, key, {
-               REF_RELEASE(key);
-       });
-       kh_destroy(rspamd_fuzzy_keys_hash, hash);
+       return FALSE;
 }
 
 static void
 }
 
 static void
@@ -1065,15 +218,6 @@ fuzzy_count_callback(uint64_t count, void *ud)
        ctx->stat.fuzzy_hashes = count;
 }
 
        ctx->stat.fuzzy_hashes = count;
 }
 
-static void
-fuzzy_rl_bucket_free(gpointer p)
-{
-       struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *) p;
-
-       rspamd_inet_address_free(elt->addr);
-       g_free(elt);
-}
-
 static void
 fuzzy_stat_count_callback(uint64_t count, void *ud)
 {
 static void
 fuzzy_stat_count_callback(uint64_t count, void *ud)
 {
@@ -3292,313 +2436,6 @@ rspamd_fuzzy_storage_reload(struct rspamd_main *rspamd_main,
        return TRUE;
 }
 
        return TRUE;
 }
 
-static ucl_object_t *
-rspamd_fuzzy_storage_stat_key(const struct fuzzy_key_stat *key_stat)
-{
-       ucl_object_t *res;
-
-       res = ucl_object_typed_new(UCL_OBJECT);
-
-       ucl_object_insert_key(res, ucl_object_fromint(key_stat->checked),
-                                                 "checked", 0, false);
-       ucl_object_insert_key(res, ucl_object_fromdouble(key_stat->checked_ctr.mean),
-                                                 "checked_per_hour", 0, false);
-       ucl_object_insert_key(res, ucl_object_fromint(key_stat->matched),
-                                                 "matched", 0, false);
-       ucl_object_insert_key(res, ucl_object_fromdouble(key_stat->matched_ctr.mean),
-                                                 "matched_per_hour", 0, false);
-       ucl_object_insert_key(res, ucl_object_fromint(key_stat->added),
-                                                 "added", 0, false);
-       ucl_object_insert_key(res, ucl_object_fromint(key_stat->deleted),
-                                                 "deleted", 0, false);
-       ucl_object_insert_key(res, ucl_object_fromint(key_stat->errors),
-                                                 "errors", 0, false);
-
-       return res;
-}
-
-static void
-rspamd_fuzzy_key_stat_iter(const unsigned char *pk_iter, struct fuzzy_key *fuzzy_key, ucl_object_t *keys_obj, gboolean ip_stat)
-{
-       struct fuzzy_key_stat *key_stat = fuzzy_key->stat;
-       char keyname[17];
-
-       if (key_stat) {
-               rspamd_snprintf(keyname, sizeof(keyname), "%8bs", pk_iter);
-
-               ucl_object_t *elt = rspamd_fuzzy_storage_stat_key(key_stat);
-
-               if (key_stat->last_ips && ip_stat) {
-                       int i = 0;
-                       ucl_object_t *ip_elt = ucl_object_typed_new(UCL_OBJECT);
-                       gpointer k, v;
-
-                       while ((i = rspamd_lru_hash_foreach(key_stat->last_ips,
-                                                                                               i, &k, &v)) != -1) {
-                               ucl_object_t *ip_cur = rspamd_fuzzy_storage_stat_key(v);
-                               ucl_object_insert_key(ip_elt, ip_cur,
-                                                                         rspamd_inet_address_to_string(k), 0, true);
-                       }
-                       ucl_object_insert_key(elt, ip_elt, "ips", 0, false);
-               }
-
-               int flag;
-               struct fuzzy_key_stat *flag_stat;
-               ucl_object_t *flags_ucl = ucl_object_typed_new(UCL_OBJECT);
-
-               kh_foreach_key_value_ptr(fuzzy_key->flags_stat, flag, flag_stat, {
-                       char intbuf[16];
-                       rspamd_snprintf(intbuf, sizeof(intbuf), "%d", flag);
-                       ucl_object_insert_key(flags_ucl, rspamd_fuzzy_storage_stat_key(flag_stat),
-                                                                 intbuf, 0, true);
-               });
-
-               ucl_object_insert_key(elt, flags_ucl, "flags", 0, false);
-
-               ucl_object_insert_key(elt,
-                                                         rspamd_keypair_to_ucl(fuzzy_key->key, RSPAMD_KEYPAIR_ENCODING_DEFAULT,
-                                                                                                       RSPAMD_KEYPAIR_DUMP_NO_SECRET | RSPAMD_KEYPAIR_DUMP_FLATTENED),
-                                                         "keypair", 0, false);
-
-               if (fuzzy_key->rl_bucket) {
-                       ucl_object_insert_key(elt,
-                                                                 rspamd_leaky_bucket_to_ucl(fuzzy_key->rl_bucket),
-                                                                 "ratelimit", 0, false);
-               }
-
-               ucl_object_insert_key(keys_obj, elt, keyname, 0, true);
-       }
-}
-static ucl_object_t *
-rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt)
-{
-       ucl_object_t *res;
-
-       res = ucl_object_typed_new(UCL_OBJECT);
-
-       ucl_object_insert_key(res, ucl_object_fromdouble(p_elt->cur), "cur", 0, false);
-       ucl_object_insert_key(res, ucl_object_fromdouble(p_elt->last), "last", 0, false);
-
-       return res;
-}
-
-static ucl_object_t *
-rspamd_fuzzy_stat_to_ucl(struct rspamd_fuzzy_storage_ctx *ctx, gboolean ip_stat)
-{
-       struct fuzzy_key *fuzzy_key;
-       ucl_object_t *obj, *keys_obj, *elt, *ip_elt;
-       const unsigned char *pk_iter;
-
-       obj = ucl_object_typed_new(UCL_OBJECT);
-
-       keys_obj = ucl_object_typed_new(UCL_OBJECT);
-
-       kh_foreach(ctx->keys, pk_iter, fuzzy_key, {
-               rspamd_fuzzy_key_stat_iter(pk_iter, fuzzy_key, keys_obj, ip_stat);
-       });
-
-       if (ctx->dynamic_keys) {
-               kh_foreach(ctx->dynamic_keys, pk_iter, fuzzy_key, {
-                       rspamd_fuzzy_key_stat_iter(pk_iter, fuzzy_key, keys_obj, ip_stat);
-               });
-       }
-
-       ucl_object_insert_key(obj, keys_obj, "keys", 0, false);
-
-       /* Now generic stats */
-       ucl_object_insert_key(obj,
-                                                 ucl_object_fromint(ctx->stat.fuzzy_hashes),
-                                                 "fuzzy_stored",
-                                                 0,
-                                                 false);
-       ucl_object_insert_key(obj,
-                                                 ucl_object_fromint(ctx->stat.fuzzy_hashes_expired),
-                                                 "fuzzy_expired",
-                                                 0,
-                                                 false);
-       ucl_object_insert_key(obj,
-                                                 ucl_object_fromint(ctx->stat.invalid_requests),
-                                                 "invalid_requests",
-                                                 0,
-                                                 false);
-       ucl_object_insert_key(obj,
-                                                 ucl_object_fromint(ctx->stat.delayed_hashes),
-                                                 "delayed_hashes",
-                                                 0,
-                                                 false);
-
-       if (ctx->errors_ips && ip_stat) {
-               gpointer k, v;
-               int i = 0;
-               ip_elt = ucl_object_typed_new(UCL_OBJECT);
-
-               while ((i = rspamd_lru_hash_foreach(ctx->errors_ips, i, &k, &v)) != -1) {
-                       ucl_object_insert_key(ip_elt,
-                                                                 ucl_object_fromint(*(uint64_t *) v),
-                                                                 rspamd_inet_address_to_string(k), 0, true);
-               }
-
-               ucl_object_insert_key(obj,
-                                                         ip_elt,
-                                                         "errors_ips",
-                                                         0,
-                                                         false);
-       }
-
-       /* Checked by epoch */
-       elt = ucl_object_typed_new(UCL_ARRAY);
-
-       for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
-               ucl_array_append(elt,
-                                                ucl_object_fromint(ctx->stat.fuzzy_hashes_checked[i]));
-       }
-
-       ucl_object_insert_key(obj, elt, "fuzzy_checked", 0, false);
-
-       /* Shingles by epoch */
-       elt = ucl_object_typed_new(UCL_ARRAY);
-
-       for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
-               ucl_array_append(elt,
-                                                ucl_object_fromint(ctx->stat.fuzzy_shingles_checked[i]));
-       }
-
-       ucl_object_insert_key(obj, elt, "fuzzy_shingles", 0, false);
-
-       /* Matched by epoch */
-       elt = ucl_object_typed_new(UCL_ARRAY);
-
-       for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
-               ucl_array_append(elt,
-                                                ucl_object_fromint(ctx->stat.fuzzy_hashes_found[i]));
-       }
-
-       ucl_object_insert_key(obj, elt, "fuzzy_found", 0, false);
-
-
-       return obj;
-}
-
-static void
-rspamd_fuzzy_maybe_load_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx)
-{
-       char path[PATH_MAX];
-
-       rspamd_snprintf(path, sizeof(path), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl",
-                                       RSPAMD_DBDIR);
-
-       if (access(path, R_OK) != -1) {
-               struct ucl_parser *parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS);
-               if (ucl_parser_add_file(parser, path)) {
-                       ucl_object_t *obj = ucl_parser_get_object(parser);
-                       int loaded = 0;
-
-                       if (ucl_object_type(obj) == UCL_ARRAY) {
-                               ucl_object_iter_t it = NULL;
-                               const ucl_object_t *cur;
-
-                               while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) {
-                                       const ucl_object_t *ip, *value, *last;
-                                       const char *ip_str;
-                                       double limit_val, last_val;
-
-                                       ip = ucl_object_find_key(cur, "ip");
-                                       value = ucl_object_find_key(cur, "value");
-                                       last = ucl_object_find_key(cur, "last");
-
-                                       if (ip == NULL || value == NULL || last == NULL) {
-                                               msg_err("invalid ratelimit object");
-                                               continue;
-                                       }
-
-                                       ip_str = ucl_object_tostring(ip);
-                                       limit_val = ucl_object_todouble(value);
-                                       last_val = ucl_object_todouble(last);
-
-                                       if (ip_str == NULL || isnan(last_val)) {
-                                               msg_err("invalid ratelimit object");
-                                               continue;
-                                       }
-
-                                       rspamd_inet_addr_t *addr;
-                                       if (rspamd_parse_inet_address(&addr, ip_str, strlen(ip_str),
-                                                                                                 RSPAMD_INET_ADDRESS_PARSE_NO_UNIX | RSPAMD_INET_ADDRESS_PARSE_NO_PORT)) {
-                                               struct rspamd_leaky_bucket_elt *elt = g_malloc(sizeof(*elt));
-
-                                               elt->cur = limit_val;
-                                               elt->last = last_val;
-                                               elt->addr = addr;
-                                               rspamd_lru_hash_insert(ctx->ratelimit_buckets, addr, elt, elt->last, ctx->leaky_bucket_ttl);
-                                               loaded++;
-                                       }
-                                       else {
-                                               msg_err("invalid ratelimit ip: %s", ip_str);
-                                               continue;
-                                       }
-                               }
-
-                               msg_info("loaded %d ratelimit objects", loaded);
-                       }
-
-                       ucl_object_unref(obj);
-               }
-
-               ucl_parser_free(parser);
-       }
-}
-
-static void
-rspamd_fuzzy_maybe_save_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx)
-{
-       char path[PATH_MAX];
-
-       rspamd_snprintf(path, sizeof(path), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl.new",
-                                       RSPAMD_DBDIR);
-       FILE *f = fopen(path, "w");
-
-       if (f != NULL) {
-               ucl_object_t *top = ucl_object_typed_new(UCL_ARRAY);
-               int it = 0;
-               gpointer k, v;
-
-               ucl_object_reserve(top, rspamd_lru_hash_size(ctx->ratelimit_buckets));
-
-               while ((it = rspamd_lru_hash_foreach(ctx->ratelimit_buckets, it, &k, &v)) != -1) {
-                       ucl_object_t *cur = ucl_object_typed_new(UCL_OBJECT);
-                       struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *) v;
-
-                       ucl_object_insert_key(cur, ucl_object_fromdouble(elt->cur), "value", 0, false);
-                       ucl_object_insert_key(cur, ucl_object_fromdouble(elt->last), "last", 0, false);
-                       ucl_object_insert_key(cur, ucl_object_fromstring(rspamd_inet_address_to_string(elt->addr)), "ip", 0, false);
-                       ucl_array_append(top, cur);
-               }
-
-               if (ucl_object_emit_full(top, UCL_EMIT_JSON_COMPACT, ucl_object_emit_file_funcs(f), NULL)) {
-                       char npath[PATH_MAX];
-
-                       fflush(f);
-                       rspamd_snprintf(npath, sizeof(npath), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl",
-                                                       RSPAMD_DBDIR);
-
-                       if (rename(path, npath) == -1) {
-                               msg_warn("cannot rename %s to %s: %s", path, npath, strerror(errno));
-                       }
-                       else {
-                               msg_info("saved %d ratelimits in %s", rspamd_lru_hash_size(ctx->ratelimit_buckets), npath);
-                       }
-               }
-               else {
-                       msg_warn("cannot serialize ratelimit buckets to %s: %s", path, strerror(errno));
-               }
-
-               fclose(f);
-               ucl_object_unref(top);
-       }
-       else {
-               msg_warn("cannot save ratelimit buckets to %s: %s", path, strerror(errno));
-       }
-}
-
 static int
 lua_fuzzy_add_pre_handler(lua_State *L)
 {
 static int
 lua_fuzzy_add_pre_handler(lua_State *L)
 {
@@ -3687,373 +2524,6 @@ lua_fuzzy_add_blacklist_handler(lua_State *L)
        return 0;
 }
 
        return 0;
 }
 
-static gboolean
-rspamd_fuzzy_storage_stat(struct rspamd_main *rspamd_main,
-                                                 struct rspamd_worker *worker, int fd,
-                                                 int attached_fd,
-                                                 struct rspamd_control_command *cmd,
-                                                 gpointer ud)
-{
-       struct rspamd_fuzzy_storage_ctx *ctx = ud;
-       struct rspamd_control_reply rep;
-       ucl_object_t *obj;
-       struct ucl_emitter_functions *emit_subr;
-       unsigned char fdspace[CMSG_SPACE(sizeof(int))];
-       struct iovec iov;
-       struct msghdr msg;
-       struct cmsghdr *cmsg;
-
-       int outfd = -1;
-       char tmppath[PATH_MAX];
-
-       memset(&rep, 0, sizeof(rep));
-       rep.type = RSPAMD_CONTROL_FUZZY_STAT;
-       rep.id = cmd->id;
-
-       rspamd_snprintf(tmppath, sizeof(tmppath), "%s%c%s-XXXXXXXXXX",
-                                       rspamd_main->cfg->temp_dir, G_DIR_SEPARATOR, "fuzzy-stat");
-
-       if ((outfd = mkstemp(tmppath)) == -1) {
-               rep.reply.fuzzy_stat.status = errno;
-               msg_info_main("cannot make temporary stat file for fuzzy stat: %s",
-                                         strerror(errno));
-       }
-       else {
-               const char *backend_id;
-
-               rep.reply.fuzzy_stat.status = 0;
-
-               backend_id = rspamd_fuzzy_backend_id(ctx->backend);
-               if (backend_id) {
-                       memcpy(rep.reply.fuzzy_stat.storage_id,
-                                  backend_id,
-                                  sizeof(rep.reply.fuzzy_stat.storage_id));
-               }
-
-               obj = rspamd_fuzzy_stat_to_ucl(ctx, TRUE);
-               emit_subr = ucl_object_emit_fd_funcs(outfd);
-               ucl_object_emit_full(obj, UCL_EMIT_JSON_COMPACT, emit_subr, NULL);
-               ucl_object_emit_funcs_free(emit_subr);
-               ucl_object_unref(obj);
-               /* Rewind output file */
-               close(outfd);
-               outfd = open(tmppath, O_RDONLY);
-               unlink(tmppath);
-       }
-
-       /* Now we can send outfd and status message */
-       memset(&msg, 0, sizeof(msg));
-
-       /* Attach fd to the message */
-       if (outfd != -1) {
-               memset(fdspace, 0, sizeof(fdspace));
-               msg.msg_control = fdspace;
-               msg.msg_controllen = sizeof(fdspace);
-               cmsg = CMSG_FIRSTHDR(&msg);
-
-               if (cmsg) {
-                       cmsg->cmsg_level = SOL_SOCKET;
-                       cmsg->cmsg_type = SCM_RIGHTS;
-                       cmsg->cmsg_len = CMSG_LEN(sizeof(int));
-                       memcpy(CMSG_DATA(cmsg), &outfd, sizeof(int));
-               }
-       }
-
-       iov.iov_base = &rep;
-       iov.iov_len = sizeof(rep);
-       msg.msg_iov = &iov;
-       msg.msg_iovlen = 1;
-
-       if (sendmsg(fd, &msg, 0) == -1) {
-               msg_err_main("cannot send fuzzy stat: %s", strerror(errno));
-       }
-
-       if (outfd != -1) {
-               close(outfd);
-       }
-
-       return TRUE;
-}
-
-static gboolean
-fuzzy_parse_ids(rspamd_mempool_t *pool,
-                               const ucl_object_t *obj,
-                               gpointer ud,
-                               struct rspamd_rcl_section *section,
-                               GError **err)
-{
-       struct rspamd_rcl_struct_parser *pd = (struct rspamd_rcl_struct_parser *) ud;
-       khash_t(fuzzy_key_ids_set) * target;
-
-       target = *(khash_t(fuzzy_key_ids_set) **) ((char *) pd->user_struct + pd->offset);
-
-       if (ucl_object_type(obj) == UCL_ARRAY) {
-               const ucl_object_t *cur;
-               ucl_object_iter_t it = NULL;
-               uint64_t id;
-
-               while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) {
-                       if (ucl_object_toint_safe(cur, &id)) {
-                               int r;
-
-                               kh_put(fuzzy_key_ids_set, target, id, &r);
-                       }
-                       else {
-                               return FALSE;
-                       }
-               }
-
-               return TRUE;
-       }
-       else if (ucl_object_type(obj) == UCL_INT) {
-               int r;
-               kh_put(fuzzy_key_ids_set, target, ucl_object_toint(obj), &r);
-
-               return TRUE;
-       }
-
-       return FALSE;
-}
-
-static struct fuzzy_key *
-fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg, const ucl_object_t *obj,
-                                                  khash_t(rspamd_fuzzy_keys_hash) * target)
-{
-       struct rspamd_cryptobox_keypair *kp = rspamd_keypair_from_ucl(obj);
-
-       if (kp == NULL) {
-               return NULL;
-       }
-
-       if (rspamd_keypair_type(kp) != RSPAMD_KEYPAIR_KEX) {
-               rspamd_keypair_unref(kp);
-               return FALSE;
-       }
-
-       struct fuzzy_key *key = g_malloc0(sizeof(*key));
-       REF_INIT_RETAIN(key, fuzzy_key_dtor);
-       key->key = kp;
-       struct fuzzy_key_stat *keystat = g_malloc0(sizeof(*keystat));
-       REF_INIT_RETAIN(keystat, fuzzy_key_stat_dtor);
-       /* Hash of ip -> fuzzy_key_stat */
-       keystat->last_ips = rspamd_lru_hash_new_full(1024,
-                                                                                                (GDestroyNotify) rspamd_inet_address_free,
-                                                                                                fuzzy_key_stat_unref,
-                                                                                                rspamd_inet_address_hash, rspamd_inet_address_equal);
-       key->stat = keystat;
-       key->flags_stat = kh_init(fuzzy_key_flag_stat);
-       key->burst = NAN;
-       key->rate = NAN;
-       key->expire = NAN;
-       key->rl_bucket = NULL;
-       /* Allow read by default */
-       key->flags = FUZZY_KEY_READ;
-       /* Preallocate some space for flags */
-       kh_resize(fuzzy_key_flag_stat, key->flags_stat, 8);
-       const unsigned char *pk = rspamd_keypair_component(kp, RSPAMD_KEYPAIR_COMPONENT_PK,
-                                                                                                          NULL);
-       keystat->keypair = rspamd_keypair_ref(kp);
-       /* We map entries by pubkey in binary form for faster lookup */
-       khiter_t k;
-       int r;
-
-       k = kh_put(rspamd_fuzzy_keys_hash, target, pk, &r);
-
-       if (r == 0) {
-               msg_err("duplicate keypair found: pk=%*bs",
-                               32, pk);
-               REF_RELEASE(key);
-
-               return FALSE;
-       }
-       else if (r == -1) {
-               msg_err("hash insertion error: pk=%*bs",
-                               32, pk);
-               REF_RELEASE(key);
-
-               return FALSE;
-       }
-
-       kh_val(target, k) = key;
-
-       const ucl_object_t *extensions = rspamd_keypair_get_extensions(kp);
-
-       if (extensions) {
-               key->extensions = ucl_object_ref(extensions);
-               lua_State *L = RSPAMD_LUA_CFG_STATE(cfg);
-               const ucl_object_t *forbidden_ids = ucl_object_lookup(extensions, "forbidden_ids");
-
-               if (forbidden_ids && ucl_object_type(forbidden_ids) == UCL_ARRAY) {
-                       key->forbidden_ids = kh_init(fuzzy_key_ids_set);
-                       const ucl_object_t *cur;
-                       ucl_object_iter_t it = NULL;
-
-                       while ((cur = ucl_object_iterate(forbidden_ids, &it, true)) != NULL) {
-                               if (ucl_object_type(cur) == UCL_INT || ucl_object_type(cur) == UCL_FLOAT) {
-                                       int id = ucl_object_toint(cur);
-                                       int r;
-
-                                       kh_put(fuzzy_key_ids_set, key->forbidden_ids, id, &r);
-                               }
-                       }
-               }
-
-               const ucl_object_t *ratelimit = ucl_object_lookup(extensions, "ratelimit");
-
-               static int ratelimit_lua_id = -1;
-
-               if (ratelimit_lua_id == -1) {
-                       /* Load ratelimit parsing function */
-                       if (!rspamd_lua_require_function(L, "plugins/ratelimit", "parse_limit")) {
-                               msg_err_config("cannot load ratelimit parser from ratelimit plugin");
-                       }
-                       else {
-                               ratelimit_lua_id = luaL_ref(L, LUA_REGISTRYINDEX);
-                       }
-               }
-
-               if (ratelimit && ratelimit_lua_id != -1) {
-                       lua_rawgeti(L, LUA_REGISTRYINDEX, ratelimit_lua_id);
-                       lua_pushstring(L, "fuzzy_key_ratelimit");
-                       ucl_object_push_lua(L, ratelimit, false);
-
-                       if (lua_pcall(L, 2, 1, 0) != 0) {
-                               msg_err_config("cannot call ratelimit parser from ratelimit plugin");
-                       }
-                       else {
-                               if (lua_type(L, -1) == LUA_TTABLE) {
-                                       /*
-                                        * The returned table is in form { rate = xx, burst = yy }
-                                        */
-                                       lua_getfield(L, -1, "rate");
-                                       key->rate = lua_tonumber(L, -1);
-                                       lua_pop(L, 1);
-
-                                       lua_getfield(L, -1, "burst");
-                                       key->burst = lua_tonumber(L, -1);
-                                       lua_pop(L, 1);
-
-                                       key->rl_bucket = g_malloc0(sizeof(*key->rl_bucket));
-                               }
-                       }
-
-                       lua_settop(L, 0);
-               }
-
-               const ucl_object_t *expire = ucl_object_lookup(extensions, "expire");
-               if (expire && ucl_object_type(expire) == UCL_STRING) {
-                       struct tm tm;
-
-                       /* DD-MM-YYYY */
-                       char *end = strptime(ucl_object_tostring(expire), "%d-%m-%Y", &tm);
-
-                       if (end != NULL && *end != '\0') {
-                               msg_err_config("cannot parse expire date: %s", ucl_object_tostring(expire));
-                       }
-                       else {
-                               key->expire = mktime(&tm);
-                       }
-               }
-
-               const ucl_object_t *name = ucl_object_lookup(extensions, "name");
-               if (name && ucl_object_type(name) == UCL_STRING) {
-                       key->name = g_strdup(ucl_object_tostring(name));
-               }
-
-               /* Check permissions */
-               const ucl_object_t *read_only = ucl_object_lookup(extensions, "read_only");
-               if (read_only && ucl_object_type(read_only) == UCL_BOOLEAN) {
-                       if (ucl_object_toboolean(read_only)) {
-                               key->flags &= ~(FUZZY_KEY_WRITE | FUZZY_KEY_DELETE);
-                       }
-                       else {
-                               key->flags |= (FUZZY_KEY_WRITE | FUZZY_KEY_DELETE);
-                       }
-               }
-
-               const ucl_object_t *allowed_ops = ucl_object_lookup(extensions, "allowed_ops");
-               if (allowed_ops && ucl_object_type(allowed_ops) == UCL_ARRAY) {
-                       const ucl_object_t *cur;
-                       ucl_object_iter_t it = NULL;
-                       /* Reset to only allowed */
-                       key->flags = 0;
-
-                       while ((cur = ucl_object_iterate(allowed_ops, &it, true)) != NULL) {
-                               if (ucl_object_type(cur) == UCL_STRING) {
-                                       const char *op = ucl_object_tostring(cur);
-
-                                       if (g_ascii_strcasecmp(op, "read") == 0) {
-                                               key->flags |= FUZZY_KEY_READ;
-                                       }
-                                       else if (g_ascii_strcasecmp(op, "write") == 0) {
-                                               key->flags |= FUZZY_KEY_WRITE;
-                                       }
-                                       else if (g_ascii_strcasecmp(op, "delete") == 0) {
-                                               key->flags |= FUZZY_KEY_DELETE;
-                                       }
-                                       else {
-                                               msg_warn_config("invalid operation: %s", op);
-                                       }
-                               }
-                       }
-               }
-       }
-
-       msg_debug("loaded keypair %*bs; expire=%f; rate=%f; burst=%f; name=%s",
-                         (int) crypto_box_publickeybytes(), pk,
-                         key->expire, key->rate, key->burst, key->name);
-
-       return key;
-}
-
-
-static gboolean
-fuzzy_parse_keypair(rspamd_mempool_t *pool,
-                                       const ucl_object_t *obj,
-                                       gpointer ud,
-                                       struct rspamd_rcl_section *section,
-                                       GError **err)
-{
-       struct rspamd_rcl_struct_parser *pd = ud;
-       struct rspamd_fuzzy_storage_ctx *ctx;
-       struct fuzzy_key *key;
-       const ucl_object_t *cur;
-       ucl_object_iter_t it = NULL;
-       gboolean ret;
-
-       ctx = pd->user_struct;
-       pd->offset = G_STRUCT_OFFSET(struct rspamd_fuzzy_storage_ctx, default_keypair);
-
-       /*
-        * Single key
-        */
-       if (ucl_object_type(obj) == UCL_STRING || ucl_object_type(obj) == UCL_OBJECT) {
-               ret = rspamd_rcl_parse_struct_keypair(pool, obj, pd, section, err);
-
-               if (!ret) {
-                       return ret;
-               }
-
-               key = fuzzy_add_keypair_from_ucl(ctx->cfg, obj, ctx->keys);
-
-               if (key == NULL) {
-                       return FALSE;
-               }
-
-               /* Use the last one ? */
-               ctx->default_key = key;
-       }
-       else if (ucl_object_type(obj) == UCL_ARRAY) {
-               while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) {
-                       if (!fuzzy_parse_keypair(pool, cur, pd, section, err)) {
-                               msg_err_pool("cannot parse keypair");
-                       }
-               }
-       }
-
-       return TRUE;
-}
-
 gpointer
 init_fuzzy(struct rspamd_config *cfg)
 {
 gpointer
 init_fuzzy(struct rspamd_config *cfg)
 {
index ad9df344d1942959336161a7702ff1a2beb9f53b..8cc673f138ef75eb0e00e2367e123856fc0bb729 100644 (file)
@@ -13,6 +13,9 @@ SET(LIBRSPAMDSERVERSRC
         ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend/fuzzy_backend_sqlite.c
         ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend/fuzzy_backend_redis.c
         ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend/fuzzy_backend_noop.c
         ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend/fuzzy_backend_sqlite.c
         ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend/fuzzy_backend_redis.c
         ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend/fuzzy_backend_noop.c
+        ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_storage_keys.c
+        ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_storage_ratelimit.c
+        ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_storage_stat.c
         ${CMAKE_CURRENT_SOURCE_DIR}/milter.c
         ${CMAKE_CURRENT_SOURCE_DIR}/monitored.c
         ${CMAKE_CURRENT_SOURCE_DIR}/protocol.c
         ${CMAKE_CURRENT_SOURCE_DIR}/milter.c
         ${CMAKE_CURRENT_SOURCE_DIR}/monitored.c
         ${CMAKE_CURRENT_SOURCE_DIR}/protocol.c
diff --git a/src/libserver/fuzzy_storage_internal.h b/src/libserver/fuzzy_storage_internal.h
new file mode 100644 (file)
index 0000000..7b3779a
--- /dev/null
@@ -0,0 +1,323 @@
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef RSPAMD_FUZZY_STORAGE_INTERNAL_H
+#define RSPAMD_FUZZY_STORAGE_INTERNAL_H
+
+#include "config.h"
+
+#include "rspamd.h"
+#include "ref.h"
+#include "libserver/fuzzy_wire.h"
+#include "libutil/hash.h"
+#include "contrib/libev/ev.h"
+#include "contrib/libucl/khash.h"
+
+#include <string.h>
+
+struct map_cb_data;
+struct rspamd_rcl_section;
+struct rspamd_rcl_struct_parser;
+struct rspamd_control_command;
+
+struct rspamd_main;
+struct rspamd_worker;
+struct rspamd_dns_resolver;
+struct rspamd_radix_map_helper;
+struct rspamd_hash_map_helper;
+struct rspamd_keypair_cache;
+struct rspamd_http_context;
+struct rspamd_fuzzy_backend;
+
+struct rspamd_cryptobox_keypair;
+struct rspamd_cryptobox_pubkey;
+
+struct fuzzy_tcp_session;
+
+struct fuzzy_global_stat {
+       uint64_t fuzzy_hashes;
+       uint64_t fuzzy_hashes_expired;
+       uint64_t fuzzy_hashes_checked[RSPAMD_FUZZY_EPOCH_MAX];
+       uint64_t fuzzy_shingles_checked[RSPAMD_FUZZY_EPOCH_MAX];
+       uint64_t fuzzy_hashes_found[RSPAMD_FUZZY_EPOCH_MAX];
+       uint64_t invalid_requests;
+       uint64_t delayed_hashes;
+};
+
+struct fuzzy_key_stat {
+       uint64_t checked;
+       uint64_t matched;
+       uint64_t added;
+       uint64_t deleted;
+       uint64_t errors;
+       struct rspamd_counter_data checked_ctr;
+       struct rspamd_counter_data matched_ctr;
+       double last_checked_time;
+       uint64_t last_checked_count;
+       uint64_t last_matched_count;
+       struct rspamd_cryptobox_keypair *keypair;
+       rspamd_lru_hash_t *last_ips;
+
+       ref_entry_t ref;
+};
+
+struct rspamd_leaky_bucket_elt {
+       rspamd_inet_addr_t *addr;
+       double last;
+       double cur;
+};
+
+static inline int64_t
+fuzzy_kp_hash(const unsigned char *p)
+{
+       int64_t res;
+
+       memcpy(&res, p, sizeof(res));
+       return res;
+}
+
+static inline bool
+fuzzy_kp_equal(gconstpointer a, gconstpointer b)
+{
+       const unsigned char *pa = a, *pb = b;
+
+       return (memcmp(pa, pb, RSPAMD_FUZZY_KEYLEN) == 0);
+}
+
+enum fuzzy_key_op {
+       FUZZY_KEY_READ = 0x1u << 0,
+       FUZZY_KEY_WRITE = 0x1u << 1,
+       FUZZY_KEY_DELETE = 0x1u << 2,
+};
+
+KHASH_SET_INIT_INT(fuzzy_key_ids_set);
+KHASH_INIT(fuzzy_key_flag_stat, int, struct fuzzy_key_stat, 1, kh_int_hash_func,
+                  kh_int_hash_equal);
+
+struct fuzzy_key {
+       char *name;
+       struct rspamd_cryptobox_keypair *key;
+       struct rspamd_cryptobox_pubkey *pk;
+       struct fuzzy_key_stat *stat;
+       khash_t(fuzzy_key_flag_stat) * flags_stat;
+       khash_t(fuzzy_key_ids_set) * forbidden_ids;
+       struct rspamd_leaky_bucket_elt *rl_bucket;
+       ucl_object_t *extensions;
+       double burst;
+       double rate;
+       ev_tstamp expire;
+       bool expired;
+       int flags; /* enum fuzzy_key_op */
+       ref_entry_t ref;
+};
+
+KHASH_INIT(rspamd_fuzzy_keys_hash,
+                  const unsigned char *, struct fuzzy_key *, 1,
+                  fuzzy_kp_hash, fuzzy_kp_equal);
+
+struct rspamd_lua_fuzzy_script {
+       int cbref;
+       struct rspamd_lua_fuzzy_script *next;
+};
+
+struct rspamd_fuzzy_storage_ctx {
+       uint64_t magic;
+       struct ev_loop *event_loop;
+       struct rspamd_dns_resolver *resolver;
+       struct rspamd_config *cfg;
+       struct fuzzy_global_stat stat;
+       double expire;
+       double sync_timeout;
+       double delay;
+       double tcp_timeout;
+       struct rspamd_radix_map_helper *update_ips;
+       struct rspamd_hash_map_helper *update_keys;
+       struct rspamd_radix_map_helper *blocked_ips;
+       struct rspamd_radix_map_helper *ratelimit_whitelist;
+       struct rspamd_radix_map_helper *delay_whitelist;
+
+       const ucl_object_t *update_map;
+       const ucl_object_t *update_keys_map;
+       const ucl_object_t *delay_whitelist_map;
+       const ucl_object_t *blocked_map;
+       const ucl_object_t *ratelimit_whitelist_map;
+       const ucl_object_t *dynamic_keys_map;
+
+       unsigned int keypair_cache_size;
+       ev_timer stat_ev;
+       ev_io peer_ev;
+
+       struct rspamd_cryptobox_keypair *default_keypair;
+       struct fuzzy_key *default_key;
+       khash_t(rspamd_fuzzy_keys_hash) * keys;
+       khash_t(rspamd_fuzzy_keys_hash) * dynamic_keys;
+
+       gboolean encrypted_only;
+       gboolean read_only;
+       gboolean dedicated_update_worker;
+       struct rspamd_keypair_cache *keypair_cache;
+       struct rspamd_http_context *http_ctx;
+       rspamd_lru_hash_t *errors_ips;
+       rspamd_lru_hash_t *ratelimit_buckets;
+       struct rspamd_fuzzy_backend *backend;
+       GArray *updates_pending;
+       unsigned int updates_failed;
+       unsigned int updates_maxfail;
+       int peer_fd;
+
+       unsigned int leaky_bucket_ttl;
+       unsigned int leaky_bucket_mask;
+       unsigned int max_buckets;
+       gboolean ratelimit_log_only;
+       double leaky_bucket_burst;
+       double leaky_bucket_rate;
+
+       struct rspamd_worker *worker;
+       const ucl_object_t *skip_map;
+       struct rspamd_hash_map_helper *skip_hashes;
+       struct rspamd_lua_fuzzy_script *lua_pre_handlers;
+       struct rspamd_lua_fuzzy_script *lua_post_handlers;
+       struct rspamd_lua_fuzzy_script *lua_blacklist_handlers;
+       khash_t(fuzzy_key_ids_set) * default_forbidden_ids;
+       khash_t(fuzzy_key_ids_set) * weak_ids;
+};
+
+enum fuzzy_cmd_type {
+       CMD_NORMAL,
+       CMD_SHINGLE,
+       CMD_ENCRYPTED_NORMAL,
+       CMD_ENCRYPTED_SHINGLE
+};
+
+/* Legacy structure name for compatibility during refactoring */
+struct fuzzy_session {
+       struct rspamd_worker *worker;
+       rspamd_inet_addr_t *addr;
+       struct rspamd_fuzzy_storage_ctx *ctx;
+
+       struct rspamd_fuzzy_shingle_cmd cmd;       /* Can handle both shingles and non-shingles */
+       struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */
+       struct fuzzy_key_stat *ip_stat;
+
+       enum rspamd_fuzzy_epoch epoch;
+       enum fuzzy_cmd_type cmd_type;
+       int fd;
+       ev_tstamp timestamp;
+       struct ev_io io;
+       ref_entry_t ref;
+       struct fuzzy_key *key;
+       struct rspamd_fuzzy_cmd_extension *extensions;
+       unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
+
+       /* If this is a TCP session, this pointer will be set */
+       struct fuzzy_tcp_session *tcp_session;
+};
+
+enum rspamd_ratelimit_event_type {
+       RATELIMIT_EVENT_NEW,
+       RATELIMIT_EVENT_EXISTING,
+       RATELIMIT_EVENT_BLACKLIST,
+};
+
+struct rspamd_ratelimit_callback_ctx {
+       rspamd_inet_addr_t *addr;
+       const char *reason;
+       enum rspamd_ratelimit_event_type type;
+
+       struct rspamd_leaky_bucket_elt *bucket;
+       double max_burst;
+       double max_rate;
+
+       struct fuzzy_session *session;
+};
+
+struct fuzzy_keymap_ucl_buf {
+       rspamd_fstring_t *buf;
+       struct rspamd_fuzzy_storage_ctx *ctx;
+};
+
+enum rspamd_ratelimit_check_result {
+       ratelimit_pass,
+       ratelimit_new,
+       ratelimit_existing,
+};
+
+enum rspamd_ratelimit_check_policy {
+       ratelimit_policy_permanent,
+       ratelimit_policy_normal,
+};
+
+/* Keys/dynamic map */
+char *ucl_keymap_read_cb(char *chunk, int len, struct map_cb_data *data, gboolean final);
+void ucl_keymap_fin_cb(struct map_cb_data *data, void **target);
+void ucl_keymap_dtor_cb(struct map_cb_data *data);
+
+void fuzzy_key_stat_dtor(gpointer p);
+void fuzzy_key_stat_unref(gpointer p);
+void fuzzy_key_dtor(gpointer p);
+void fuzzy_hash_table_dtor(khash_t(rspamd_fuzzy_keys_hash) * hash);
+
+gboolean fuzzy_parse_ids(rspamd_mempool_t *pool, const ucl_object_t *obj,
+                                                gpointer ud, struct rspamd_rcl_section *section, GError **err);
+struct fuzzy_key *fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg, const ucl_object_t *obj,
+                                                                                        khash_t(rspamd_fuzzy_keys_hash) * target);
+gboolean fuzzy_parse_keypair(rspamd_mempool_t *pool, const ucl_object_t *obj,
+                                                        gpointer ud, struct rspamd_rcl_section *section, GError **err);
+
+/* Ratelimit */
+void fuzzy_rl_bucket_free(gpointer p);
+
+enum rspamd_ratelimit_check_result rspamd_fuzzy_check_ratelimit_bucket(
+       struct rspamd_fuzzy_storage_ctx *ctx,
+       rspamd_inet_addr_t *addr,
+       ev_tstamp timestamp,
+       struct rspamd_leaky_bucket_elt *elt,
+       enum rspamd_ratelimit_check_policy policy,
+       double max_burst, double max_rate);
+
+gboolean rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx,
+                                                                         rspamd_inet_addr_t *addr,
+                                                                         struct rspamd_worker *worker,
+                                                                         ev_tstamp timestamp);
+
+void rspamd_fuzzy_call_ratelimit_handlers(struct rspamd_fuzzy_storage_ctx *ctx,
+                                                                                 const struct rspamd_ratelimit_callback_ctx *cb_ctx);
+void rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx *ctx,
+                                                                                rspamd_inet_addr_t *addr,
+                                                                                const char *reason);
+
+gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
+                                                                  rspamd_inet_addr_t *addr);
+
+ucl_object_t *rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt);
+void rspamd_fuzzy_maybe_load_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx);
+void rspamd_fuzzy_maybe_save_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx);
+
+/* Stats / controller */
+ucl_object_t *rspamd_fuzzy_storage_stat_key(const struct fuzzy_key_stat *key_stat);
+void rspamd_fuzzy_key_stat_iter(const unsigned char *pk_iter,
+                                                               struct fuzzy_key *fuzzy_key,
+                                                               ucl_object_t *keys_obj,
+                                                               gboolean ip_stat);
+ucl_object_t *rspamd_fuzzy_stat_to_ucl(struct rspamd_fuzzy_storage_ctx *ctx, gboolean ip_stat);
+
+gboolean rspamd_fuzzy_storage_stat(struct rspamd_main *rspamd_main,
+                                                                  struct rspamd_worker *worker, int fd,
+                                                                  int attached_fd,
+                                                                  struct rspamd_control_command *cmd,
+                                                                  gpointer ud);
+
+#endif
diff --git a/src/libserver/fuzzy_storage_keys.c b/src/libserver/fuzzy_storage_keys.c
new file mode 100644 (file)
index 0000000..8c12de2
--- /dev/null
@@ -0,0 +1,513 @@
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Rspamd fuzzy storage server: keys and dynamic keymaps
+ */
+
+#include "config.h"
+
+#include "fuzzy_storage_internal.h"
+
+#include "cfg_rcl.h"
+#include "libcryptobox/cryptobox.h"
+#include "libcryptobox/keypair.h"
+#include "libserver/maps/map.h"
+#include "lua/lua_common.h"
+#include "unix-std.h"
+
+#include <math.h>
+#include <string.h>
+#include <time.h>
+
+char *
+ucl_keymap_read_cb(char *chunk, int len,
+                                  struct map_cb_data *data, gboolean final)
+{
+       struct fuzzy_keymap_ucl_buf *jb, *pd;
+
+       pd = data->prev_data;
+
+       g_assert(pd != NULL);
+
+       if (data->cur_data == NULL) {
+               jb = g_malloc0(sizeof(*jb));
+               jb->ctx = pd->ctx;
+               data->cur_data = jb;
+       }
+       else {
+               jb = data->cur_data;
+       }
+
+       if (jb->buf == NULL) {
+               /* Allocate memory for buffer */
+               jb->buf = rspamd_fstring_sized_new(MAX(len, 4096));
+       }
+
+       jb->buf = rspamd_fstring_append(jb->buf, chunk, len);
+
+       return NULL;
+}
+
+void ucl_keymap_fin_cb(struct map_cb_data *data, void **target)
+{
+       struct fuzzy_keymap_ucl_buf *jb;
+       ucl_object_t *top;
+       struct ucl_parser *parser;
+       struct rspamd_config *cfg;
+
+       /* Now parse ucl */
+       if (data->cur_data) {
+               jb = data->cur_data;
+               cfg = jb->ctx->cfg;
+       }
+       else {
+               msg_err("no cur data in the map! might be a bug");
+               return;
+       }
+
+       if (jb->buf->len == 0) {
+               msg_err_config("no data read");
+
+               return;
+       }
+
+       parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS);
+
+       if (!ucl_parser_add_chunk(parser, jb->buf->str, jb->buf->len)) {
+               msg_err_config("cannot load ucl data: parse error %s",
+                                          ucl_parser_get_error(parser));
+               ucl_parser_free(parser);
+               return;
+       }
+
+       top = ucl_parser_get_object(parser);
+       ucl_parser_free(parser);
+
+       if (ucl_object_type(top) != UCL_ARRAY) {
+               ucl_object_unref(top);
+               msg_err_config("loaded ucl is not an array");
+               return;
+       }
+
+       if (target) {
+               *target = data->cur_data;
+       }
+
+       if (data->prev_data) {
+               jb = data->prev_data;
+               /* Clean prev data */
+               if (jb->buf) {
+                       rspamd_fstring_free(jb->buf);
+               }
+
+               /* Clean the existing keys */
+               struct fuzzy_key *key;
+               kh_foreach_value(jb->ctx->dynamic_keys, key, {
+                       REF_RELEASE(key);
+               });
+               kh_clear(rspamd_fuzzy_keys_hash, jb->ctx->dynamic_keys);
+
+               /* Insert new keys */
+               const ucl_object_t *cur;
+               ucl_object_iter_t it = NULL;
+               int success = 0;
+
+               while ((cur = ucl_object_iterate(top, &it, true)) != NULL) {
+                       struct fuzzy_key *nk;
+
+                       nk = fuzzy_add_keypair_from_ucl(cfg, cur, jb->ctx->dynamic_keys);
+
+                       if (nk == NULL) {
+                               msg_warn_config("cannot add dynamic keypair");
+                       }
+                       success++;
+               }
+
+               msg_info_config("loaded %d dynamic keypairs", success);
+
+               g_free(jb);
+       }
+
+       ucl_object_unref(top);
+}
+
+void ucl_keymap_dtor_cb(struct map_cb_data *data)
+{
+       struct fuzzy_keymap_ucl_buf *jb;
+
+       if (data->cur_data) {
+               jb = data->cur_data;
+               /* Clean prev data */
+               if (jb->buf) {
+                       rspamd_fstring_free(jb->buf);
+               }
+
+               struct fuzzy_key *key;
+               kh_foreach_value(jb->ctx->dynamic_keys, key, {
+                       REF_RELEASE(key);
+               });
+               /* Clear hash content but don't destroy - mempool destructor will handle it */
+               kh_clear(rspamd_fuzzy_keys_hash, jb->ctx->dynamic_keys);
+
+               g_free(jb);
+       }
+}
+
+void fuzzy_key_stat_dtor(gpointer p)
+{
+       struct fuzzy_key_stat *st = p;
+
+       if (st->last_ips) {
+               rspamd_lru_hash_destroy(st->last_ips);
+       }
+
+       if (st->keypair) {
+               rspamd_keypair_unref(st->keypair);
+       }
+
+       g_free(st);
+}
+
+void fuzzy_key_stat_unref(gpointer p)
+{
+       struct fuzzy_key_stat *st = p;
+
+       REF_RELEASE(st);
+}
+
+void fuzzy_key_dtor(gpointer p)
+{
+       struct fuzzy_key *key = p;
+
+       if (key) {
+               if (key->key) {
+                       rspamd_keypair_unref(key->key);
+               }
+
+               if (key->stat) {
+                       REF_RELEASE(key->stat);
+               }
+
+               if (key->flags_stat) {
+                       kh_destroy(fuzzy_key_flag_stat, key->flags_stat);
+               }
+
+               if (key->forbidden_ids) {
+                       kh_destroy(fuzzy_key_ids_set, key->forbidden_ids);
+               }
+
+               if (key->rl_bucket) {
+                       /* TODO: save bucket stats */
+                       g_free(key->rl_bucket);
+               }
+
+               if (key->name) {
+                       g_free(key->name);
+               }
+
+               if (key->extensions) {
+                       ucl_object_unref(key->extensions);
+               }
+
+               g_free(key);
+       }
+}
+
+void fuzzy_hash_table_dtor(khash_t(rspamd_fuzzy_keys_hash) * hash)
+{
+       struct fuzzy_key *key;
+       kh_foreach_value(hash, key, {
+               REF_RELEASE(key);
+       });
+       kh_destroy(rspamd_fuzzy_keys_hash, hash);
+}
+
+gboolean
+fuzzy_parse_ids(rspamd_mempool_t *pool,
+                               const ucl_object_t *obj,
+                               gpointer ud,
+                               struct rspamd_rcl_section *section,
+                               GError **err)
+{
+       struct rspamd_rcl_struct_parser *pd = (struct rspamd_rcl_struct_parser *) ud;
+       khash_t(fuzzy_key_ids_set) * target;
+
+       target = *(khash_t(fuzzy_key_ids_set) **) ((char *) pd->user_struct + pd->offset);
+
+       if (ucl_object_type(obj) == UCL_ARRAY) {
+               const ucl_object_t *cur;
+               ucl_object_iter_t it = NULL;
+               uint64_t id;
+
+               while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) {
+                       if (ucl_object_toint_safe(cur, &id)) {
+                               int r;
+
+                               kh_put(fuzzy_key_ids_set, target, id, &r);
+                       }
+                       else {
+                               return FALSE;
+                       }
+               }
+
+               return TRUE;
+       }
+       else if (ucl_object_type(obj) == UCL_INT) {
+               int r;
+               kh_put(fuzzy_key_ids_set, target, ucl_object_toint(obj), &r);
+
+               return TRUE;
+       }
+
+       return FALSE;
+}
+
+struct fuzzy_key *
+fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg,
+                                                  const ucl_object_t *obj,
+                                                  khash_t(rspamd_fuzzy_keys_hash) * target)
+{
+       struct rspamd_cryptobox_keypair *kp = rspamd_keypair_from_ucl(obj);
+
+       if (kp == NULL) {
+               return NULL;
+       }
+
+       if (rspamd_keypair_type(kp) != RSPAMD_KEYPAIR_KEX) {
+               rspamd_keypair_unref(kp);
+               return NULL;
+       }
+
+       struct fuzzy_key *key = g_malloc0(sizeof(*key));
+       REF_INIT_RETAIN(key, fuzzy_key_dtor);
+       key->key = kp;
+       struct fuzzy_key_stat *keystat = g_malloc0(sizeof(*keystat));
+       REF_INIT_RETAIN(keystat, fuzzy_key_stat_dtor);
+       /* Hash of ip -> fuzzy_key_stat */
+       keystat->last_ips = rspamd_lru_hash_new_full(1024,
+                                                                                                (GDestroyNotify) rspamd_inet_address_free,
+                                                                                                fuzzy_key_stat_unref,
+                                                                                                rspamd_inet_address_hash, rspamd_inet_address_equal);
+       key->stat = keystat;
+       key->flags_stat = kh_init(fuzzy_key_flag_stat);
+       key->burst = NAN;
+       key->rate = NAN;
+       key->expire = NAN;
+       key->rl_bucket = NULL;
+       /* Allow read by default */
+       key->flags = FUZZY_KEY_READ;
+       /* Preallocate some space for flags */
+       kh_resize(fuzzy_key_flag_stat, key->flags_stat, 8);
+       const unsigned char *pk = rspamd_keypair_component(kp, RSPAMD_KEYPAIR_COMPONENT_PK,
+                                                                                                          NULL);
+       keystat->keypair = rspamd_keypair_ref(kp);
+       /* We map entries by pubkey in binary form for faster lookup */
+       khiter_t k;
+       int r;
+
+       k = kh_put(rspamd_fuzzy_keys_hash, target, pk, &r);
+
+       if (r == 0) {
+               msg_err("duplicate keypair found: pk=%*bs",
+                               32, pk);
+               REF_RELEASE(key);
+
+               return NULL;
+       }
+       else if (r == -1) {
+               msg_err("hash insertion error: pk=%*bs",
+                               32, pk);
+               REF_RELEASE(key);
+
+               return NULL;
+       }
+
+       kh_val(target, k) = key;
+
+       const ucl_object_t *extensions = rspamd_keypair_get_extensions(kp);
+
+       if (extensions) {
+               key->extensions = ucl_object_ref(extensions);
+               lua_State *L = RSPAMD_LUA_CFG_STATE(cfg);
+               const ucl_object_t *forbidden_ids = ucl_object_lookup(extensions, "forbidden_ids");
+
+               if (forbidden_ids && ucl_object_type(forbidden_ids) == UCL_ARRAY) {
+                       key->forbidden_ids = kh_init(fuzzy_key_ids_set);
+                       const ucl_object_t *cur;
+                       ucl_object_iter_t it = NULL;
+
+                       while ((cur = ucl_object_iterate(forbidden_ids, &it, true)) != NULL) {
+                               if (ucl_object_type(cur) == UCL_INT || ucl_object_type(cur) == UCL_FLOAT) {
+                                       int id = ucl_object_toint(cur);
+                                       int ids_r;
+
+                                       kh_put(fuzzy_key_ids_set, key->forbidden_ids, id, &ids_r);
+                               }
+                       }
+               }
+
+               const ucl_object_t *ratelimit = ucl_object_lookup(extensions, "ratelimit");
+
+               static int ratelimit_lua_id = -1;
+
+               if (ratelimit_lua_id == -1) {
+                       /* Load ratelimit parsing function */
+                       if (!rspamd_lua_require_function(L, "plugins/ratelimit", "parse_limit")) {
+                               msg_err_config("cannot load ratelimit parser from ratelimit plugin");
+                       }
+                       else {
+                               ratelimit_lua_id = luaL_ref(L, LUA_REGISTRYINDEX);
+                       }
+               }
+
+               if (ratelimit && ratelimit_lua_id != -1) {
+                       lua_rawgeti(L, LUA_REGISTRYINDEX, ratelimit_lua_id);
+                       lua_pushstring(L, "fuzzy_key_ratelimit");
+                       ucl_object_push_lua(L, ratelimit, false);
+
+                       if (lua_pcall(L, 2, 1, 0) != 0) {
+                               msg_err_config("cannot call ratelimit parser from ratelimit plugin");
+                       }
+                       else {
+                               if (lua_type(L, -1) == LUA_TTABLE) {
+                                       /* The returned table is in form { rate = xx, burst = yy } */
+                                       lua_getfield(L, -1, "rate");
+                                       key->rate = lua_tonumber(L, -1);
+                                       lua_pop(L, 1);
+
+                                       lua_getfield(L, -1, "burst");
+                                       key->burst = lua_tonumber(L, -1);
+                                       lua_pop(L, 1);
+
+                                       key->rl_bucket = g_malloc0(sizeof(*key->rl_bucket));
+                               }
+                       }
+
+                       lua_settop(L, 0);
+               }
+
+               const ucl_object_t *expire = ucl_object_lookup(extensions, "expire");
+               if (expire && ucl_object_type(expire) == UCL_STRING) {
+                       struct tm tm;
+
+                       /* DD-MM-YYYY */
+                       char *end = strptime(ucl_object_tostring(expire), "%d-%m-%Y", &tm);
+
+                       if (end != NULL && *end != '\0') {
+                               msg_err_config("cannot parse expire date: %s", ucl_object_tostring(expire));
+                       }
+                       else {
+                               key->expire = mktime(&tm);
+                       }
+               }
+
+               const ucl_object_t *name = ucl_object_lookup(extensions, "name");
+               if (name && ucl_object_type(name) == UCL_STRING) {
+                       key->name = g_strdup(ucl_object_tostring(name));
+               }
+
+               /* Check permissions */
+               const ucl_object_t *read_only = ucl_object_lookup(extensions, "read_only");
+               if (read_only && ucl_object_type(read_only) == UCL_BOOLEAN) {
+                       if (ucl_object_toboolean(read_only)) {
+                               key->flags &= ~(FUZZY_KEY_WRITE | FUZZY_KEY_DELETE);
+                       }
+                       else {
+                               key->flags |= (FUZZY_KEY_WRITE | FUZZY_KEY_DELETE);
+                       }
+               }
+
+               const ucl_object_t *allowed_ops = ucl_object_lookup(extensions, "allowed_ops");
+               if (allowed_ops && ucl_object_type(allowed_ops) == UCL_ARRAY) {
+                       const ucl_object_t *cur;
+                       ucl_object_iter_t it = NULL;
+                       /* Reset to only allowed */
+                       key->flags = 0;
+
+                       while ((cur = ucl_object_iterate(allowed_ops, &it, true)) != NULL) {
+                               if (ucl_object_type(cur) == UCL_STRING) {
+                                       const char *op = ucl_object_tostring(cur);
+
+                                       if (g_ascii_strcasecmp(op, "read") == 0) {
+                                               key->flags |= FUZZY_KEY_READ;
+                                       }
+                                       else if (g_ascii_strcasecmp(op, "write") == 0) {
+                                               key->flags |= FUZZY_KEY_WRITE;
+                                       }
+                                       else if (g_ascii_strcasecmp(op, "delete") == 0) {
+                                               key->flags |= FUZZY_KEY_DELETE;
+                                       }
+                                       else {
+                                               msg_warn_config("invalid operation: %s", op);
+                                       }
+                               }
+                       }
+               }
+       }
+
+       msg_debug("loaded keypair %*bs; expire=%f; rate=%f; burst=%f; name=%s",
+                         (int) crypto_box_publickeybytes(), pk,
+                         key->expire, key->rate, key->burst, key->name);
+
+       return key;
+}
+
+gboolean
+fuzzy_parse_keypair(rspamd_mempool_t *pool,
+                                       const ucl_object_t *obj,
+                                       gpointer ud,
+                                       struct rspamd_rcl_section *section,
+                                       GError **err)
+{
+       struct rspamd_rcl_struct_parser *pd = ud;
+       struct rspamd_fuzzy_storage_ctx *ctx;
+       struct fuzzy_key *key;
+       const ucl_object_t *cur;
+       ucl_object_iter_t it = NULL;
+       gboolean ret;
+
+       ctx = pd->user_struct;
+       pd->offset = G_STRUCT_OFFSET(struct rspamd_fuzzy_storage_ctx, default_keypair);
+
+       /*
+        * Single key
+        */
+       if (ucl_object_type(obj) == UCL_STRING || ucl_object_type(obj) == UCL_OBJECT) {
+               ret = rspamd_rcl_parse_struct_keypair(pool, obj, pd, section, err);
+
+               if (!ret) {
+                       return ret;
+               }
+
+               key = fuzzy_add_keypair_from_ucl(ctx->cfg, obj, ctx->keys);
+
+               if (key == NULL) {
+                       return FALSE;
+               }
+
+               /* Use the last one ? */
+               ctx->default_key = key;
+       }
+       else if (ucl_object_type(obj) == UCL_ARRAY) {
+               while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) {
+                       if (!fuzzy_parse_keypair(pool, cur, pd, section, err)) {
+                               msg_err_pool("cannot parse keypair");
+                       }
+               }
+       }
+
+       return TRUE;
+}
diff --git a/src/libserver/fuzzy_storage_ratelimit.c b/src/libserver/fuzzy_storage_ratelimit.c
new file mode 100644 (file)
index 0000000..1f7c1e8
--- /dev/null
@@ -0,0 +1,549 @@
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Rspamd fuzzy storage server: ratelimits
+ */
+
+#include "config.h"
+
+#include "fuzzy_storage_internal.h"
+
+#include "maps/map_helpers.h"
+#include "rspamd_control.h"
+#include "lua/lua_common.h"
+#include "contrib/uthash/utlist.h"
+
+#include <errno.h>
+#include <math.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/socket.h>
+
+enum rspamd_ratelimit_check_result
+rspamd_fuzzy_check_ratelimit_bucket(struct rspamd_fuzzy_storage_ctx *ctx,
+                                                                       rspamd_inet_addr_t *addr,
+                                                                       ev_tstamp timestamp,
+                                                                       struct rspamd_leaky_bucket_elt *elt,
+                                                                       enum rspamd_ratelimit_check_policy policy,
+                                                                       double max_burst, double max_rate)
+{
+       gboolean ratelimited = FALSE, new_ratelimit = FALSE;
+
+       /* Nothing to check */
+       if (isnan(max_burst) || isnan(max_rate)) {
+               return ratelimit_pass;
+       }
+
+       if (isnan(elt->cur)) {
+               /* There is an issue with the previous logic: the TTL is updated each time
+                * we see that new bucket. Hence, we need to check the `last` and act accordingly
+                */
+               if (elt->last < timestamp && timestamp - elt->last >= ctx->leaky_bucket_ttl) {
+                       /*
+                        * We reset bucket to it's 90% capacity to allow some requests
+                        * This should cope with the issue when we block an IP network for some burst and never unblock it
+                        */
+                       elt->cur = max_burst * 0.9;
+                       elt->last = timestamp;
+               }
+               else {
+                       ratelimited = TRUE;
+               }
+       }
+       else {
+               /* Update bucket: leak some elements */
+               if (elt->last < timestamp) {
+                       elt->cur -= max_rate * (timestamp - elt->last);
+                       elt->last = timestamp;
+
+                       if (elt->cur < 0) {
+                               elt->cur = 0;
+                       }
+               }
+               else {
+                       elt->last = timestamp;
+               }
+
+               /* Check the bucket */
+               if (elt->cur >= max_burst) {
+
+                       if (policy == ratelimit_policy_permanent) {
+                               elt->cur = NAN;
+                       }
+                       new_ratelimit = TRUE;
+                       ratelimited = TRUE;
+               }
+               else {
+                       elt->cur++; /* Allow one more request */
+               }
+       }
+
+       /* Note: Caller is responsible for calling the ratelimit handlers with
+        * proper context (new vs existing, bucket info, session info, etc.)
+        */
+
+       if (new_ratelimit) {
+               return ratelimit_new;
+       }
+
+       return ratelimited ? ratelimit_existing : ratelimit_pass;
+}
+
+gboolean
+rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx,
+                                                        rspamd_inet_addr_t *addr,
+                                                        struct rspamd_worker *worker,
+                                                        ev_tstamp timestamp)
+{
+       rspamd_inet_addr_t *masked;
+       struct rspamd_leaky_bucket_elt *elt;
+
+       if (!addr) {
+               return TRUE;
+       }
+
+       if (ctx->ratelimit_whitelist != NULL) {
+               if (rspamd_match_radix_map_addr(ctx->ratelimit_whitelist,
+                                                                               addr) != NULL) {
+                       return TRUE;
+               }
+       }
+
+       /* Skip ratelimit for local addresses */
+       if (rspamd_inet_address_is_local(addr)) {
+               return TRUE;
+       }
+
+       masked = rspamd_inet_address_copy(addr, NULL);
+
+       if (rspamd_inet_address_get_af(masked) == AF_INET) {
+               rspamd_inet_address_apply_mask(masked,
+                                                                          MIN(ctx->leaky_bucket_mask, 32));
+       }
+       else {
+               /* Must be at least /64 */
+               rspamd_inet_address_apply_mask(masked,
+                                                                          MIN(MAX(ctx->leaky_bucket_mask * 4, 64), 128));
+       }
+
+       elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, masked,
+                                                                (time_t) timestamp);
+
+       if (elt) {
+               enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(ctx, addr,
+                                                                                                                                                                        timestamp, elt,
+                                                                                                                                                                        ratelimit_policy_permanent,
+                                                                                                                                                                        ctx->leaky_bucket_burst,
+                                                                                                                                                                        ctx->leaky_bucket_rate);
+
+               if (res == ratelimit_new) {
+                       msg_info("ratelimiting %s (%s), %.1f max elts",
+                                        rspamd_inet_address_to_string(addr),
+                                        rspamd_inet_address_to_string(masked),
+                                        ctx->leaky_bucket_burst);
+
+                       struct rspamd_srv_command srv_cmd;
+
+                       srv_cmd.type = RSPAMD_SRV_FUZZY_BLOCKED;
+                       srv_cmd.cmd.fuzzy_blocked.af = rspamd_inet_address_get_af(masked);
+
+                       if (srv_cmd.cmd.fuzzy_blocked.af == AF_INET || srv_cmd.cmd.fuzzy_blocked.af == AF_INET6) {
+                               socklen_t slen;
+                               struct sockaddr *sa = rspamd_inet_address_get_sa(masked, &slen);
+
+                               if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
+                                       memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
+                                       msg_debug("propagating blocked address to other workers");
+                                       rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+                               }
+                               else {
+                                       msg_err("bad address length: %d, expected to be %d",
+                                                       (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
+                               }
+                       }
+
+                       if (ctx->lua_blacklist_handlers) {
+                               struct rspamd_ratelimit_callback_ctx cb_ctx = {
+                                       .addr = addr,
+                                       .reason = "ratelimit",
+                                       .type = RATELIMIT_EVENT_NEW,
+                                       .bucket = elt,
+                                       .max_burst = ctx->leaky_bucket_burst,
+                                       .max_rate = ctx->leaky_bucket_rate,
+                                       .session = NULL,
+                               };
+                               rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx);
+                       }
+               }
+               else if (res == ratelimit_existing) {
+                       if (ctx->lua_blacklist_handlers) {
+                               struct rspamd_ratelimit_callback_ctx cb_ctx = {
+                                       .addr = addr,
+                                       .reason = "ratelimit",
+                                       .type = RATELIMIT_EVENT_EXISTING,
+                                       .bucket = elt,
+                                       .max_burst = ctx->leaky_bucket_burst,
+                                       .max_rate = ctx->leaky_bucket_rate,
+                                       .session = NULL,
+                               };
+                               rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx);
+                       }
+               }
+
+               rspamd_inet_address_free(masked);
+
+               return res == ratelimit_pass;
+       }
+       else {
+               /* New bucket */
+               elt = g_malloc(sizeof(*elt));
+               elt->addr = masked; /* transfer ownership */
+               elt->cur = 1;
+               elt->last = timestamp;
+
+               rspamd_lru_hash_insert(ctx->ratelimit_buckets,
+                                                          masked,
+                                                          elt,
+                                                          timestamp,
+                                                          ctx->leaky_bucket_ttl);
+       }
+
+       return TRUE;
+}
+
+static void
+rspamd_fuzzy_bucket_info_tolua(lua_State *L,
+                                                          const struct rspamd_ratelimit_callback_ctx *cb_ctx)
+{
+       if (!cb_ctx->bucket) {
+               lua_pushnil(L);
+               return;
+       }
+
+       lua_createtable(L, 0, 6);
+
+       /* bucket_level - current fill level (nil if permanently blocked) */
+       if (isnan(cb_ctx->bucket->cur)) {
+               lua_pushnil(L);
+               lua_setfield(L, -2, "bucket_level");
+               lua_pushboolean(L, TRUE);
+               lua_setfield(L, -2, "is_permanent");
+       }
+       else {
+               lua_pushnumber(L, cb_ctx->bucket->cur);
+               lua_setfield(L, -2, "bucket_level");
+               lua_pushboolean(L, FALSE);
+               lua_setfield(L, -2, "is_permanent");
+       }
+
+       /* max_burst */
+       if (!isnan(cb_ctx->max_burst)) {
+               lua_pushnumber(L, cb_ctx->max_burst);
+               lua_setfield(L, -2, "max_burst");
+       }
+
+       /* max_rate */
+       if (!isnan(cb_ctx->max_rate)) {
+               lua_pushnumber(L, cb_ctx->max_rate);
+               lua_setfield(L, -2, "max_rate");
+       }
+
+       /* exceeded_by - how much over the limit */
+       if (!isnan(cb_ctx->bucket->cur) && !isnan(cb_ctx->max_burst) &&
+               cb_ctx->bucket->cur > cb_ctx->max_burst) {
+               lua_pushnumber(L, cb_ctx->bucket->cur - cb_ctx->max_burst);
+               lua_setfield(L, -2, "exceeded_by");
+       }
+
+       /* last_seen */
+       lua_pushnumber(L, cb_ctx->bucket->last);
+       lua_setfield(L, -2, "last_seen");
+}
+
+static void
+rspamd_fuzzy_ratelimit_extensions_tolua(lua_State *L,
+                                                                               const struct rspamd_ratelimit_callback_ctx *cb_ctx)
+{
+       struct rspamd_fuzzy_cmd_extension *ext;
+       rspamd_inet_addr_t *addr;
+
+       lua_createtable(L, 0, 2);
+
+       if (!cb_ctx->session || !cb_ctx->session->extensions) {
+               return;
+       }
+
+       LL_FOREACH(cb_ctx->session->extensions, ext)
+       {
+               switch (ext->ext) {
+               case RSPAMD_FUZZY_EXT_SOURCE_DOMAIN:
+                       lua_pushlstring(L, (const char *) ext->payload, ext->length);
+                       lua_setfield(L, -2, "domain");
+                       break;
+               case RSPAMD_FUZZY_EXT_SOURCE_IP4:
+                       addr = rspamd_inet_address_new(AF_INET, ext->payload);
+                       rspamd_lua_ip_push(L, addr);
+                       rspamd_inet_address_free(addr);
+                       lua_setfield(L, -2, "source_ip");
+                       break;
+               case RSPAMD_FUZZY_EXT_SOURCE_IP6:
+                       addr = rspamd_inet_address_new(AF_INET6, ext->payload);
+                       rspamd_lua_ip_push(L, addr);
+                       rspamd_inet_address_free(addr);
+                       lua_setfield(L, -2, "source_ip");
+                       break;
+               }
+       }
+}
+
+void rspamd_fuzzy_call_ratelimit_handlers(struct rspamd_fuzzy_storage_ctx *ctx,
+                                                                                 const struct rspamd_ratelimit_callback_ctx *cb_ctx)
+{
+       if (ctx->lua_blacklist_handlers == NULL) {
+               return;
+       }
+
+       struct rspamd_lua_fuzzy_script *cur;
+       LL_FOREACH(ctx->lua_blacklist_handlers, cur)
+       {
+               lua_State *L = ctx->cfg->lua_state;
+               int err_idx, ret;
+               const int nargs = 6;
+
+               lua_pushcfunction(L, &rspamd_lua_traceback);
+               err_idx = lua_gettop(L);
+               lua_checkstack(L, err_idx + nargs + 2);
+               lua_rawgeti(L, LUA_REGISTRYINDEX, cur->cbref);
+
+               /* Arg 1: client IP */
+               rspamd_lua_ip_push(L, cb_ctx->addr);
+
+               /* Arg 2: block reason */
+               lua_pushstring(L, cb_ctx->reason);
+
+               /* Arg 3: event type */
+               switch (cb_ctx->type) {
+               case RATELIMIT_EVENT_NEW:
+                       lua_pushliteral(L, "new");
+                       break;
+               case RATELIMIT_EVENT_EXISTING:
+                       lua_pushliteral(L, "existing");
+                       break;
+               case RATELIMIT_EVENT_BLACKLIST:
+                       lua_pushliteral(L, "blacklist");
+                       break;
+               }
+
+               /* Arg 4: ratelimit_info table (or nil) */
+               rspamd_fuzzy_bucket_info_tolua(L, cb_ctx);
+
+               /* Arg 5: digest (or nil) */
+               if (cb_ctx->session) {
+                       (void) lua_new_text(L, (const char *) cb_ctx->session->cmd.basic.digest,
+                                                               sizeof(cb_ctx->session->cmd.basic.digest), FALSE);
+               }
+               else {
+                       lua_pushnil(L);
+               }
+
+               /* Arg 6: extensions table */
+               rspamd_fuzzy_ratelimit_extensions_tolua(L, cb_ctx);
+
+               if ((ret = lua_pcall(L, nargs, 0, err_idx)) != 0) {
+                       msg_err("call to lua_blacklist_cbref "
+                                       "script failed (%d): %s",
+                                       ret, lua_tostring(L, -1));
+               }
+
+               lua_settop(L, 0);
+       }
+}
+
+void rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx *ctx,
+                                                                                rspamd_inet_addr_t *addr,
+                                                                                const char *reason)
+{
+       if (ctx->lua_blacklist_handlers == NULL) {
+               return;
+       }
+
+       struct rspamd_ratelimit_callback_ctx cb_ctx = {
+               .addr = addr,
+               .reason = reason,
+               .type = g_strcmp0(reason, "blacklisted") == 0 ? RATELIMIT_EVENT_BLACKLIST : RATELIMIT_EVENT_EXISTING,
+               .bucket = NULL,
+               .max_burst = NAN,
+               .max_rate = NAN,
+               .session = NULL,
+       };
+       rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx);
+}
+
+gboolean
+rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
+                                                 rspamd_inet_addr_t *addr)
+{
+       if (ctx->blocked_ips != NULL) {
+               if (rspamd_match_radix_map_addr(ctx->blocked_ips,
+                                                                               addr) != NULL) {
+
+                       rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "blacklisted");
+                       return FALSE;
+               }
+       }
+
+       return TRUE;
+}
+
+void fuzzy_rl_bucket_free(gpointer p)
+{
+       struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *) p;
+
+       rspamd_inet_address_free(elt->addr);
+       g_free(elt);
+}
+
+ucl_object_t *
+rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt)
+{
+       ucl_object_t *res;
+
+       res = ucl_object_typed_new(UCL_OBJECT);
+
+       ucl_object_insert_key(res, ucl_object_fromdouble(p_elt->cur), "cur", 0, false);
+       ucl_object_insert_key(res, ucl_object_fromdouble(p_elt->last), "last", 0, false);
+
+       return res;
+}
+
+void rspamd_fuzzy_maybe_load_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx)
+{
+       char path[PATH_MAX];
+
+       rspamd_snprintf(path, sizeof(path), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl",
+                                       RSPAMD_DBDIR);
+
+       if (access(path, R_OK) != -1) {
+               struct ucl_parser *parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS);
+               if (ucl_parser_add_file(parser, path)) {
+                       ucl_object_t *obj = ucl_parser_get_object(parser);
+                       int loaded = 0;
+
+                       if (ucl_object_type(obj) == UCL_ARRAY) {
+                               ucl_object_iter_t it = NULL;
+                               const ucl_object_t *cur;
+
+                               while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) {
+                                       const ucl_object_t *ip, *value, *last;
+                                       const char *ip_str;
+                                       double limit_val, last_val;
+
+                                       ip = ucl_object_find_key(cur, "ip");
+                                       value = ucl_object_find_key(cur, "value");
+                                       last = ucl_object_find_key(cur, "last");
+
+                                       if (ip == NULL || value == NULL || last == NULL) {
+                                               msg_err("invalid ratelimit object");
+                                               continue;
+                                       }
+
+                                       ip_str = ucl_object_tostring(ip);
+                                       limit_val = ucl_object_todouble(value);
+                                       last_val = ucl_object_todouble(last);
+
+                                       if (ip_str == NULL || isnan(last_val)) {
+                                               msg_err("invalid ratelimit object");
+                                               continue;
+                                       }
+
+                                       rspamd_inet_addr_t *addr;
+                                       if (rspamd_parse_inet_address(&addr, ip_str, strlen(ip_str),
+                                                                                                 RSPAMD_INET_ADDRESS_PARSE_NO_UNIX | RSPAMD_INET_ADDRESS_PARSE_NO_PORT)) {
+                                               struct rspamd_leaky_bucket_elt *elt = g_malloc(sizeof(*elt));
+
+                                               elt->cur = limit_val;
+                                               elt->last = last_val;
+                                               elt->addr = addr;
+                                               rspamd_lru_hash_insert(ctx->ratelimit_buckets, addr, elt, elt->last, ctx->leaky_bucket_ttl);
+                                               loaded++;
+                                       }
+                                       else {
+                                               msg_err("invalid ratelimit ip: %s", ip_str);
+                                               continue;
+                                       }
+                               }
+
+                               msg_info("loaded %d ratelimit objects", loaded);
+                       }
+
+                       ucl_object_unref(obj);
+               }
+
+               ucl_parser_free(parser);
+       }
+}
+
+void rspamd_fuzzy_maybe_save_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx)
+{
+       char path[PATH_MAX];
+
+       rspamd_snprintf(path, sizeof(path), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl.new",
+                                       RSPAMD_DBDIR);
+       FILE *f = fopen(path, "w");
+
+       if (f != NULL) {
+               ucl_object_t *top = ucl_object_typed_new(UCL_ARRAY);
+               int it = 0;
+               gpointer k, v;
+
+               ucl_object_reserve(top, rspamd_lru_hash_size(ctx->ratelimit_buckets));
+
+               while ((it = rspamd_lru_hash_foreach(ctx->ratelimit_buckets, it, &k, &v)) != -1) {
+                       ucl_object_t *cur = ucl_object_typed_new(UCL_OBJECT);
+                       struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *) v;
+
+                       ucl_object_insert_key(cur, ucl_object_fromdouble(elt->cur), "value", 0, false);
+                       ucl_object_insert_key(cur, ucl_object_fromdouble(elt->last), "last", 0, false);
+                       ucl_object_insert_key(cur, ucl_object_fromstring(rspamd_inet_address_to_string(elt->addr)), "ip", 0, false);
+                       ucl_array_append(top, cur);
+               }
+
+               if (ucl_object_emit_full(top, UCL_EMIT_JSON_COMPACT, ucl_object_emit_file_funcs(f), NULL)) {
+                       char npath[PATH_MAX];
+
+                       fflush(f);
+                       rspamd_snprintf(npath, sizeof(npath), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl",
+                                                       RSPAMD_DBDIR);
+
+                       if (rename(path, npath) == -1) {
+                               msg_warn("cannot rename %s to %s: %s", path, npath, strerror(errno));
+                       }
+                       else {
+                               msg_info("saved %d ratelimits in %s", rspamd_lru_hash_size(ctx->ratelimit_buckets), npath);
+                       }
+               }
+               else {
+                       msg_warn("cannot serialize ratelimit buckets to %s: %s", path, strerror(errno));
+               }
+
+               fclose(f);
+               ucl_object_unref(top);
+       }
+       else {
+               msg_warn("cannot save ratelimit buckets to %s: %s", path, strerror(errno));
+       }
+}
diff --git a/src/libserver/fuzzy_storage_stat.c b/src/libserver/fuzzy_storage_stat.c
new file mode 100644 (file)
index 0000000..ef61001
--- /dev/null
@@ -0,0 +1,300 @@
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "rspamd.h"
+#include "util.h"
+#include "rspamd_control.h"
+#include "libserver/worker_util.h"
+#include "fuzzy_backend/fuzzy_backend.h"
+#include "fuzzy_storage_internal.h"
+#include "fuzzy_wire.h"
+#include "libcryptobox/keypair.h"
+#include "unix-std.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+
+ucl_object_t *
+rspamd_fuzzy_storage_stat_key(const struct fuzzy_key_stat *key_stat)
+{
+       ucl_object_t *res;
+
+       res = ucl_object_typed_new(UCL_OBJECT);
+
+       ucl_object_insert_key(res, ucl_object_fromint(key_stat->checked),
+                                                 "checked", 0, false);
+       ucl_object_insert_key(res, ucl_object_fromdouble(key_stat->checked_ctr.mean),
+                                                 "checked_per_hour", 0, false);
+       ucl_object_insert_key(res, ucl_object_fromint(key_stat->matched),
+                                                 "matched", 0, false);
+       ucl_object_insert_key(res, ucl_object_fromdouble(key_stat->matched_ctr.mean),
+                                                 "matched_per_hour", 0, false);
+       ucl_object_insert_key(res, ucl_object_fromint(key_stat->added),
+                                                 "added", 0, false);
+       ucl_object_insert_key(res, ucl_object_fromint(key_stat->deleted),
+                                                 "deleted", 0, false);
+       ucl_object_insert_key(res, ucl_object_fromint(key_stat->errors),
+                                                 "errors", 0, false);
+
+       return res;
+}
+
+void rspamd_fuzzy_key_stat_iter(const unsigned char *pk_iter,
+                                                               struct fuzzy_key *fuzzy_key,
+                                                               ucl_object_t *keys_obj,
+                                                               gboolean ip_stat)
+{
+       struct fuzzy_key_stat *key_stat = fuzzy_key->stat;
+       char keyname[17];
+
+       if (key_stat) {
+               rspamd_snprintf(keyname, sizeof(keyname), "%8bs", pk_iter);
+
+               ucl_object_t *elt = rspamd_fuzzy_storage_stat_key(key_stat);
+
+               if (key_stat->last_ips && ip_stat) {
+                       int i = 0;
+                       ucl_object_t *ip_elt = ucl_object_typed_new(UCL_OBJECT);
+                       gpointer k, v;
+
+                       while ((i = rspamd_lru_hash_foreach(key_stat->last_ips,
+                                                                                               i, &k, &v)) != -1) {
+                               ucl_object_t *ip_cur = rspamd_fuzzy_storage_stat_key(v);
+                               ucl_object_insert_key(ip_elt, ip_cur,
+                                                                         rspamd_inet_address_to_string(k), 0, true);
+                       }
+                       ucl_object_insert_key(elt, ip_elt, "ips", 0, false);
+               }
+
+               int flag;
+               struct fuzzy_key_stat *flag_stat;
+               ucl_object_t *flags_ucl = ucl_object_typed_new(UCL_OBJECT);
+
+               kh_foreach_key_value_ptr(fuzzy_key->flags_stat, flag, flag_stat, {
+                       char intbuf[16];
+                       rspamd_snprintf(intbuf, sizeof(intbuf), "%d", flag);
+                       ucl_object_insert_key(flags_ucl, rspamd_fuzzy_storage_stat_key(flag_stat),
+                                                                 intbuf, 0, true);
+               });
+
+               ucl_object_insert_key(elt, flags_ucl, "flags", 0, false);
+
+               ucl_object_insert_key(elt,
+                                                         rspamd_keypair_to_ucl(fuzzy_key->key, RSPAMD_KEYPAIR_ENCODING_DEFAULT,
+                                                                                                       RSPAMD_KEYPAIR_DUMP_NO_SECRET | RSPAMD_KEYPAIR_DUMP_FLATTENED),
+                                                         "keypair", 0, false);
+
+               if (fuzzy_key->rl_bucket) {
+                       ucl_object_insert_key(elt,
+                                                                 rspamd_leaky_bucket_to_ucl(fuzzy_key->rl_bucket),
+                                                                 "ratelimit", 0, false);
+               }
+
+               ucl_object_insert_key(keys_obj, elt, keyname, 0, true);
+       }
+}
+
+ucl_object_t *
+rspamd_fuzzy_stat_to_ucl(struct rspamd_fuzzy_storage_ctx *ctx, gboolean ip_stat)
+{
+       struct fuzzy_key *fuzzy_key;
+       ucl_object_t *obj, *keys_obj, *elt, *ip_elt;
+       const unsigned char *pk_iter;
+
+       obj = ucl_object_typed_new(UCL_OBJECT);
+
+       keys_obj = ucl_object_typed_new(UCL_OBJECT);
+
+       kh_foreach(ctx->keys, pk_iter, fuzzy_key, {
+               rspamd_fuzzy_key_stat_iter(pk_iter, fuzzy_key, keys_obj, ip_stat);
+       });
+
+       if (ctx->dynamic_keys) {
+               kh_foreach(ctx->dynamic_keys, pk_iter, fuzzy_key, {
+                       rspamd_fuzzy_key_stat_iter(pk_iter, fuzzy_key, keys_obj, ip_stat);
+               });
+       }
+
+       ucl_object_insert_key(obj, keys_obj, "keys", 0, false);
+
+       /* Now generic stats */
+       ucl_object_insert_key(obj,
+                                                 ucl_object_fromint(ctx->stat.fuzzy_hashes),
+                                                 "fuzzy_stored",
+                                                 0,
+                                                 false);
+       ucl_object_insert_key(obj,
+                                                 ucl_object_fromint(ctx->stat.fuzzy_hashes_expired),
+                                                 "fuzzy_expired",
+                                                 0,
+                                                 false);
+       ucl_object_insert_key(obj,
+                                                 ucl_object_fromint(ctx->stat.invalid_requests),
+                                                 "invalid_requests",
+                                                 0,
+                                                 false);
+       ucl_object_insert_key(obj,
+                                                 ucl_object_fromint(ctx->stat.delayed_hashes),
+                                                 "delayed_hashes",
+                                                 0,
+                                                 false);
+
+       if (ctx->errors_ips && ip_stat) {
+               gpointer k, v;
+               int i = 0;
+               ip_elt = ucl_object_typed_new(UCL_OBJECT);
+
+               while ((i = rspamd_lru_hash_foreach(ctx->errors_ips, i, &k, &v)) != -1) {
+                       ucl_object_insert_key(ip_elt,
+                                                                 ucl_object_fromint(*(uint64_t *) v),
+                                                                 rspamd_inet_address_to_string(k), 0, true);
+               }
+
+               ucl_object_insert_key(obj,
+                                                         ip_elt,
+                                                         "errors_ips",
+                                                         0,
+                                                         false);
+       }
+
+       /* Checked by epoch */
+       elt = ucl_object_typed_new(UCL_ARRAY);
+
+       for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
+               ucl_array_append(elt,
+                                                ucl_object_fromint(ctx->stat.fuzzy_hashes_checked[i]));
+       }
+
+       ucl_object_insert_key(obj, elt, "fuzzy_checked", 0, false);
+
+       /* Shingles by epoch */
+       elt = ucl_object_typed_new(UCL_ARRAY);
+
+       for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
+               ucl_array_append(elt,
+                                                ucl_object_fromint(ctx->stat.fuzzy_shingles_checked[i]));
+       }
+
+       ucl_object_insert_key(obj, elt, "fuzzy_shingles", 0, false);
+
+       /* Matched by epoch */
+       elt = ucl_object_typed_new(UCL_ARRAY);
+
+       for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
+               ucl_array_append(elt,
+                                                ucl_object_fromint(ctx->stat.fuzzy_hashes_found[i]));
+       }
+
+       ucl_object_insert_key(obj, elt, "fuzzy_found", 0, false);
+
+
+       return obj;
+}
+
+gboolean
+rspamd_fuzzy_storage_stat(struct rspamd_main *rspamd_main,
+                                                 struct rspamd_worker *worker, int fd,
+                                                 int attached_fd,
+                                                 struct rspamd_control_command *cmd,
+                                                 gpointer ud)
+{
+       struct rspamd_fuzzy_storage_ctx *ctx = ud;
+       struct rspamd_control_reply rep;
+       ucl_object_t *obj;
+       struct ucl_emitter_functions *emit_subr;
+       unsigned char fdspace[CMSG_SPACE(sizeof(int))];
+       struct iovec iov;
+       struct msghdr msg;
+       struct cmsghdr *cmsg;
+
+       int outfd = -1;
+       char tmppath[PATH_MAX];
+
+       memset(&rep, 0, sizeof(rep));
+       rep.type = RSPAMD_CONTROL_FUZZY_STAT;
+       rep.id = cmd->id;
+
+       rspamd_snprintf(tmppath, sizeof(tmppath), "%s%c%s-XXXXXXXXXX",
+                                       rspamd_main->cfg->temp_dir, G_DIR_SEPARATOR, "fuzzy-stat");
+
+       if ((outfd = mkstemp(tmppath)) == -1) {
+               rep.reply.fuzzy_stat.status = errno;
+               msg_info_main("cannot make temporary stat file for fuzzy stat: %s",
+                                         strerror(errno));
+       }
+       else {
+               const char *backend_id;
+
+               rep.reply.fuzzy_stat.status = 0;
+
+               backend_id = rspamd_fuzzy_backend_id(ctx->backend);
+               if (backend_id) {
+                       memcpy(rep.reply.fuzzy_stat.storage_id,
+                                  backend_id,
+                                  sizeof(rep.reply.fuzzy_stat.storage_id));
+               }
+
+               obj = rspamd_fuzzy_stat_to_ucl(ctx, TRUE);
+               emit_subr = ucl_object_emit_fd_funcs(outfd);
+               ucl_object_emit_full(obj, UCL_EMIT_JSON_COMPACT, emit_subr, NULL);
+               ucl_object_emit_funcs_free(emit_subr);
+               ucl_object_unref(obj);
+               /* Rewind output file */
+               close(outfd);
+               outfd = open(tmppath, O_RDONLY);
+               unlink(tmppath);
+       }
+
+       /* Now we can send outfd and status message */
+       memset(&msg, 0, sizeof(msg));
+
+       /* Attach fd to the message */
+       if (outfd != -1) {
+               memset(fdspace, 0, sizeof(fdspace));
+               msg.msg_control = fdspace;
+               msg.msg_controllen = sizeof(fdspace);
+               cmsg = CMSG_FIRSTHDR(&msg);
+
+               if (cmsg) {
+                       cmsg->cmsg_level = SOL_SOCKET;
+                       cmsg->cmsg_type = SCM_RIGHTS;
+                       cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+                       memcpy(CMSG_DATA(cmsg), &outfd, sizeof(int));
+               }
+       }
+
+       iov.iov_base = &rep;
+       iov.iov_len = sizeof(rep);
+       msg.msg_iov = &iov;
+       msg.msg_iovlen = 1;
+
+       if (sendmsg(fd, &msg, 0) == -1) {
+               msg_err_main("cannot send fuzzy stat: %s", strerror(errno));
+       }
+
+       if (outfd != -1) {
+               close(outfd);
+       }
+
+       return TRUE;
+}