#include "config.h"
#include "libserver/fuzzy_wire.h"
+#include "libserver/fuzzy_storage_internal.h"
#include "util.h"
#include "rspamd.h"
#include "libserver/maps/map.h"
RSPAMD_WORKER_VER /* Version info */
};
-struct fuzzy_global_stat {
- uint64_t fuzzy_hashes;
- /**< number of fuzzy hashes stored */
- uint64_t fuzzy_hashes_expired;
- /**< number of fuzzy hashes expired */
- uint64_t fuzzy_hashes_checked[RSPAMD_FUZZY_EPOCH_MAX];
- /**< amount of check requests for each epoch */
- uint64_t fuzzy_shingles_checked[RSPAMD_FUZZY_EPOCH_MAX];
- /**< amount of shingle check requests for each epoch */
- uint64_t fuzzy_hashes_found[RSPAMD_FUZZY_EPOCH_MAX];
- /**< amount of invalid requests */
- uint64_t invalid_requests;
- /**< amount of delayed hashes found */
- uint64_t delayed_hashes;
-};
-
-struct fuzzy_key_stat {
- uint64_t checked;
- uint64_t matched;
- uint64_t added;
- uint64_t deleted;
- uint64_t errors;
- /* Store averages for checked/matched per minute */
- struct rspamd_counter_data checked_ctr;
- struct rspamd_counter_data matched_ctr;
- double last_checked_time;
- uint64_t last_checked_count;
- uint64_t last_matched_count;
- struct rspamd_cryptobox_keypair *keypair;
- rspamd_lru_hash_t *last_ips;
-
- ref_entry_t ref;
-};
-
-struct rspamd_leaky_bucket_elt {
- rspamd_inet_addr_t *addr;
- double last;
- double cur;
-};
-
/*
 * Magic number for the fuzzy storage worker.
 * NOTE(review): presumably stored in the worker context's `magic` field so
 * the context can be identified - confirm at the context init site.
 */
static const uint64_t rspamd_fuzzy_storage_magic = 0x291a3253eb1b3ea5ULL;
-static int64_t
-fuzzy_kp_hash(const unsigned char *p)
-{
- int64_t res;
-
- memcpy(&res, p, sizeof(res));
- return res;
-}
-static bool
-fuzzy_kp_equal(gconstpointer a, gconstpointer b)
-{
- const unsigned char *pa = a, *pb = b;
-
- return (memcmp(pa, pb, RSPAMD_FUZZY_KEYLEN) == 0);
-}
-
-enum fuzzy_key_op {
- FUZZY_KEY_READ = 0x1u << 0,
- FUZZY_KEY_WRITE = 0x1u << 1,
- FUZZY_KEY_DELETE = 0x1u << 2,
-};
-KHASH_SET_INIT_INT(fuzzy_key_ids_set);
-KHASH_INIT(fuzzy_key_flag_stat, int, struct fuzzy_key_stat, 1, kh_int_hash_func,
- kh_int_hash_equal);
-struct fuzzy_key {
- char *name;
- struct rspamd_cryptobox_keypair *key;
- struct rspamd_cryptobox_pubkey *pk;
- struct fuzzy_key_stat *stat;
- khash_t(fuzzy_key_flag_stat) * flags_stat;
- khash_t(fuzzy_key_ids_set) * forbidden_ids;
- struct rspamd_leaky_bucket_elt *rl_bucket;
- ucl_object_t *extensions;
- double burst;
- double rate;
- ev_tstamp expire;
- bool expired;
- int flags; /* enum fuzzy_key_op */
- ref_entry_t ref;
-};
-
-KHASH_INIT(rspamd_fuzzy_keys_hash,
- const unsigned char *, struct fuzzy_key *, 1,
- fuzzy_kp_hash, fuzzy_kp_equal);
-
-struct rspamd_lua_fuzzy_script {
- int cbref;
- struct rspamd_lua_fuzzy_script *next;
-};
-
-struct rspamd_fuzzy_storage_ctx {
- uint64_t magic;
- /* Events base */
- struct ev_loop *event_loop;
- /* DNS resolver */
- struct rspamd_dns_resolver *resolver;
- /* Config */
- struct rspamd_config *cfg;
- /* END OF COMMON PART */
- struct fuzzy_global_stat stat;
- double expire;
- double sync_timeout;
- double delay;
- double tcp_timeout;
- struct rspamd_radix_map_helper *update_ips;
- struct rspamd_hash_map_helper *update_keys;
- struct rspamd_radix_map_helper *blocked_ips;
- struct rspamd_radix_map_helper *ratelimit_whitelist;
- struct rspamd_radix_map_helper *delay_whitelist;
-
- const ucl_object_t *update_map;
- const ucl_object_t *update_keys_map;
- const ucl_object_t *delay_whitelist_map;
- const ucl_object_t *blocked_map;
- const ucl_object_t *ratelimit_whitelist_map;
- const ucl_object_t *dynamic_keys_map;
-
- unsigned int keypair_cache_size;
- ev_timer stat_ev;
- ev_io peer_ev;
-
- /* Local keypair */
- struct rspamd_cryptobox_keypair *default_keypair; /* Bad clash, need for parse keypair */
- struct fuzzy_key *default_key;
- khash_t(rspamd_fuzzy_keys_hash) * keys;
- /* Those are loaded via map */
- khash_t(rspamd_fuzzy_keys_hash) * dynamic_keys;
-
- gboolean encrypted_only;
- gboolean read_only;
- gboolean dedicated_update_worker;
- struct rspamd_keypair_cache *keypair_cache;
- struct rspamd_http_context *http_ctx;
- rspamd_lru_hash_t *errors_ips;
- rspamd_lru_hash_t *ratelimit_buckets;
- struct rspamd_fuzzy_backend *backend;
- GArray *updates_pending;
- unsigned int updates_failed;
- unsigned int updates_maxfail;
- /* Used to send data between workers */
- int peer_fd;
-
- /* Ratelimits */
- unsigned int leaky_bucket_ttl;
- unsigned int leaky_bucket_mask;
- unsigned int max_buckets;
- gboolean ratelimit_log_only;
- double leaky_bucket_burst;
- double leaky_bucket_rate;
-
- struct rspamd_worker *worker;
- const ucl_object_t *skip_map;
- struct rspamd_hash_map_helper *skip_hashes;
- struct rspamd_lua_fuzzy_script *lua_pre_handlers;
- struct rspamd_lua_fuzzy_script *lua_post_handlers;
- struct rspamd_lua_fuzzy_script *lua_blacklist_handlers;
- khash_t(fuzzy_key_ids_set) * default_forbidden_ids;
- /* Ids that should not override other ids */
- khash_t(fuzzy_key_ids_set) * weak_ids;
-};
-
-enum fuzzy_cmd_type {
- CMD_NORMAL,
- CMD_SHINGLE,
- CMD_ENCRYPTED_NORMAL,
- CMD_ENCRYPTED_SHINGLE
-};
-
/*
 * A TCP reply frame: a 16-bit size header followed by an encrypted reply
 * payload, plus a reference-count entry.
 * NOTE(review): the byte order of size_hdr, and whether it covers only the
 * payload, are not visible here - confirm at the code that writes the frame.
 */
struct rspamd_fuzzy_tcp_frame {
uint16_t size_hdr; /* We have to write this as well */
struct rspamd_fuzzy_encrypted_reply payload; /* Payload */
ref_entry_t ref; /* Reference-count entry; presumably driven by REF_RETAIN/REF_RELEASE - confirm */
};
-/* Legacy structure name for compatibility during refactoring */
-struct fuzzy_session {
- struct rspamd_worker *worker;
- rspamd_inet_addr_t *addr;
- struct rspamd_fuzzy_storage_ctx *ctx;
-
- struct rspamd_fuzzy_shingle_cmd cmd; /* Can handle both shingles and non-shingles */
- struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */
- struct fuzzy_key_stat *ip_stat;
-
- enum rspamd_fuzzy_epoch epoch;
- enum fuzzy_cmd_type cmd_type;
- int fd;
- ev_tstamp timestamp;
- struct ev_io io;
- ref_entry_t ref;
- struct fuzzy_key *key;
- struct rspamd_fuzzy_cmd_extension *extensions;
- unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
-
- /* If this is a TCP session, this pointer will be set */
- struct fuzzy_tcp_session *tcp_session;
-};
-
/*
 * A peer command bundled with a libev I/O watcher and a `final` flag.
 * NOTE(review): presumably used to pass commands between worker processes -
 * confirm against the code that allocates and sends this structure.
 */
struct fuzzy_peer_request {
ev_io io_ev; /* libev I/O watcher belonging to this request */
struct fuzzy_peer_cmd cmd; /* The peer command carried by this request */
gboolean final; /* NOTE(review): semantics not visible here; presumably marks the last request before termination - confirm */
};
-enum rspamd_ratelimit_event_type {
- RATELIMIT_EVENT_NEW,
- RATELIMIT_EVENT_EXISTING,
- RATELIMIT_EVENT_BLACKLIST,
-};
-
-struct rspamd_ratelimit_callback_ctx {
- rspamd_inet_addr_t *addr; /* Client IP */
- const char *reason; /* "ratelimit" or "blacklisted" */
- enum rspamd_ratelimit_event_type type; /* new, existing, blacklist */
-
- /* Rate limit bucket state (optional) */
- struct rspamd_leaky_bucket_elt *bucket;
- double max_burst;
- double max_rate;
-
- /* Session context (optional - only for per-key limits) */
- struct fuzzy_session *session;
-};
-
/*
 * Forward declarations of file-local (static) helpers so they can be
 * referenced before their definitions in this translation unit:
 * reply writers for a generic session, a UDP session and a TCP session,
 * and the routine that processes the pending updates queue.
 */
static void rspamd_fuzzy_write_reply(struct fuzzy_session *session);
static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session);
static bool rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session,
struct fuzzy_tcp_reply_queue_elt *reply);
static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx,
const char *source, gboolean final);
-static gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
- rspamd_inet_addr_t *addr);
-static void rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx *ctx,
- rspamd_inet_addr_t *addr,
- const char *reason);
-static void rspamd_fuzzy_call_ratelimit_handlers(struct rspamd_fuzzy_storage_ctx *ctx,
- const struct rspamd_ratelimit_callback_ctx *cb_ctx);
-static struct fuzzy_key *fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg,
- const ucl_object_t *obj,
- khash_t(rspamd_fuzzy_keys_hash) * target);
/*
 * Forward declarations of file-local callbacks with the libev I/O watcher
 * signature (EV_P_ loop argument, the ev_io watcher, and the revents mask).
 * NOTE(review): presumably the TCP session I/O handler and the TCP accept
 * handler - confirm at their definitions.
 */
static void rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents);
static void accept_tcp_socket(EV_P_ ev_io *w, int revents);
-static ucl_object_t *rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt);
-struct fuzzy_keymap_ucl_buf {
- rspamd_fstring_t *buf;
- struct rspamd_fuzzy_storage_ctx *ctx;
-};
-
-/* Callbacks for reading json dynamic rules */
-static char *
-ucl_keymap_read_cb(char *chunk,
- int len,
- struct map_cb_data *data,
- gboolean final)
-{
- struct fuzzy_keymap_ucl_buf *jb, *pd;
-
- pd = data->prev_data;
-
- g_assert(pd != NULL);
-
- if (data->cur_data == NULL) {
- jb = g_malloc0(sizeof(*jb));
- jb->ctx = pd->ctx;
- data->cur_data = jb;
- }
- else {
- jb = data->cur_data;
- }
-
- if (jb->buf == NULL) {
- /* Allocate memory for buffer */
- jb->buf = rspamd_fstring_sized_new(MAX(len, 4096));
- }
-
- jb->buf = rspamd_fstring_append(jb->buf, chunk, len);
-
- return NULL;
-}
-
-static void
-ucl_keymap_fin_cb(struct map_cb_data *data, void **target)
-{
- struct fuzzy_keymap_ucl_buf *jb;
- ucl_object_t *top;
- struct ucl_parser *parser;
- struct rspamd_config *cfg;
-
- /* Now parse ucl */
- if (data->cur_data) {
- jb = data->cur_data;
- cfg = jb->ctx->cfg;
- }
- else {
- msg_err("no cur data in the map! might be a bug");
- return;
- }
-
- if (jb->buf->len == 0) {
- msg_err_config("no data read");
-
- return;
- }
-
- parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS);
-
- if (!ucl_parser_add_chunk(parser, jb->buf->str, jb->buf->len)) {
- msg_err_config("cannot load ucl data: parse error %s",
- ucl_parser_get_error(parser));
- ucl_parser_free(parser);
- return;
- }
-
- top = ucl_parser_get_object(parser);
- ucl_parser_free(parser);
-
- if (ucl_object_type(top) != UCL_ARRAY) {
- ucl_object_unref(top);
- msg_err_config("loaded ucl is not an array");
- return;
- }
-
- if (target) {
- *target = data->cur_data;
- }
-
- if (data->prev_data) {
- jb = data->prev_data;
- /* Clean prev data */
- if (jb->buf) {
- rspamd_fstring_free(jb->buf);
- }
-
- /* Clean the existing keys */
- struct fuzzy_key *key;
- kh_foreach_value(jb->ctx->dynamic_keys, key, {
- REF_RELEASE(key);
- });
- kh_clear(rspamd_fuzzy_keys_hash, jb->ctx->dynamic_keys);
-
- /* Insert new keys */
- const ucl_object_t *cur;
- ucl_object_iter_t it = NULL;
- int success = 0;
-
- while ((cur = ucl_object_iterate(top, &it, true)) != NULL) {
- struct fuzzy_key *nk;
-
- nk = fuzzy_add_keypair_from_ucl(cfg, cur, jb->ctx->dynamic_keys);
-
- if (nk == NULL) {
- msg_warn_config("cannot add dynamic keypair");
- }
- success++;
- }
-
- msg_info_config("loaded %d dynamic keypairs", success);
-
- g_free(jb);
- }
-
- ucl_object_unref(top);
-}
-
-static void
-ucl_keymap_dtor_cb(struct map_cb_data *data)
-{
- struct fuzzy_keymap_ucl_buf *jb;
-
- if (data->cur_data) {
- jb = data->cur_data;
- /* Clean prev data */
- if (jb->buf) {
- rspamd_fstring_free(jb->buf);
- }
-
- struct fuzzy_key *key;
- kh_foreach_value(jb->ctx->dynamic_keys, key, {
- REF_RELEASE(key);
- });
- /* Clear hash content but don't destroy - mempool destructor will handle it */
- kh_clear(rspamd_fuzzy_keys_hash, jb->ctx->dynamic_keys);
-
- g_free(jb);
- }
-}
-
-enum rspamd_ratelimit_check_result {
- ratelimit_pass,
- ratelimit_new,
- ratelimit_existing,
-};
-
-enum rspamd_ratelimit_check_policy {
- ratelimit_policy_permanent,
- ratelimit_policy_normal,
-};
-
-static enum rspamd_ratelimit_check_result
-rspamd_fuzzy_check_ratelimit_bucket(struct rspamd_fuzzy_storage_ctx *ctx,
- rspamd_inet_addr_t *addr,
- ev_tstamp timestamp,
- struct rspamd_leaky_bucket_elt *elt,
- enum rspamd_ratelimit_check_policy policy,
- double max_burst, double max_rate)
-{
- gboolean ratelimited = FALSE, new_ratelimit = FALSE;
-
- /* Nothing to check */
- if (isnan(max_burst) || isnan(max_rate)) {
- return ratelimit_pass;
- }
-
- if (isnan(elt->cur)) {
- /* There is an issue with the previous logic: the TTL is updated each time
- * we see that new bucket. Hence, we need to check the `last` and act accordingly
- */
- if (elt->last < timestamp && timestamp - elt->last >= ctx->leaky_bucket_ttl) {
- /*
- * We reset bucket to it's 90% capacity to allow some requests
- * This should cope with the issue when we block an IP network for some burst and never unblock it
- */
- elt->cur = max_burst * 0.9;
- elt->last = timestamp;
- }
- else {
- ratelimited = TRUE;
- }
- }
- else {
- /* Update bucket: leak some elements */
- if (elt->last < timestamp) {
- elt->cur -= max_rate * (timestamp - elt->last);
- elt->last = timestamp;
-
- if (elt->cur < 0) {
- elt->cur = 0;
- }
- }
- else {
- elt->last = timestamp;
- }
-
- /* Check the bucket */
- if (elt->cur >= max_burst) {
-
- if (policy == ratelimit_policy_permanent) {
- elt->cur = NAN;
- }
- new_ratelimit = TRUE;
- ratelimited = TRUE;
- }
- else {
- elt->cur++; /* Allow one more request */
- }
- }
-
- /* Note: Caller is responsible for calling the ratelimit handlers with
- * proper context (new vs existing, bucket info, session info, etc.)
- */
-
- if (new_ratelimit) {
- return ratelimit_new;
- }
-
- return ratelimited ? ratelimit_existing : ratelimit_pass;
-}
-
-static gboolean
-rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx,
- rspamd_inet_addr_t *addr,
- struct rspamd_worker *worker,
- ev_tstamp timestamp)
-{
- rspamd_inet_addr_t *masked;
- struct rspamd_leaky_bucket_elt *elt;
-
- if (!addr) {
- return TRUE;
- }
-
- if (ctx->ratelimit_whitelist != NULL) {
- if (rspamd_match_radix_map_addr(ctx->ratelimit_whitelist,
- addr) != NULL) {
- return TRUE;
- }
- }
-
- /* Skip ratelimit for local addresses */
- if (rspamd_inet_address_is_local(addr)) {
- return TRUE;
- }
-
- masked = rspamd_inet_address_copy(addr, NULL);
-
- if (rspamd_inet_address_get_af(masked) == AF_INET) {
- rspamd_inet_address_apply_mask(masked,
- MIN(ctx->leaky_bucket_mask, 32));
- }
- else {
- /* Must be at least /64 */
- rspamd_inet_address_apply_mask(masked,
- MIN(MAX(ctx->leaky_bucket_mask * 4, 64), 128));
- }
-
- elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, masked,
- (time_t) timestamp);
-
- if (elt) {
- enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(ctx, addr,
- timestamp, elt,
- ratelimit_policy_permanent,
- ctx->leaky_bucket_burst,
- ctx->leaky_bucket_rate);
-
- if (res == ratelimit_new) {
- msg_info("ratelimiting %s (%s), %.1f max elts",
- rspamd_inet_address_to_string(addr),
- rspamd_inet_address_to_string(masked),
- ctx->leaky_bucket_burst);
-
- struct rspamd_srv_command srv_cmd;
-
- srv_cmd.type = RSPAMD_SRV_FUZZY_BLOCKED;
- srv_cmd.cmd.fuzzy_blocked.af = rspamd_inet_address_get_af(masked);
-
- if (srv_cmd.cmd.fuzzy_blocked.af == AF_INET || srv_cmd.cmd.fuzzy_blocked.af == AF_INET6) {
- socklen_t slen;
- struct sockaddr *sa = rspamd_inet_address_get_sa(masked, &slen);
-
- if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
- memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
- msg_debug("propagating blocked address to other workers");
- rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
- }
- else {
- msg_err("bad address length: %d, expected to be %d",
- (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
- }
- }
-
- if (ctx->lua_blacklist_handlers) {
- struct rspamd_ratelimit_callback_ctx cb_ctx = {
- .addr = addr,
- .reason = "ratelimit",
- .type = RATELIMIT_EVENT_NEW,
- .bucket = elt,
- .max_burst = ctx->leaky_bucket_burst,
- .max_rate = ctx->leaky_bucket_rate,
- .session = NULL,
- };
- rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx);
- }
- }
- else if (res == ratelimit_existing) {
- if (ctx->lua_blacklist_handlers) {
- struct rspamd_ratelimit_callback_ctx cb_ctx = {
- .addr = addr,
- .reason = "ratelimit",
- .type = RATELIMIT_EVENT_EXISTING,
- .bucket = elt,
- .max_burst = ctx->leaky_bucket_burst,
- .max_rate = ctx->leaky_bucket_rate,
- .session = NULL,
- };
- rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx);
- }
- }
-
- rspamd_inet_address_free(masked);
-
- return res == ratelimit_pass;
- }
- else {
- /* New bucket */
- elt = g_malloc(sizeof(*elt));
- elt->addr = masked; /* transfer ownership */
- elt->cur = 1;
- elt->last = timestamp;
-
- rspamd_lru_hash_insert(ctx->ratelimit_buckets,
- masked,
- elt,
- timestamp,
- ctx->leaky_bucket_ttl);
- }
-
- return TRUE;
-}
-
-/*
- * Push bucket info as a Lua table
- */
-static void
-rspamd_fuzzy_bucket_info_tolua(lua_State *L,
- const struct rspamd_ratelimit_callback_ctx *cb_ctx)
-{
- if (!cb_ctx->bucket) {
- lua_pushnil(L);
- return;
- }
-
- lua_createtable(L, 0, 6);
-
- /* bucket_level - current fill level (nil if permanently blocked) */
- if (isnan(cb_ctx->bucket->cur)) {
- lua_pushnil(L);
- lua_setfield(L, -2, "bucket_level");
- lua_pushboolean(L, TRUE);
- lua_setfield(L, -2, "is_permanent");
- }
- else {
- lua_pushnumber(L, cb_ctx->bucket->cur);
- lua_setfield(L, -2, "bucket_level");
- lua_pushboolean(L, FALSE);
- lua_setfield(L, -2, "is_permanent");
- }
-
- /* max_burst */
- if (!isnan(cb_ctx->max_burst)) {
- lua_pushnumber(L, cb_ctx->max_burst);
- lua_setfield(L, -2, "max_burst");
- }
-
- /* max_rate */
- if (!isnan(cb_ctx->max_rate)) {
- lua_pushnumber(L, cb_ctx->max_rate);
- lua_setfield(L, -2, "max_rate");
- }
-
- /* exceeded_by - how much over the limit */
- if (!isnan(cb_ctx->bucket->cur) && !isnan(cb_ctx->max_burst) &&
- cb_ctx->bucket->cur > cb_ctx->max_burst) {
- lua_pushnumber(L, cb_ctx->bucket->cur - cb_ctx->max_burst);
- lua_setfield(L, -2, "exceeded_by");
- }
-
- /* last_seen */
- lua_pushnumber(L, cb_ctx->bucket->last);
- lua_setfield(L, -2, "last_seen");
-}
-
-/*
- * Push extensions from session or callback context as a Lua table
- */
-static void
-rspamd_fuzzy_ratelimit_extensions_tolua(lua_State *L,
- const struct rspamd_ratelimit_callback_ctx *cb_ctx)
-{
- struct rspamd_fuzzy_cmd_extension *ext;
- rspamd_inet_addr_t *addr;
-
- lua_createtable(L, 0, 2);
-
- if (!cb_ctx->session || !cb_ctx->session->extensions) {
- return;
- }
-
- LL_FOREACH(cb_ctx->session->extensions, ext)
- {
- switch (ext->ext) {
- case RSPAMD_FUZZY_EXT_SOURCE_DOMAIN:
- lua_pushlstring(L, (const char *) ext->payload, ext->length);
- lua_setfield(L, -2, "domain");
- break;
- case RSPAMD_FUZZY_EXT_SOURCE_IP4:
- addr = rspamd_inet_address_new(AF_INET, ext->payload);
- rspamd_lua_ip_push(L, addr);
- rspamd_inet_address_free(addr);
- lua_setfield(L, -2, "source_ip");
- break;
- case RSPAMD_FUZZY_EXT_SOURCE_IP6:
- addr = rspamd_inet_address_new(AF_INET6, ext->payload);
- rspamd_lua_ip_push(L, addr);
- rspamd_inet_address_free(addr);
- lua_setfield(L, -2, "source_ip");
- break;
- }
- }
-}
-
-/*
- * Enhanced Lua callback for ratelimit/blacklist events
- * Passes 7 arguments to Lua:
- * 1. ip (rspamd_ip) - Client IP address
- * 2. reason (string) - "ratelimit" or "blacklisted"
- * 3. event_type (string) - "new", "existing", or "blacklist"
- * 4. ratelimit_info (table/nil) - Bucket state details
- * 5. digest (rspamd_text/nil) - Hash if session available
- * 6. extensions (table) - Domain, source IP from extensions
- */
-static void
-rspamd_fuzzy_call_ratelimit_handlers(struct rspamd_fuzzy_storage_ctx *ctx,
- const struct rspamd_ratelimit_callback_ctx *cb_ctx)
-{
- if (ctx->lua_blacklist_handlers == NULL) {
- return;
- }
-
- struct rspamd_lua_fuzzy_script *cur;
- LL_FOREACH(ctx->lua_blacklist_handlers, cur)
- {
- lua_State *L = ctx->cfg->lua_state;
- int err_idx, ret;
- const int nargs = 6;
-
- lua_pushcfunction(L, &rspamd_lua_traceback);
- err_idx = lua_gettop(L);
- lua_checkstack(L, err_idx + nargs + 2);
- lua_rawgeti(L, LUA_REGISTRYINDEX, cur->cbref);
-
- /* Arg 1: client IP */
- rspamd_lua_ip_push(L, cb_ctx->addr);
-
- /* Arg 2: block reason */
- lua_pushstring(L, cb_ctx->reason);
-
- /* Arg 3: event type */
- switch (cb_ctx->type) {
- case RATELIMIT_EVENT_NEW:
- lua_pushliteral(L, "new");
- break;
- case RATELIMIT_EVENT_EXISTING:
- lua_pushliteral(L, "existing");
- break;
- case RATELIMIT_EVENT_BLACKLIST:
- lua_pushliteral(L, "blacklist");
- break;
- }
-
- /* Arg 4: ratelimit_info table (or nil) */
- rspamd_fuzzy_bucket_info_tolua(L, cb_ctx);
-
- /* Arg 5: digest (or nil) */
- if (cb_ctx->session) {
- (void) lua_new_text(L, (const char *) cb_ctx->session->cmd.basic.digest,
- sizeof(cb_ctx->session->cmd.basic.digest), FALSE);
- }
- else {
- lua_pushnil(L);
- }
-
- /* Arg 6: extensions table */
- rspamd_fuzzy_ratelimit_extensions_tolua(L, cb_ctx);
-
- if ((ret = lua_pcall(L, nargs, 0, err_idx)) != 0) {
- msg_err("call to lua_blacklist_cbref "
- "script failed (%d): %s",
- ret, lua_tostring(L, -1));
- }
-
- lua_settop(L, 0);
- }
-}
-
-/*
- * Backwards-compatible wrapper that calls the enhanced handler
- */
-static void
-rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx *ctx,
- rspamd_inet_addr_t *addr,
- const char *reason)
-{
- if (ctx->lua_blacklist_handlers == NULL) {
- return;
- }
-
- struct rspamd_ratelimit_callback_ctx cb_ctx = {
- .addr = addr,
- .reason = reason,
- .type = g_strcmp0(reason, "blacklisted") == 0 ? RATELIMIT_EVENT_BLACKLIST : RATELIMIT_EVENT_EXISTING,
- .bucket = NULL,
- .max_burst = NAN,
- .max_rate = NAN,
- .session = NULL,
- };
- rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx);
-}
-
-static gboolean
-rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
- rspamd_inet_addr_t *addr)
-{
- if (ctx->blocked_ips != NULL) {
- if (rspamd_match_radix_map_addr(ctx->blocked_ips,
- addr) != NULL) {
-
- rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "blacklisted");
- return FALSE;
- }
- }
-
- return TRUE;
-}
-
static gboolean
rspamd_fuzzy_check_write(struct rspamd_fuzzy_storage_ctx *ctx,
rspamd_inet_addr_t *addr,
}
if (key) {
- if (cmd == FUZZY_WRITE && key->flags & FUZZY_KEY_WRITE) {
- return TRUE;
- }
- else if (cmd == FUZZY_DEL && key->flags & FUZZY_KEY_DELETE) {
- return TRUE;
- }
- }
-
- return FALSE;
-}
-
-static void
-fuzzy_key_stat_dtor(gpointer p)
-{
- struct fuzzy_key_stat *st = p;
-
- if (st->last_ips) {
- rspamd_lru_hash_destroy(st->last_ips);
- }
-
- if (st->keypair) {
- rspamd_keypair_unref(st->keypair);
- }
-
- g_free(st);
-}
-
-static void
-fuzzy_key_stat_unref(gpointer p)
-{
- struct fuzzy_key_stat *st = p;
-
- REF_RELEASE(st);
-}
-
-static void
-fuzzy_key_dtor(gpointer p)
-{
- struct fuzzy_key *key = p;
-
- if (key) {
- if (key->key) {
- rspamd_keypair_unref(key->key);
- }
-
- if (key->stat) {
- REF_RELEASE(key->stat);
- }
-
- if (key->flags_stat) {
- kh_destroy(fuzzy_key_flag_stat, key->flags_stat);
- }
-
- if (key->forbidden_ids) {
- kh_destroy(fuzzy_key_ids_set, key->forbidden_ids);
- }
-
- if (key->rl_bucket) {
- /* TODO: save bucket stats */
- g_free(key->rl_bucket);
- }
-
- if (key->name) {
- g_free(key->name);
+ if (cmd == FUZZY_WRITE && key->flags & FUZZY_KEY_WRITE) {
+ return TRUE;
}
-
- if (key->extensions) {
- ucl_object_unref(key->extensions);
+ else if (cmd == FUZZY_DEL && key->flags & FUZZY_KEY_DELETE) {
+ return TRUE;
}
-
- g_free(key);
}
-}
-static void
-fuzzy_hash_table_dtor(khash_t(rspamd_fuzzy_keys_hash) * hash)
-{
- struct fuzzy_key *key;
- kh_foreach_value(hash, key, {
- REF_RELEASE(key);
- });
- kh_destroy(rspamd_fuzzy_keys_hash, hash);
+ return FALSE;
}
static void
ctx->stat.fuzzy_hashes = count;
}
-static void
-fuzzy_rl_bucket_free(gpointer p)
-{
- struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *) p;
-
- rspamd_inet_address_free(elt->addr);
- g_free(elt);
-}
-
static void
fuzzy_stat_count_callback(uint64_t count, void *ud)
{
return TRUE;
}
-static ucl_object_t *
-rspamd_fuzzy_storage_stat_key(const struct fuzzy_key_stat *key_stat)
-{
- ucl_object_t *res;
-
- res = ucl_object_typed_new(UCL_OBJECT);
-
- ucl_object_insert_key(res, ucl_object_fromint(key_stat->checked),
- "checked", 0, false);
- ucl_object_insert_key(res, ucl_object_fromdouble(key_stat->checked_ctr.mean),
- "checked_per_hour", 0, false);
- ucl_object_insert_key(res, ucl_object_fromint(key_stat->matched),
- "matched", 0, false);
- ucl_object_insert_key(res, ucl_object_fromdouble(key_stat->matched_ctr.mean),
- "matched_per_hour", 0, false);
- ucl_object_insert_key(res, ucl_object_fromint(key_stat->added),
- "added", 0, false);
- ucl_object_insert_key(res, ucl_object_fromint(key_stat->deleted),
- "deleted", 0, false);
- ucl_object_insert_key(res, ucl_object_fromint(key_stat->errors),
- "errors", 0, false);
-
- return res;
-}
-
-static void
-rspamd_fuzzy_key_stat_iter(const unsigned char *pk_iter, struct fuzzy_key *fuzzy_key, ucl_object_t *keys_obj, gboolean ip_stat)
-{
- struct fuzzy_key_stat *key_stat = fuzzy_key->stat;
- char keyname[17];
-
- if (key_stat) {
- rspamd_snprintf(keyname, sizeof(keyname), "%8bs", pk_iter);
-
- ucl_object_t *elt = rspamd_fuzzy_storage_stat_key(key_stat);
-
- if (key_stat->last_ips && ip_stat) {
- int i = 0;
- ucl_object_t *ip_elt = ucl_object_typed_new(UCL_OBJECT);
- gpointer k, v;
-
- while ((i = rspamd_lru_hash_foreach(key_stat->last_ips,
- i, &k, &v)) != -1) {
- ucl_object_t *ip_cur = rspamd_fuzzy_storage_stat_key(v);
- ucl_object_insert_key(ip_elt, ip_cur,
- rspamd_inet_address_to_string(k), 0, true);
- }
- ucl_object_insert_key(elt, ip_elt, "ips", 0, false);
- }
-
- int flag;
- struct fuzzy_key_stat *flag_stat;
- ucl_object_t *flags_ucl = ucl_object_typed_new(UCL_OBJECT);
-
- kh_foreach_key_value_ptr(fuzzy_key->flags_stat, flag, flag_stat, {
- char intbuf[16];
- rspamd_snprintf(intbuf, sizeof(intbuf), "%d", flag);
- ucl_object_insert_key(flags_ucl, rspamd_fuzzy_storage_stat_key(flag_stat),
- intbuf, 0, true);
- });
-
- ucl_object_insert_key(elt, flags_ucl, "flags", 0, false);
-
- ucl_object_insert_key(elt,
- rspamd_keypair_to_ucl(fuzzy_key->key, RSPAMD_KEYPAIR_ENCODING_DEFAULT,
- RSPAMD_KEYPAIR_DUMP_NO_SECRET | RSPAMD_KEYPAIR_DUMP_FLATTENED),
- "keypair", 0, false);
-
- if (fuzzy_key->rl_bucket) {
- ucl_object_insert_key(elt,
- rspamd_leaky_bucket_to_ucl(fuzzy_key->rl_bucket),
- "ratelimit", 0, false);
- }
-
- ucl_object_insert_key(keys_obj, elt, keyname, 0, true);
- }
-}
-static ucl_object_t *
-rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt)
-{
- ucl_object_t *res;
-
- res = ucl_object_typed_new(UCL_OBJECT);
-
- ucl_object_insert_key(res, ucl_object_fromdouble(p_elt->cur), "cur", 0, false);
- ucl_object_insert_key(res, ucl_object_fromdouble(p_elt->last), "last", 0, false);
-
- return res;
-}
-
-static ucl_object_t *
-rspamd_fuzzy_stat_to_ucl(struct rspamd_fuzzy_storage_ctx *ctx, gboolean ip_stat)
-{
- struct fuzzy_key *fuzzy_key;
- ucl_object_t *obj, *keys_obj, *elt, *ip_elt;
- const unsigned char *pk_iter;
-
- obj = ucl_object_typed_new(UCL_OBJECT);
-
- keys_obj = ucl_object_typed_new(UCL_OBJECT);
-
- kh_foreach(ctx->keys, pk_iter, fuzzy_key, {
- rspamd_fuzzy_key_stat_iter(pk_iter, fuzzy_key, keys_obj, ip_stat);
- });
-
- if (ctx->dynamic_keys) {
- kh_foreach(ctx->dynamic_keys, pk_iter, fuzzy_key, {
- rspamd_fuzzy_key_stat_iter(pk_iter, fuzzy_key, keys_obj, ip_stat);
- });
- }
-
- ucl_object_insert_key(obj, keys_obj, "keys", 0, false);
-
- /* Now generic stats */
- ucl_object_insert_key(obj,
- ucl_object_fromint(ctx->stat.fuzzy_hashes),
- "fuzzy_stored",
- 0,
- false);
- ucl_object_insert_key(obj,
- ucl_object_fromint(ctx->stat.fuzzy_hashes_expired),
- "fuzzy_expired",
- 0,
- false);
- ucl_object_insert_key(obj,
- ucl_object_fromint(ctx->stat.invalid_requests),
- "invalid_requests",
- 0,
- false);
- ucl_object_insert_key(obj,
- ucl_object_fromint(ctx->stat.delayed_hashes),
- "delayed_hashes",
- 0,
- false);
-
- if (ctx->errors_ips && ip_stat) {
- gpointer k, v;
- int i = 0;
- ip_elt = ucl_object_typed_new(UCL_OBJECT);
-
- while ((i = rspamd_lru_hash_foreach(ctx->errors_ips, i, &k, &v)) != -1) {
- ucl_object_insert_key(ip_elt,
- ucl_object_fromint(*(uint64_t *) v),
- rspamd_inet_address_to_string(k), 0, true);
- }
-
- ucl_object_insert_key(obj,
- ip_elt,
- "errors_ips",
- 0,
- false);
- }
-
- /* Checked by epoch */
- elt = ucl_object_typed_new(UCL_ARRAY);
-
- for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
- ucl_array_append(elt,
- ucl_object_fromint(ctx->stat.fuzzy_hashes_checked[i]));
- }
-
- ucl_object_insert_key(obj, elt, "fuzzy_checked", 0, false);
-
- /* Shingles by epoch */
- elt = ucl_object_typed_new(UCL_ARRAY);
-
- for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
- ucl_array_append(elt,
- ucl_object_fromint(ctx->stat.fuzzy_shingles_checked[i]));
- }
-
- ucl_object_insert_key(obj, elt, "fuzzy_shingles", 0, false);
-
- /* Matched by epoch */
- elt = ucl_object_typed_new(UCL_ARRAY);
-
- for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
- ucl_array_append(elt,
- ucl_object_fromint(ctx->stat.fuzzy_hashes_found[i]));
- }
-
- ucl_object_insert_key(obj, elt, "fuzzy_found", 0, false);
-
-
- return obj;
-}
-
-static void
-rspamd_fuzzy_maybe_load_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx)
-{
- char path[PATH_MAX];
-
- rspamd_snprintf(path, sizeof(path), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl",
- RSPAMD_DBDIR);
-
- if (access(path, R_OK) != -1) {
- struct ucl_parser *parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS);
- if (ucl_parser_add_file(parser, path)) {
- ucl_object_t *obj = ucl_parser_get_object(parser);
- int loaded = 0;
-
- if (ucl_object_type(obj) == UCL_ARRAY) {
- ucl_object_iter_t it = NULL;
- const ucl_object_t *cur;
-
- while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) {
- const ucl_object_t *ip, *value, *last;
- const char *ip_str;
- double limit_val, last_val;
-
- ip = ucl_object_find_key(cur, "ip");
- value = ucl_object_find_key(cur, "value");
- last = ucl_object_find_key(cur, "last");
-
- if (ip == NULL || value == NULL || last == NULL) {
- msg_err("invalid ratelimit object");
- continue;
- }
-
- ip_str = ucl_object_tostring(ip);
- limit_val = ucl_object_todouble(value);
- last_val = ucl_object_todouble(last);
-
- if (ip_str == NULL || isnan(last_val)) {
- msg_err("invalid ratelimit object");
- continue;
- }
-
- rspamd_inet_addr_t *addr;
- if (rspamd_parse_inet_address(&addr, ip_str, strlen(ip_str),
- RSPAMD_INET_ADDRESS_PARSE_NO_UNIX | RSPAMD_INET_ADDRESS_PARSE_NO_PORT)) {
- struct rspamd_leaky_bucket_elt *elt = g_malloc(sizeof(*elt));
-
- elt->cur = limit_val;
- elt->last = last_val;
- elt->addr = addr;
- rspamd_lru_hash_insert(ctx->ratelimit_buckets, addr, elt, elt->last, ctx->leaky_bucket_ttl);
- loaded++;
- }
- else {
- msg_err("invalid ratelimit ip: %s", ip_str);
- continue;
- }
- }
-
- msg_info("loaded %d ratelimit objects", loaded);
- }
-
- ucl_object_unref(obj);
- }
-
- ucl_parser_free(parser);
- }
-}
-
-static void
-rspamd_fuzzy_maybe_save_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx)
-{
- char path[PATH_MAX];
-
- rspamd_snprintf(path, sizeof(path), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl.new",
- RSPAMD_DBDIR);
- FILE *f = fopen(path, "w");
-
- if (f != NULL) {
- ucl_object_t *top = ucl_object_typed_new(UCL_ARRAY);
- int it = 0;
- gpointer k, v;
-
- ucl_object_reserve(top, rspamd_lru_hash_size(ctx->ratelimit_buckets));
-
- while ((it = rspamd_lru_hash_foreach(ctx->ratelimit_buckets, it, &k, &v)) != -1) {
- ucl_object_t *cur = ucl_object_typed_new(UCL_OBJECT);
- struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *) v;
-
- ucl_object_insert_key(cur, ucl_object_fromdouble(elt->cur), "value", 0, false);
- ucl_object_insert_key(cur, ucl_object_fromdouble(elt->last), "last", 0, false);
- ucl_object_insert_key(cur, ucl_object_fromstring(rspamd_inet_address_to_string(elt->addr)), "ip", 0, false);
- ucl_array_append(top, cur);
- }
-
- if (ucl_object_emit_full(top, UCL_EMIT_JSON_COMPACT, ucl_object_emit_file_funcs(f), NULL)) {
- char npath[PATH_MAX];
-
- fflush(f);
- rspamd_snprintf(npath, sizeof(npath), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl",
- RSPAMD_DBDIR);
-
- if (rename(path, npath) == -1) {
- msg_warn("cannot rename %s to %s: %s", path, npath, strerror(errno));
- }
- else {
- msg_info("saved %d ratelimits in %s", rspamd_lru_hash_size(ctx->ratelimit_buckets), npath);
- }
- }
- else {
- msg_warn("cannot serialize ratelimit buckets to %s: %s", path, strerror(errno));
- }
-
- fclose(f);
- ucl_object_unref(top);
- }
- else {
- msg_warn("cannot save ratelimit buckets to %s: %s", path, strerror(errno));
- }
-}
-
static int
lua_fuzzy_add_pre_handler(lua_State *L)
{
return 0;
}
-static gboolean
-rspamd_fuzzy_storage_stat(struct rspamd_main *rspamd_main,
- struct rspamd_worker *worker, int fd,
- int attached_fd,
- struct rspamd_control_command *cmd,
- gpointer ud)
-{
- struct rspamd_fuzzy_storage_ctx *ctx = ud;
- struct rspamd_control_reply rep;
- ucl_object_t *obj;
- struct ucl_emitter_functions *emit_subr;
- unsigned char fdspace[CMSG_SPACE(sizeof(int))];
- struct iovec iov;
- struct msghdr msg;
- struct cmsghdr *cmsg;
-
- int outfd = -1;
- char tmppath[PATH_MAX];
-
- memset(&rep, 0, sizeof(rep));
- rep.type = RSPAMD_CONTROL_FUZZY_STAT;
- rep.id = cmd->id;
-
- rspamd_snprintf(tmppath, sizeof(tmppath), "%s%c%s-XXXXXXXXXX",
- rspamd_main->cfg->temp_dir, G_DIR_SEPARATOR, "fuzzy-stat");
-
- if ((outfd = mkstemp(tmppath)) == -1) {
- rep.reply.fuzzy_stat.status = errno;
- msg_info_main("cannot make temporary stat file for fuzzy stat: %s",
- strerror(errno));
- }
- else {
- const char *backend_id;
-
- rep.reply.fuzzy_stat.status = 0;
-
- backend_id = rspamd_fuzzy_backend_id(ctx->backend);
- if (backend_id) {
- memcpy(rep.reply.fuzzy_stat.storage_id,
- backend_id,
- sizeof(rep.reply.fuzzy_stat.storage_id));
- }
-
- obj = rspamd_fuzzy_stat_to_ucl(ctx, TRUE);
- emit_subr = ucl_object_emit_fd_funcs(outfd);
- ucl_object_emit_full(obj, UCL_EMIT_JSON_COMPACT, emit_subr, NULL);
- ucl_object_emit_funcs_free(emit_subr);
- ucl_object_unref(obj);
- /* Rewind output file */
- close(outfd);
- outfd = open(tmppath, O_RDONLY);
- unlink(tmppath);
- }
-
- /* Now we can send outfd and status message */
- memset(&msg, 0, sizeof(msg));
-
- /* Attach fd to the message */
- if (outfd != -1) {
- memset(fdspace, 0, sizeof(fdspace));
- msg.msg_control = fdspace;
- msg.msg_controllen = sizeof(fdspace);
- cmsg = CMSG_FIRSTHDR(&msg);
-
- if (cmsg) {
- cmsg->cmsg_level = SOL_SOCKET;
- cmsg->cmsg_type = SCM_RIGHTS;
- cmsg->cmsg_len = CMSG_LEN(sizeof(int));
- memcpy(CMSG_DATA(cmsg), &outfd, sizeof(int));
- }
- }
-
- iov.iov_base = &rep;
- iov.iov_len = sizeof(rep);
- msg.msg_iov = &iov;
- msg.msg_iovlen = 1;
-
- if (sendmsg(fd, &msg, 0) == -1) {
- msg_err_main("cannot send fuzzy stat: %s", strerror(errno));
- }
-
- if (outfd != -1) {
- close(outfd);
- }
-
- return TRUE;
-}
-
-static gboolean
-fuzzy_parse_ids(rspamd_mempool_t *pool,
- const ucl_object_t *obj,
- gpointer ud,
- struct rspamd_rcl_section *section,
- GError **err)
-{
- struct rspamd_rcl_struct_parser *pd = (struct rspamd_rcl_struct_parser *) ud;
- khash_t(fuzzy_key_ids_set) * target;
-
- target = *(khash_t(fuzzy_key_ids_set) **) ((char *) pd->user_struct + pd->offset);
-
- if (ucl_object_type(obj) == UCL_ARRAY) {
- const ucl_object_t *cur;
- ucl_object_iter_t it = NULL;
- uint64_t id;
-
- while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) {
- if (ucl_object_toint_safe(cur, &id)) {
- int r;
-
- kh_put(fuzzy_key_ids_set, target, id, &r);
- }
- else {
- return FALSE;
- }
- }
-
- return TRUE;
- }
- else if (ucl_object_type(obj) == UCL_INT) {
- int r;
- kh_put(fuzzy_key_ids_set, target, ucl_object_toint(obj), &r);
-
- return TRUE;
- }
-
- return FALSE;
-}
-
-static struct fuzzy_key *
-fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg, const ucl_object_t *obj,
- khash_t(rspamd_fuzzy_keys_hash) * target)
-{
- struct rspamd_cryptobox_keypair *kp = rspamd_keypair_from_ucl(obj);
-
- if (kp == NULL) {
- return NULL;
- }
-
- if (rspamd_keypair_type(kp) != RSPAMD_KEYPAIR_KEX) {
- rspamd_keypair_unref(kp);
- return FALSE;
- }
-
- struct fuzzy_key *key = g_malloc0(sizeof(*key));
- REF_INIT_RETAIN(key, fuzzy_key_dtor);
- key->key = kp;
- struct fuzzy_key_stat *keystat = g_malloc0(sizeof(*keystat));
- REF_INIT_RETAIN(keystat, fuzzy_key_stat_dtor);
- /* Hash of ip -> fuzzy_key_stat */
- keystat->last_ips = rspamd_lru_hash_new_full(1024,
- (GDestroyNotify) rspamd_inet_address_free,
- fuzzy_key_stat_unref,
- rspamd_inet_address_hash, rspamd_inet_address_equal);
- key->stat = keystat;
- key->flags_stat = kh_init(fuzzy_key_flag_stat);
- key->burst = NAN;
- key->rate = NAN;
- key->expire = NAN;
- key->rl_bucket = NULL;
- /* Allow read by default */
- key->flags = FUZZY_KEY_READ;
- /* Preallocate some space for flags */
- kh_resize(fuzzy_key_flag_stat, key->flags_stat, 8);
- const unsigned char *pk = rspamd_keypair_component(kp, RSPAMD_KEYPAIR_COMPONENT_PK,
- NULL);
- keystat->keypair = rspamd_keypair_ref(kp);
- /* We map entries by pubkey in binary form for faster lookup */
- khiter_t k;
- int r;
-
- k = kh_put(rspamd_fuzzy_keys_hash, target, pk, &r);
-
- if (r == 0) {
- msg_err("duplicate keypair found: pk=%*bs",
- 32, pk);
- REF_RELEASE(key);
-
- return FALSE;
- }
- else if (r == -1) {
- msg_err("hash insertion error: pk=%*bs",
- 32, pk);
- REF_RELEASE(key);
-
- return FALSE;
- }
-
- kh_val(target, k) = key;
-
- const ucl_object_t *extensions = rspamd_keypair_get_extensions(kp);
-
- if (extensions) {
- key->extensions = ucl_object_ref(extensions);
- lua_State *L = RSPAMD_LUA_CFG_STATE(cfg);
- const ucl_object_t *forbidden_ids = ucl_object_lookup(extensions, "forbidden_ids");
-
- if (forbidden_ids && ucl_object_type(forbidden_ids) == UCL_ARRAY) {
- key->forbidden_ids = kh_init(fuzzy_key_ids_set);
- const ucl_object_t *cur;
- ucl_object_iter_t it = NULL;
-
- while ((cur = ucl_object_iterate(forbidden_ids, &it, true)) != NULL) {
- if (ucl_object_type(cur) == UCL_INT || ucl_object_type(cur) == UCL_FLOAT) {
- int id = ucl_object_toint(cur);
- int r;
-
- kh_put(fuzzy_key_ids_set, key->forbidden_ids, id, &r);
- }
- }
- }
-
- const ucl_object_t *ratelimit = ucl_object_lookup(extensions, "ratelimit");
-
- static int ratelimit_lua_id = -1;
-
- if (ratelimit_lua_id == -1) {
- /* Load ratelimit parsing function */
- if (!rspamd_lua_require_function(L, "plugins/ratelimit", "parse_limit")) {
- msg_err_config("cannot load ratelimit parser from ratelimit plugin");
- }
- else {
- ratelimit_lua_id = luaL_ref(L, LUA_REGISTRYINDEX);
- }
- }
-
- if (ratelimit && ratelimit_lua_id != -1) {
- lua_rawgeti(L, LUA_REGISTRYINDEX, ratelimit_lua_id);
- lua_pushstring(L, "fuzzy_key_ratelimit");
- ucl_object_push_lua(L, ratelimit, false);
-
- if (lua_pcall(L, 2, 1, 0) != 0) {
- msg_err_config("cannot call ratelimit parser from ratelimit plugin");
- }
- else {
- if (lua_type(L, -1) == LUA_TTABLE) {
- /*
- * The returned table is in form { rate = xx, burst = yy }
- */
- lua_getfield(L, -1, "rate");
- key->rate = lua_tonumber(L, -1);
- lua_pop(L, 1);
-
- lua_getfield(L, -1, "burst");
- key->burst = lua_tonumber(L, -1);
- lua_pop(L, 1);
-
- key->rl_bucket = g_malloc0(sizeof(*key->rl_bucket));
- }
- }
-
- lua_settop(L, 0);
- }
-
- const ucl_object_t *expire = ucl_object_lookup(extensions, "expire");
- if (expire && ucl_object_type(expire) == UCL_STRING) {
- struct tm tm;
-
- /* DD-MM-YYYY */
- char *end = strptime(ucl_object_tostring(expire), "%d-%m-%Y", &tm);
-
- if (end != NULL && *end != '\0') {
- msg_err_config("cannot parse expire date: %s", ucl_object_tostring(expire));
- }
- else {
- key->expire = mktime(&tm);
- }
- }
-
- const ucl_object_t *name = ucl_object_lookup(extensions, "name");
- if (name && ucl_object_type(name) == UCL_STRING) {
- key->name = g_strdup(ucl_object_tostring(name));
- }
-
- /* Check permissions */
- const ucl_object_t *read_only = ucl_object_lookup(extensions, "read_only");
- if (read_only && ucl_object_type(read_only) == UCL_BOOLEAN) {
- if (ucl_object_toboolean(read_only)) {
- key->flags &= ~(FUZZY_KEY_WRITE | FUZZY_KEY_DELETE);
- }
- else {
- key->flags |= (FUZZY_KEY_WRITE | FUZZY_KEY_DELETE);
- }
- }
-
- const ucl_object_t *allowed_ops = ucl_object_lookup(extensions, "allowed_ops");
- if (allowed_ops && ucl_object_type(allowed_ops) == UCL_ARRAY) {
- const ucl_object_t *cur;
- ucl_object_iter_t it = NULL;
- /* Reset to only allowed */
- key->flags = 0;
-
- while ((cur = ucl_object_iterate(allowed_ops, &it, true)) != NULL) {
- if (ucl_object_type(cur) == UCL_STRING) {
- const char *op = ucl_object_tostring(cur);
-
- if (g_ascii_strcasecmp(op, "read") == 0) {
- key->flags |= FUZZY_KEY_READ;
- }
- else if (g_ascii_strcasecmp(op, "write") == 0) {
- key->flags |= FUZZY_KEY_WRITE;
- }
- else if (g_ascii_strcasecmp(op, "delete") == 0) {
- key->flags |= FUZZY_KEY_DELETE;
- }
- else {
- msg_warn_config("invalid operation: %s", op);
- }
- }
- }
- }
- }
-
- msg_debug("loaded keypair %*bs; expire=%f; rate=%f; burst=%f; name=%s",
- (int) crypto_box_publickeybytes(), pk,
- key->expire, key->rate, key->burst, key->name);
-
- return key;
-}
-
-
-static gboolean
-fuzzy_parse_keypair(rspamd_mempool_t *pool,
- const ucl_object_t *obj,
- gpointer ud,
- struct rspamd_rcl_section *section,
- GError **err)
-{
- struct rspamd_rcl_struct_parser *pd = ud;
- struct rspamd_fuzzy_storage_ctx *ctx;
- struct fuzzy_key *key;
- const ucl_object_t *cur;
- ucl_object_iter_t it = NULL;
- gboolean ret;
-
- ctx = pd->user_struct;
- pd->offset = G_STRUCT_OFFSET(struct rspamd_fuzzy_storage_ctx, default_keypair);
-
- /*
- * Single key
- */
- if (ucl_object_type(obj) == UCL_STRING || ucl_object_type(obj) == UCL_OBJECT) {
- ret = rspamd_rcl_parse_struct_keypair(pool, obj, pd, section, err);
-
- if (!ret) {
- return ret;
- }
-
- key = fuzzy_add_keypair_from_ucl(ctx->cfg, obj, ctx->keys);
-
- if (key == NULL) {
- return FALSE;
- }
-
- /* Use the last one ? */
- ctx->default_key = key;
- }
- else if (ucl_object_type(obj) == UCL_ARRAY) {
- while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) {
- if (!fuzzy_parse_keypair(pool, cur, pd, section, err)) {
- msg_err_pool("cannot parse keypair");
- }
- }
- }
-
- return TRUE;
-}
-
gpointer
init_fuzzy(struct rspamd_config *cfg)
{
${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend/fuzzy_backend_sqlite.c
${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend/fuzzy_backend_redis.c
${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend/fuzzy_backend_noop.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_storage_keys.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_storage_ratelimit.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_storage_stat.c
${CMAKE_CURRENT_SOURCE_DIR}/milter.c
${CMAKE_CURRENT_SOURCE_DIR}/monitored.c
${CMAKE_CURRENT_SOURCE_DIR}/protocol.c
--- /dev/null
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef RSPAMD_FUZZY_STORAGE_INTERNAL_H
+#define RSPAMD_FUZZY_STORAGE_INTERNAL_H
+
+#include "config.h"
+
+#include "rspamd.h"
+#include "ref.h"
+#include "libserver/fuzzy_wire.h"
+#include "libutil/hash.h"
+#include "contrib/libev/ev.h"
+#include "contrib/libucl/khash.h"
+
+#include <string.h>
+
+struct map_cb_data;
+struct rspamd_rcl_section;
+struct rspamd_rcl_struct_parser;
+struct rspamd_control_command;
+
+struct rspamd_main;
+struct rspamd_worker;
+struct rspamd_dns_resolver;
+struct rspamd_radix_map_helper;
+struct rspamd_hash_map_helper;
+struct rspamd_keypair_cache;
+struct rspamd_http_context;
+struct rspamd_fuzzy_backend;
+
+struct rspamd_cryptobox_keypair;
+struct rspamd_cryptobox_pubkey;
+
+struct fuzzy_tcp_session;
+
+/**
+ * Storage-wide counters; an instance is embedded in
+ * `struct rspamd_fuzzy_storage_ctx` as the `stat` member.
+ * Array members hold one slot per protocol epoch (RSPAMD_FUZZY_EPOCH_MAX slots).
+ */
+struct fuzzy_global_stat {
+ uint64_t fuzzy_hashes; /**< number of fuzzy hashes stored */
+ uint64_t fuzzy_hashes_expired; /**< number of fuzzy hashes expired */
+ uint64_t fuzzy_hashes_checked[RSPAMD_FUZZY_EPOCH_MAX]; /**< amount of check requests for each epoch */
+ uint64_t fuzzy_shingles_checked[RSPAMD_FUZZY_EPOCH_MAX]; /**< amount of shingle check requests for each epoch */
+ uint64_t fuzzy_hashes_found[RSPAMD_FUZZY_EPOCH_MAX]; /**< presumably hashes found for each epoch - TODO confirm against the updater */
+ uint64_t invalid_requests; /**< amount of invalid requests */
+ uint64_t delayed_hashes; /**< amount of delayed hashes found */
+};
+
+/**
+ * Reference-counted statistics record (see `ref`); it is destroyed by
+ * fuzzy_key_stat_dtor(), which also drops `last_ips` and `keypair`.
+ * The same structure is used as the value type of `last_ips`
+ * (a hash of ip -> fuzzy_key_stat) and of the per-flag statistics table.
+ */
+struct fuzzy_key_stat {
+ uint64_t checked;
+ uint64_t matched;
+ uint64_t added;
+ uint64_t deleted;
+ uint64_t errors;
+ /* Store averages for checked/matched per minute */
+ struct rspamd_counter_data checked_ctr;
+ struct rspamd_counter_data matched_ctr;
+ double last_checked_time;
+ uint64_t last_checked_count;
+ uint64_t last_matched_count;
+ /* Reference to the owning keypair; released in fuzzy_key_stat_dtor() */
+ struct rspamd_cryptobox_keypair *keypair;
+ /* LRU hash of ip -> fuzzy_key_stat; destroyed in fuzzy_key_stat_dtor() */
+ rspamd_lru_hash_t *last_ips;
+
+ ref_entry_t ref;
+};
+
+/**
+ * State of a single leaky bucket used for ratelimiting.
+ */
+struct rspamd_leaky_bucket_elt {
+ rspamd_inet_addr_t *addr; /* address this bucket is accounted for */
+ double last; /* timestamp of the last bucket update */
+ double cur; /* current fill level; NAN is used as a "ratelimited" marker by the bucket check */
+};
+
+/**
+ * Hash function for the `rspamd_fuzzy_keys_hash` table: takes the first
+ * sizeof(int64_t) bytes of the binary key pointed to by `p` as the hash value.
+ * memcpy is used so that `p` does not need to be aligned.
+ */
+static inline int64_t
+fuzzy_kp_hash(const unsigned char *p)
+{
+ int64_t res;
+
+ memcpy(&res, p, sizeof(res));
+ return res;
+}
+
+/**
+ * Equality function for the `rspamd_fuzzy_keys_hash` table: two keys are equal
+ * when their first RSPAMD_FUZZY_KEYLEN bytes are identical.
+ */
+static inline bool
+fuzzy_kp_equal(gconstpointer a, gconstpointer b)
+{
+ const unsigned char *pa = a, *pb = b;
+
+ return (memcmp(pa, pb, RSPAMD_FUZZY_KEYLEN) == 0);
+}
+
+/**
+ * Operations a key may perform; values are bit flags that are OR-ed together
+ * in `struct fuzzy_key::flags`.
+ */
+enum fuzzy_key_op {
+ FUZZY_KEY_READ = 0x1u << 0,
+ FUZZY_KEY_WRITE = 0x1u << 1,
+ FUZZY_KEY_DELETE = 0x1u << 2,
+};
+
+KHASH_SET_INIT_INT(fuzzy_key_ids_set);
+KHASH_INIT(fuzzy_key_flag_stat, int, struct fuzzy_key_stat, 1, kh_int_hash_func,
+ kh_int_hash_equal);
+
+/**
+ * A single storage key with its per-key settings and statistics.
+ * Reference-counted through `ref`; fuzzy_key_dtor() releases every owned member.
+ */
+struct fuzzy_key {
+ char *name; /* heap copy of the "name" extension, may be NULL */
+ struct rspamd_cryptobox_keypair *key; /* keypair, unreferenced in fuzzy_key_dtor() */
+ struct rspamd_cryptobox_pubkey *pk;
+ struct fuzzy_key_stat *stat; /* ref-counted statistics for this key */
+ khash_t(fuzzy_key_flag_stat) * flags_stat; /* flag -> statistics */
+ khash_t(fuzzy_key_ids_set) * forbidden_ids; /* set from the "forbidden_ids" extension, may be NULL */
+ struct rspamd_leaky_bucket_elt *rl_bucket; /* per-key bucket, NULL when no ratelimit is configured */
+ ucl_object_t *extensions; /* referenced keypair extensions object, may be NULL */
+ double burst; /* NAN when not configured */
+ double rate; /* NAN when not configured */
+ ev_tstamp expire; /* NAN when not configured */
+ bool expired;
+ int flags; /* enum fuzzy_key_op */
+ ref_entry_t ref;
+};
+
+KHASH_INIT(rspamd_fuzzy_keys_hash,
+ const unsigned char *, struct fuzzy_key *, 1,
+ fuzzy_kp_hash, fuzzy_kp_equal);
+
+struct rspamd_lua_fuzzy_script {
+ int cbref;
+ struct rspamd_lua_fuzzy_script *next;
+};
+
+struct rspamd_fuzzy_storage_ctx {
+ uint64_t magic;
+ struct ev_loop *event_loop;
+ struct rspamd_dns_resolver *resolver;
+ struct rspamd_config *cfg;
+ struct fuzzy_global_stat stat;
+ double expire;
+ double sync_timeout;
+ double delay;
+ double tcp_timeout;
+ struct rspamd_radix_map_helper *update_ips;
+ struct rspamd_hash_map_helper *update_keys;
+ struct rspamd_radix_map_helper *blocked_ips;
+ struct rspamd_radix_map_helper *ratelimit_whitelist;
+ struct rspamd_radix_map_helper *delay_whitelist;
+
+ const ucl_object_t *update_map;
+ const ucl_object_t *update_keys_map;
+ const ucl_object_t *delay_whitelist_map;
+ const ucl_object_t *blocked_map;
+ const ucl_object_t *ratelimit_whitelist_map;
+ const ucl_object_t *dynamic_keys_map;
+
+ unsigned int keypair_cache_size;
+ ev_timer stat_ev;
+ ev_io peer_ev;
+
+ struct rspamd_cryptobox_keypair *default_keypair;
+ struct fuzzy_key *default_key;
+ khash_t(rspamd_fuzzy_keys_hash) * keys;
+ khash_t(rspamd_fuzzy_keys_hash) * dynamic_keys;
+
+ gboolean encrypted_only;
+ gboolean read_only;
+ gboolean dedicated_update_worker;
+ struct rspamd_keypair_cache *keypair_cache;
+ struct rspamd_http_context *http_ctx;
+ rspamd_lru_hash_t *errors_ips;
+ rspamd_lru_hash_t *ratelimit_buckets;
+ struct rspamd_fuzzy_backend *backend;
+ GArray *updates_pending;
+ unsigned int updates_failed;
+ unsigned int updates_maxfail;
+ int peer_fd;
+
+ unsigned int leaky_bucket_ttl;
+ unsigned int leaky_bucket_mask;
+ unsigned int max_buckets;
+ gboolean ratelimit_log_only;
+ double leaky_bucket_burst;
+ double leaky_bucket_rate;
+
+ struct rspamd_worker *worker;
+ const ucl_object_t *skip_map;
+ struct rspamd_hash_map_helper *skip_hashes;
+ struct rspamd_lua_fuzzy_script *lua_pre_handlers;
+ struct rspamd_lua_fuzzy_script *lua_post_handlers;
+ struct rspamd_lua_fuzzy_script *lua_blacklist_handlers;
+ khash_t(fuzzy_key_ids_set) * default_forbidden_ids;
+ khash_t(fuzzy_key_ids_set) * weak_ids;
+};
+
+enum fuzzy_cmd_type {
+ CMD_NORMAL,
+ CMD_SHINGLE,
+ CMD_ENCRYPTED_NORMAL,
+ CMD_ENCRYPTED_SHINGLE
+};
+
+/* Legacy structure name for compatibility during refactoring */
+struct fuzzy_session {
+ struct rspamd_worker *worker;
+ rspamd_inet_addr_t *addr;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+
+ struct rspamd_fuzzy_shingle_cmd cmd; /* Can handle both shingles and non-shingles */
+ struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */
+ struct fuzzy_key_stat *ip_stat;
+
+ enum rspamd_fuzzy_epoch epoch;
+ enum fuzzy_cmd_type cmd_type;
+ int fd;
+ ev_tstamp timestamp;
+ struct ev_io io;
+ ref_entry_t ref;
+ struct fuzzy_key *key;
+ struct rspamd_fuzzy_cmd_extension *extensions;
+ unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
+
+ /* If this is a TCP session, this pointer will be set */
+ struct fuzzy_tcp_session *tcp_session;
+};
+
+enum rspamd_ratelimit_event_type {
+ RATELIMIT_EVENT_NEW,
+ RATELIMIT_EVENT_EXISTING,
+ RATELIMIT_EVENT_BLACKLIST,
+};
+
+struct rspamd_ratelimit_callback_ctx {
+ rspamd_inet_addr_t *addr;
+ const char *reason;
+ enum rspamd_ratelimit_event_type type;
+
+ struct rspamd_leaky_bucket_elt *bucket;
+ double max_burst;
+ double max_rate;
+
+ struct fuzzy_session *session;
+};
+
+struct fuzzy_keymap_ucl_buf {
+ rspamd_fstring_t *buf;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+};
+
+enum rspamd_ratelimit_check_result {
+ ratelimit_pass,
+ ratelimit_new,
+ ratelimit_existing,
+};
+
+enum rspamd_ratelimit_check_policy {
+ ratelimit_policy_permanent,
+ ratelimit_policy_normal,
+};
+
+/* Keys/dynamic map */
+char *ucl_keymap_read_cb(char *chunk, int len, struct map_cb_data *data, gboolean final);
+void ucl_keymap_fin_cb(struct map_cb_data *data, void **target);
+void ucl_keymap_dtor_cb(struct map_cb_data *data);
+
+void fuzzy_key_stat_dtor(gpointer p);
+void fuzzy_key_stat_unref(gpointer p);
+void fuzzy_key_dtor(gpointer p);
+void fuzzy_hash_table_dtor(khash_t(rspamd_fuzzy_keys_hash) * hash);
+
+gboolean fuzzy_parse_ids(rspamd_mempool_t *pool, const ucl_object_t *obj,
+ gpointer ud, struct rspamd_rcl_section *section, GError **err);
+struct fuzzy_key *fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg, const ucl_object_t *obj,
+ khash_t(rspamd_fuzzy_keys_hash) * target);
+gboolean fuzzy_parse_keypair(rspamd_mempool_t *pool, const ucl_object_t *obj,
+ gpointer ud, struct rspamd_rcl_section *section, GError **err);
+
+/* Ratelimit */
+void fuzzy_rl_bucket_free(gpointer p);
+
+enum rspamd_ratelimit_check_result rspamd_fuzzy_check_ratelimit_bucket(
+ struct rspamd_fuzzy_storage_ctx *ctx,
+ rspamd_inet_addr_t *addr,
+ ev_tstamp timestamp,
+ struct rspamd_leaky_bucket_elt *elt,
+ enum rspamd_ratelimit_check_policy policy,
+ double max_burst, double max_rate);
+
+gboolean rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx,
+ rspamd_inet_addr_t *addr,
+ struct rspamd_worker *worker,
+ ev_tstamp timestamp);
+
+void rspamd_fuzzy_call_ratelimit_handlers(struct rspamd_fuzzy_storage_ctx *ctx,
+ const struct rspamd_ratelimit_callback_ctx *cb_ctx);
+void rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx *ctx,
+ rspamd_inet_addr_t *addr,
+ const char *reason);
+
+gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
+ rspamd_inet_addr_t *addr);
+
+ucl_object_t *rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt);
+void rspamd_fuzzy_maybe_load_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx);
+void rspamd_fuzzy_maybe_save_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx);
+
+/* Stats / controller */
+ucl_object_t *rspamd_fuzzy_storage_stat_key(const struct fuzzy_key_stat *key_stat);
+void rspamd_fuzzy_key_stat_iter(const unsigned char *pk_iter,
+ struct fuzzy_key *fuzzy_key,
+ ucl_object_t *keys_obj,
+ gboolean ip_stat);
+ucl_object_t *rspamd_fuzzy_stat_to_ucl(struct rspamd_fuzzy_storage_ctx *ctx, gboolean ip_stat);
+
+gboolean rspamd_fuzzy_storage_stat(struct rspamd_main *rspamd_main,
+ struct rspamd_worker *worker, int fd,
+ int attached_fd,
+ struct rspamd_control_command *cmd,
+ gpointer ud);
+
+#endif
--- /dev/null
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Rspamd fuzzy storage server: keys and dynamic keymaps
+ */
+
+#include "config.h"
+
+#include "fuzzy_storage_internal.h"
+
+#include "cfg_rcl.h"
+#include "libcryptobox/cryptobox.h"
+#include "libcryptobox/keypair.h"
+#include "libserver/maps/map.h"
+#include "lua/lua_common.h"
+#include "unix-std.h"
+
+#include <math.h>
+#include <string.h>
+#include <time.h>
+
+/**
+ * Map read callback for the dynamic keys map: appends every received chunk
+ * to a growing buffer stored in `data->cur_data`.
+ *
+ * @param chunk raw data received from the map
+ * @param len length of `chunk`
+ * @param data map callback state; `prev_data` must be set, as it provides
+ *             the storage context for a freshly created accumulator
+ * @param final unused here
+ * @return always NULL
+ */
+char *
+ucl_keymap_read_cb(char *chunk, int len,
+				   struct map_cb_data *data, gboolean final)
+{
+	struct fuzzy_keymap_ucl_buf *prev = data->prev_data;
+
+	g_assert(prev != NULL);
+
+	struct fuzzy_keymap_ucl_buf *acc = data->cur_data;
+
+	if (acc == NULL) {
+		/* First chunk of this load: create the accumulator and inherit the context */
+		acc = g_malloc0(sizeof(*acc));
+		acc->ctx = prev->ctx;
+		data->cur_data = acc;
+	}
+
+	if (acc->buf == NULL) {
+		/* Start with at least 4k so that small chunks do not cause early regrowth */
+		acc->buf = rspamd_fstring_sized_new(MAX(len, 4096));
+	}
+
+	acc->buf = rspamd_fstring_append(acc->buf, chunk, len);
+
+	return NULL;
+}
+
+/**
+ * Map finalisation callback for the dynamic keys map.
+ *
+ * Parses the accumulated buffer as UCL (the top object must be an array of
+ * keypairs), publishes `data->cur_data` through `target` and, when previous
+ * map data exists, replaces the content of `ctx->dynamic_keys` with the
+ * freshly parsed keys and frees the previous data.
+ *
+ * On any parse/validation error the function returns early and leaves the
+ * currently loaded dynamic keys untouched.
+ *
+ * @param data map callback state (`cur_data` is the buffer filled by the read callback)
+ * @param target optional location that receives the new map data
+ */
+void ucl_keymap_fin_cb(struct map_cb_data *data, void **target)
+{
+	struct fuzzy_keymap_ucl_buf *jb;
+	ucl_object_t *top;
+	struct ucl_parser *parser;
+	struct rspamd_config *cfg;
+
+	/* Now parse ucl */
+	if (data->cur_data) {
+		jb = data->cur_data;
+		cfg = jb->ctx->cfg;
+	}
+	else {
+		msg_err("no cur data in the map! might be a bug");
+		return;
+	}
+
+	/* The buffer is normally created by the read callback, but do not rely on it */
+	if (jb->buf == NULL || jb->buf->len == 0) {
+		msg_err_config("no data read");
+
+		return;
+	}
+
+	parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS);
+
+	if (!ucl_parser_add_chunk(parser, jb->buf->str, jb->buf->len)) {
+		msg_err_config("cannot load ucl data: parse error %s",
+					   ucl_parser_get_error(parser));
+		ucl_parser_free(parser);
+		return;
+	}
+
+	top = ucl_parser_get_object(parser);
+	ucl_parser_free(parser);
+
+	if (ucl_object_type(top) != UCL_ARRAY) {
+		ucl_object_unref(top);
+		msg_err_config("loaded ucl is not an array");
+		return;
+	}
+
+	if (target) {
+		*target = data->cur_data;
+	}
+
+	if (data->prev_data) {
+		jb = data->prev_data;
+		/* Clean prev data */
+		if (jb->buf) {
+			rspamd_fstring_free(jb->buf);
+		}
+
+		/* Clean the existing keys */
+		struct fuzzy_key *key;
+		kh_foreach_value(jb->ctx->dynamic_keys, key, {
+			REF_RELEASE(key);
+		});
+		kh_clear(rspamd_fuzzy_keys_hash, jb->ctx->dynamic_keys);
+
+		/* Insert new keys */
+		const ucl_object_t *cur;
+		ucl_object_iter_t it = NULL;
+		int success = 0;
+
+		while ((cur = ucl_object_iterate(top, &it, true)) != NULL) {
+			struct fuzzy_key *nk;
+
+			nk = fuzzy_add_keypair_from_ucl(cfg, cur, jb->ctx->dynamic_keys);
+
+			if (nk == NULL) {
+				msg_warn_config("cannot add dynamic keypair");
+			}
+			else {
+				/* Count only the keypairs that were really added */
+				success++;
+			}
+		}
+
+		msg_info_config("loaded %d dynamic keypairs", success);
+
+		g_free(jb);
+	}
+
+	ucl_object_unref(top);
+}
+
+/**
+ * Map destructor callback for the dynamic keys map: frees the current map
+ * buffer and releases every key stored in `ctx->dynamic_keys`.
+ * Does nothing when the map holds no current data.
+ */
+void ucl_keymap_dtor_cb(struct map_cb_data *data)
+{
+	struct fuzzy_keymap_ucl_buf *state = data->cur_data;
+
+	if (state == NULL) {
+		return;
+	}
+
+	if (state->buf) {
+		rspamd_fstring_free(state->buf);
+	}
+
+	for (khiter_t i = kh_begin(state->ctx->dynamic_keys);
+		 i != kh_end(state->ctx->dynamic_keys); ++i) {
+		if (!kh_exist(state->ctx->dynamic_keys, i)) {
+			continue;
+		}
+
+		struct fuzzy_key *cur_key = kh_val(state->ctx->dynamic_keys, i);
+		REF_RELEASE(cur_key);
+	}
+
+	/* Clear hash content but don't destroy - mempool destructor will handle it */
+	kh_clear(rspamd_fuzzy_keys_hash, state->ctx->dynamic_keys);
+
+	g_free(state);
+}
+
+/**
+ * Destructor for `struct fuzzy_key_stat`: destroys the per-ip LRU hash,
+ * drops the keypair reference and frees the structure itself.
+ *
+ * @param p pointer to a `struct fuzzy_key_stat`
+ */
+void fuzzy_key_stat_dtor(gpointer p)
+{
+	struct fuzzy_key_stat *stat = p;
+
+	if (stat->last_ips != NULL) {
+		rspamd_lru_hash_destroy(stat->last_ips);
+	}
+
+	if (stat->keypair != NULL) {
+		rspamd_keypair_unref(stat->keypair);
+	}
+
+	g_free(stat);
+}
+
+/**
+ * Releases one reference of a `struct fuzzy_key_stat`; it has a
+ * `gpointer` signature so that it can be used as a hash value destructor.
+ *
+ * @param p pointer to a `struct fuzzy_key_stat`
+ */
+void fuzzy_key_stat_unref(gpointer p)
+{
+	struct fuzzy_key_stat *stat = (struct fuzzy_key_stat *) p;
+
+	REF_RELEASE(stat);
+}
+
+/**
+ * Destructor for `struct fuzzy_key`: releases the keypair, the statistics
+ * reference, both hash tables, the ratelimit bucket, the name and the
+ * extensions object, then frees the key. A NULL pointer is ignored.
+ *
+ * @param p pointer to a `struct fuzzy_key` or NULL
+ */
+void fuzzy_key_dtor(gpointer p)
+{
+	struct fuzzy_key *fk = p;
+
+	if (fk == NULL) {
+		return;
+	}
+
+	if (fk->key != NULL) {
+		rspamd_keypair_unref(fk->key);
+	}
+
+	if (fk->stat != NULL) {
+		REF_RELEASE(fk->stat);
+	}
+
+	if (fk->flags_stat != NULL) {
+		kh_destroy(fuzzy_key_flag_stat, fk->flags_stat);
+	}
+
+	if (fk->forbidden_ids != NULL) {
+		kh_destroy(fuzzy_key_ids_set, fk->forbidden_ids);
+	}
+
+	/* TODO: save bucket stats */
+	/* g_free() accepts NULL, so the plain heap members need no guards */
+	g_free(fk->rl_bucket);
+	g_free(fk->name);
+
+	if (fk->extensions != NULL) {
+		ucl_object_unref(fk->extensions);
+	}
+
+	g_free(fk);
+}
+
+/**
+ * Releases every key stored in a keys hash table and destroys the table.
+ *
+ * @param hash table to destroy
+ */
+void fuzzy_hash_table_dtor(khash_t(rspamd_fuzzy_keys_hash) * hash)
+{
+	for (khiter_t i = kh_begin(hash); i != kh_end(hash); ++i) {
+		if (kh_exist(hash, i)) {
+			struct fuzzy_key *stored = kh_val(hash, i);
+
+			REF_RELEASE(stored);
+		}
+	}
+
+	kh_destroy(rspamd_fuzzy_keys_hash, hash);
+}
+
+/**
+ * RCL handler that fills a set of integer ids from the configuration.
+ *
+ * The destination set is read from `pd->user_struct + pd->offset` and must
+ * already be initialised. Accepts either a single integer or an array whose
+ * elements are all convertible to integers.
+ *
+ * @param pool memory pool (unused)
+ * @param obj configuration object to parse
+ * @param ud `struct rspamd_rcl_struct_parser *` describing the destination
+ * @param section RCL section (unused)
+ * @param err error location (not set by this handler)
+ * @return TRUE on success, FALSE when `obj` has an unsupported type or an
+ *         array element cannot be converted to an integer
+ */
+gboolean
+fuzzy_parse_ids(rspamd_mempool_t *pool,
+				const ucl_object_t *obj,
+				gpointer ud,
+				struct rspamd_rcl_section *section,
+				GError **err)
+{
+	struct rspamd_rcl_struct_parser *pd = (struct rspamd_rcl_struct_parser *) ud;
+	khash_t(fuzzy_key_ids_set) * target;
+
+	target = *(khash_t(fuzzy_key_ids_set) **) ((char *) pd->user_struct + pd->offset);
+
+	if (ucl_object_type(obj) == UCL_ARRAY) {
+		const ucl_object_t *cur;
+		ucl_object_iter_t it = NULL;
+		/* ucl_object_toint_safe() stores its result through an int64_t pointer */
+		int64_t id;
+
+		while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) {
+			if (ucl_object_toint_safe(cur, &id)) {
+				int r;
+
+				kh_put(fuzzy_key_ids_set, target, id, &r);
+			}
+			else {
+				return FALSE;
+			}
+		}
+
+		return TRUE;
+	}
+	else if (ucl_object_type(obj) == UCL_INT) {
+		int r;
+		kh_put(fuzzy_key_ids_set, target, ucl_object_toint(obj), &r);
+
+		return TRUE;
+	}
+
+	return FALSE;
+}
+
+/**
+ * Creates a fuzzy key from a UCL keypair definition and inserts it into
+ * `target`, indexed by the binary public key.
+ *
+ * Besides the keypair itself the following keypair extensions are honoured:
+ * `forbidden_ids`, `ratelimit` (parsed by the Lua ratelimit plugin),
+ * `expire` (DD-MM-YYYY), `name`, `read_only` and `allowed_ops`.
+ * Keys are allowed to read by default.
+ *
+ * @param cfg configuration (provides the Lua state and the logging context)
+ * @param obj UCL object describing the keypair
+ * @param target hash table that receives the key; it holds the only
+ *               reference created here
+ * @return the inserted key, or NULL when the keypair cannot be loaded, is not
+ *         a key-exchange keypair, is a duplicate or cannot be inserted
+ */
+struct fuzzy_key *
+fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg,
+						   const ucl_object_t *obj,
+						   khash_t(rspamd_fuzzy_keys_hash) * target)
+{
+	struct rspamd_cryptobox_keypair *kp = rspamd_keypair_from_ucl(obj);
+
+	if (kp == NULL) {
+		return NULL;
+	}
+
+	if (rspamd_keypair_type(kp) != RSPAMD_KEYPAIR_KEX) {
+		rspamd_keypair_unref(kp);
+		return NULL;
+	}
+
+	struct fuzzy_key *key = g_malloc0(sizeof(*key));
+	REF_INIT_RETAIN(key, fuzzy_key_dtor);
+	key->key = kp;
+	struct fuzzy_key_stat *keystat = g_malloc0(sizeof(*keystat));
+	REF_INIT_RETAIN(keystat, fuzzy_key_stat_dtor);
+	/* Hash of ip -> fuzzy_key_stat */
+	keystat->last_ips = rspamd_lru_hash_new_full(1024,
+												 (GDestroyNotify) rspamd_inet_address_free,
+												 fuzzy_key_stat_unref,
+												 rspamd_inet_address_hash, rspamd_inet_address_equal);
+	key->stat = keystat;
+	key->flags_stat = kh_init(fuzzy_key_flag_stat);
+	/* NAN means "not configured" for all numeric per-key settings */
+	key->burst = NAN;
+	key->rate = NAN;
+	key->expire = NAN;
+	key->rl_bucket = NULL;
+	/* Allow read by default */
+	key->flags = FUZZY_KEY_READ;
+	/* Preallocate some space for flags */
+	kh_resize(fuzzy_key_flag_stat, key->flags_stat, 8);
+	const unsigned char *pk = rspamd_keypair_component(kp, RSPAMD_KEYPAIR_COMPONENT_PK,
+														NULL);
+	keystat->keypair = rspamd_keypair_ref(kp);
+	/* We map entries by pubkey in binary form for faster lookup */
+	khiter_t k;
+	int r;
+
+	k = kh_put(rspamd_fuzzy_keys_hash, target, pk, &r);
+
+	if (r == 0) {
+		msg_err("duplicate keypair found: pk=%*bs",
+				32, pk);
+		REF_RELEASE(key);
+
+		return NULL;
+	}
+	else if (r == -1) {
+		msg_err("hash insertion error: pk=%*bs",
+				32, pk);
+		REF_RELEASE(key);
+
+		return NULL;
+	}
+
+	kh_val(target, k) = key;
+
+	const ucl_object_t *extensions = rspamd_keypair_get_extensions(kp);
+
+	if (extensions) {
+		key->extensions = ucl_object_ref(extensions);
+		lua_State *L = RSPAMD_LUA_CFG_STATE(cfg);
+		const ucl_object_t *forbidden_ids = ucl_object_lookup(extensions, "forbidden_ids");
+
+		if (forbidden_ids && ucl_object_type(forbidden_ids) == UCL_ARRAY) {
+			key->forbidden_ids = kh_init(fuzzy_key_ids_set);
+			const ucl_object_t *cur;
+			ucl_object_iter_t it = NULL;
+
+			while ((cur = ucl_object_iterate(forbidden_ids, &it, true)) != NULL) {
+				if (ucl_object_type(cur) == UCL_INT || ucl_object_type(cur) == UCL_FLOAT) {
+					int id = ucl_object_toint(cur);
+					int ids_r;
+
+					kh_put(fuzzy_key_ids_set, key->forbidden_ids, id, &ids_r);
+				}
+			}
+		}
+
+		const ucl_object_t *ratelimit = ucl_object_lookup(extensions, "ratelimit");
+
+		/* Registry reference of the Lua parser; resolved once and then reused */
+		static int ratelimit_lua_id = -1;
+
+		if (ratelimit_lua_id == -1) {
+			/* Load ratelimit parsing function */
+			if (!rspamd_lua_require_function(L, "plugins/ratelimit", "parse_limit")) {
+				msg_err_config("cannot load ratelimit parser from ratelimit plugin");
+			}
+			else {
+				ratelimit_lua_id = luaL_ref(L, LUA_REGISTRYINDEX);
+			}
+		}
+
+		if (ratelimit && ratelimit_lua_id != -1) {
+			lua_rawgeti(L, LUA_REGISTRYINDEX, ratelimit_lua_id);
+			lua_pushstring(L, "fuzzy_key_ratelimit");
+			ucl_object_push_lua(L, ratelimit, false);
+
+			if (lua_pcall(L, 2, 1, 0) != 0) {
+				msg_err_config("cannot call ratelimit parser from ratelimit plugin");
+			}
+			else {
+				if (lua_type(L, -1) == LUA_TTABLE) {
+					/* The returned table is in form { rate = xx, burst = yy } */
+					lua_getfield(L, -1, "rate");
+					key->rate = lua_tonumber(L, -1);
+					lua_pop(L, 1);
+
+					lua_getfield(L, -1, "burst");
+					key->burst = lua_tonumber(L, -1);
+					lua_pop(L, 1);
+
+					key->rl_bucket = g_malloc0(sizeof(*key->rl_bucket));
+				}
+			}
+
+			lua_settop(L, 0);
+		}
+
+		const ucl_object_t *expire = ucl_object_lookup(extensions, "expire");
+		if (expire && ucl_object_type(expire) == UCL_STRING) {
+			const char *expire_str = ucl_object_tostring(expire);
+			struct tm tm;
+
+			/*
+			 * strptime() fills only the fields named in the format, so the
+			 * rest must be defined before mktime() reads them: midnight,
+			 * with DST determined by the C library.
+			 */
+			memset(&tm, 0, sizeof(tm));
+			tm.tm_isdst = -1;
+
+			/* DD-MM-YYYY */
+			char *end = strptime(expire_str, "%d-%m-%Y", &tm);
+
+			/* NULL means no match at all, a non-empty tail means trailing garbage */
+			if (end == NULL || *end != '\0') {
+				msg_err_config("cannot parse expire date: %s", expire_str);
+			}
+			else {
+				key->expire = mktime(&tm);
+			}
+		}
+
+		const ucl_object_t *name = ucl_object_lookup(extensions, "name");
+		if (name && ucl_object_type(name) == UCL_STRING) {
+			key->name = g_strdup(ucl_object_tostring(name));
+		}
+
+		/* Check permissions */
+		const ucl_object_t *read_only = ucl_object_lookup(extensions, "read_only");
+		if (read_only && ucl_object_type(read_only) == UCL_BOOLEAN) {
+			if (ucl_object_toboolean(read_only)) {
+				key->flags &= ~(FUZZY_KEY_WRITE | FUZZY_KEY_DELETE);
+			}
+			else {
+				key->flags |= (FUZZY_KEY_WRITE | FUZZY_KEY_DELETE);
+			}
+		}
+
+		const ucl_object_t *allowed_ops = ucl_object_lookup(extensions, "allowed_ops");
+		if (allowed_ops && ucl_object_type(allowed_ops) == UCL_ARRAY) {
+			const ucl_object_t *cur;
+			ucl_object_iter_t it = NULL;
+			/* Reset to only allowed */
+			key->flags = 0;
+
+			while ((cur = ucl_object_iterate(allowed_ops, &it, true)) != NULL) {
+				if (ucl_object_type(cur) == UCL_STRING) {
+					const char *op = ucl_object_tostring(cur);
+
+					if (g_ascii_strcasecmp(op, "read") == 0) {
+						key->flags |= FUZZY_KEY_READ;
+					}
+					else if (g_ascii_strcasecmp(op, "write") == 0) {
+						key->flags |= FUZZY_KEY_WRITE;
+					}
+					else if (g_ascii_strcasecmp(op, "delete") == 0) {
+						key->flags |= FUZZY_KEY_DELETE;
+					}
+					else {
+						msg_warn_config("invalid operation: %s", op);
+					}
+				}
+			}
+		}
+	}
+
+	msg_debug("loaded keypair %*bs; expire=%f; rate=%f; burst=%f; name=%s",
+			  (int) crypto_box_publickeybytes(), pk,
+			  key->expire, key->rate, key->burst, key->name);
+
+	return key;
+}
+
+/**
+ * RCL handler for the keypair option of the fuzzy storage worker.
+ *
+ * A string or an object is treated as a single keypair: it is first parsed
+ * by the generic keypair handler (with `pd->offset` pointing at
+ * `default_keypair`), then added to `ctx->keys` and made the default key, so
+ * the last successfully parsed key wins. An array is processed element by
+ * element; a failing element is only logged and does not fail the whole
+ * option. Any other object type is silently accepted.
+ *
+ * @param pool memory pool, also used as the logging context
+ * @param obj configuration object to parse
+ * @param ud `struct rspamd_rcl_struct_parser *` whose user_struct is the storage ctx
+ * @param section RCL section passed through to the generic handler
+ * @param err error location passed through to the generic handler
+ * @return FALSE when a single keypair cannot be parsed or added, TRUE otherwise
+ */
+gboolean
+fuzzy_parse_keypair(rspamd_mempool_t *pool,
+					const ucl_object_t *obj,
+					gpointer ud,
+					struct rspamd_rcl_section *section,
+					GError **err)
+{
+	struct rspamd_rcl_struct_parser *pd = ud;
+	struct rspamd_fuzzy_storage_ctx *ctx = pd->user_struct;
+
+	pd->offset = G_STRUCT_OFFSET(struct rspamd_fuzzy_storage_ctx, default_keypair);
+
+	switch (ucl_object_type(obj)) {
+	case UCL_STRING:
+	case UCL_OBJECT: {
+		/* Single key */
+		if (!rspamd_rcl_parse_struct_keypair(pool, obj, pd, section, err)) {
+			return FALSE;
+		}
+
+		struct fuzzy_key *added = fuzzy_add_keypair_from_ucl(ctx->cfg, obj, ctx->keys);
+
+		if (added == NULL) {
+			return FALSE;
+		}
+
+		/* Use the last one ? */
+		ctx->default_key = added;
+		break;
+	}
+	case UCL_ARRAY: {
+		const ucl_object_t *elt;
+		ucl_object_iter_t iter = NULL;
+
+		while ((elt = ucl_object_iterate(obj, &iter, true)) != NULL) {
+			if (!fuzzy_parse_keypair(pool, elt, pd, section, err)) {
+				msg_err_pool("cannot parse keypair");
+			}
+		}
+		break;
+	}
+	default:
+		break;
+	}
+
+	return TRUE;
+}
--- /dev/null
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Rspamd fuzzy storage server: ratelimits
+ */
+
+#include "config.h"
+
+#include "fuzzy_storage_internal.h"
+
+#include "maps/map_helpers.h"
+#include "rspamd_control.h"
+#include "lua/lua_common.h"
+#include "contrib/uthash/utlist.h"
+
+#include <errno.h>
+#include <math.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/socket.h>
+
+/*
+ * Account one request seen at `timestamp` in the leaky bucket `elt` and tell
+ * the caller whether this request has to be ratelimited.
+ *
+ * A NaN in `max_burst` or `max_rate` means that there is nothing to check.
+ * A NaN in `elt->cur` marks a bucket that has been blocked by the permanent
+ * policy.
+ *
+ * Returns:
+ *  - ratelimit_pass: the request is allowed;
+ *  - ratelimit_new: the bucket has just reached `max_burst`;
+ *  - ratelimit_existing: the bucket was already blocked before this call.
+ *
+ * Calling the ratelimit handlers (with the bucket, session and so on) is the
+ * caller's job. `addr` is not used by this function.
+ */
+enum rspamd_ratelimit_check_result
+rspamd_fuzzy_check_ratelimit_bucket(struct rspamd_fuzzy_storage_ctx *ctx,
+									rspamd_inet_addr_t *addr,
+									ev_tstamp timestamp,
+									struct rspamd_leaky_bucket_elt *elt,
+									enum rspamd_ratelimit_check_policy policy,
+									double max_burst, double max_rate)
+{
+	/* Nothing to check */
+	if (isnan(max_burst) || isnan(max_rate)) {
+		return ratelimit_pass;
+	}
+
+	if (isnan(elt->cur)) {
+		/*
+		 * Blocked bucket. Its `last` field is consulted explicitly to find
+		 * out whether the block is older than the configured TTL.
+		 */
+		gboolean ttl_passed = elt->last < timestamp &&
+							  timestamp - elt->last >= ctx->leaky_bucket_ttl;
+
+		if (!ttl_passed) {
+			return ratelimit_existing;
+		}
+
+		/*
+		 * Reopen the bucket at 90% of its capacity: a few requests are
+		 * allowed again, so a network blocked for a burst does not stay
+		 * blocked forever.
+		 */
+		elt->cur = max_burst * 0.9;
+		elt->last = timestamp;
+
+		return ratelimit_pass;
+	}
+
+	/* Leak the elements that have drained since the previous request */
+	if (elt->last < timestamp) {
+		elt->cur -= max_rate * (timestamp - elt->last);
+
+		if (elt->cur < 0) {
+			elt->cur = 0;
+		}
+	}
+
+	elt->last = timestamp;
+
+	if (elt->cur >= max_burst) {
+		/* The bucket is full: this is a fresh ratelimit event */
+		if (policy == ratelimit_policy_permanent) {
+			elt->cur = NAN;
+		}
+
+		return ratelimit_new;
+	}
+
+	/* Allow one more request */
+	elt->cur++;
+
+	return ratelimit_pass;
+}
+
+/*
+ * Check the per-network ratelimit for a client address.
+ *
+ * Requests with no address, with an address matching ctx->ratelimit_whitelist
+ * or with a local address are never limited. Otherwise the address is masked
+ * (IPv4: at most /32; IPv6: four times the configured mask, clamped to the
+ * /64../128 range) and the masked address selects a leaky bucket in
+ * ctx->ratelimit_buckets. An unknown network gets a fresh bucket and passes.
+ *
+ * When a bucket has just become blocked, the masked address is logged and
+ * sent to the main process with RSPAMD_SRV_FUZZY_BLOCKED. The Lua blacklist
+ * handlers, if any, are called for both fresh and already existing blocks.
+ *
+ * Returns TRUE when the request may be processed and FALSE when it is
+ * ratelimited.
+ */
+gboolean
+rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx,
+							 rspamd_inet_addr_t *addr,
+							 struct rspamd_worker *worker,
+							 ev_tstamp timestamp)
+{
+	rspamd_inet_addr_t *masked;
+	struct rspamd_leaky_bucket_elt *elt;
+
+	if (!addr) {
+		return TRUE;
+	}
+
+	if (ctx->ratelimit_whitelist != NULL) {
+		if (rspamd_match_radix_map_addr(ctx->ratelimit_whitelist,
+										addr) != NULL) {
+			return TRUE;
+		}
+	}
+
+	/* Skip ratelimit for local addresses */
+	if (rspamd_inet_address_is_local(addr)) {
+		return TRUE;
+	}
+
+	masked = rspamd_inet_address_copy(addr, NULL);
+
+	if (rspamd_inet_address_get_af(masked) == AF_INET) {
+		rspamd_inet_address_apply_mask(masked,
+									   MIN(ctx->leaky_bucket_mask, 32));
+	}
+	else {
+		/* Must be at least /64 */
+		rspamd_inet_address_apply_mask(masked,
+									   MIN(MAX(ctx->leaky_bucket_mask * 4, 64), 128));
+	}
+
+	elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, masked,
+								 (time_t) timestamp);
+
+	if (elt == NULL) {
+		/* New bucket that already accounts for the current request */
+		elt = g_malloc(sizeof(*elt));
+		elt->addr = masked; /* transfer ownership */
+		elt->cur = 1;
+		elt->last = timestamp;
+
+		rspamd_lru_hash_insert(ctx->ratelimit_buckets,
+							   masked,
+							   elt,
+							   timestamp,
+							   ctx->leaky_bucket_ttl);
+
+		return TRUE;
+	}
+
+	enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(ctx, addr,
+																				 timestamp, elt,
+																				 ratelimit_policy_permanent,
+																				 ctx->leaky_bucket_burst,
+																				 ctx->leaky_bucket_rate);
+
+	if (res == ratelimit_new) {
+		msg_info("ratelimiting %s (%s), %.1f max elts",
+				 rspamd_inet_address_to_string(addr),
+				 rspamd_inet_address_to_string(masked),
+				 ctx->leaky_bucket_burst);
+
+		struct rspamd_srv_command srv_cmd;
+
+		/*
+		 * The command is handed over as a whole structure, so clear it
+		 * first: otherwise the fields and padding that are not set below
+		 * would carry indeterminate stack bytes.
+		 */
+		memset(&srv_cmd, 0, sizeof(srv_cmd));
+		srv_cmd.type = RSPAMD_SRV_FUZZY_BLOCKED;
+		srv_cmd.cmd.fuzzy_blocked.af = rspamd_inet_address_get_af(masked);
+
+		if (srv_cmd.cmd.fuzzy_blocked.af == AF_INET || srv_cmd.cmd.fuzzy_blocked.af == AF_INET6) {
+			socklen_t slen;
+			struct sockaddr *sa = rspamd_inet_address_get_sa(masked, &slen);
+
+			if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
+				memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
+				msg_debug("propagating blocked address to other workers");
+				rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+			}
+			else {
+				msg_err("bad address length: %d, expected to be %d",
+						(int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
+			}
+		}
+	}
+
+	/* Handlers see both fresh and already existing blocks */
+	if ((res == ratelimit_new || res == ratelimit_existing) &&
+		ctx->lua_blacklist_handlers) {
+		struct rspamd_ratelimit_callback_ctx cb_ctx = {
+			.addr = addr,
+			.reason = "ratelimit",
+			.type = (res == ratelimit_new) ? RATELIMIT_EVENT_NEW : RATELIMIT_EVENT_EXISTING,
+			.bucket = elt,
+			.max_burst = ctx->leaky_bucket_burst,
+			.max_rate = ctx->leaky_bucket_rate,
+			.session = NULL,
+		};
+
+		rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx);
+	}
+
+	/* The existing bucket owns its own address, so the lookup key goes away */
+	rspamd_inet_address_free(masked);
+
+	return res == ratelimit_pass;
+}
+
+/*
+ * Push the ratelimit information of `cb_ctx` on the Lua stack: nil when the
+ * context carries no bucket, otherwise a table with the following fields:
+ *  - bucket_level: current fill level, nil for a permanently blocked bucket;
+ *  - is_permanent: whether the bucket is permanently blocked (level is NaN);
+ *  - max_burst, max_rate: the limits, present only when they are not NaN;
+ *  - exceeded_by: how far the level is above max_burst, only when it is;
+ *  - last_seen: the `last` timestamp stored in the bucket.
+ */
+static void
+rspamd_fuzzy_bucket_info_tolua(lua_State *L,
+							   const struct rspamd_ratelimit_callback_ctx *cb_ctx)
+{
+	const struct rspamd_leaky_bucket_elt *bucket = cb_ctx->bucket;
+
+	if (bucket == NULL) {
+		lua_pushnil(L);
+		return;
+	}
+
+	const gboolean permanent = isnan(bucket->cur) ? TRUE : FALSE;
+
+	lua_createtable(L, 0, 6);
+
+	/* A permanently blocked bucket has no meaningful level */
+	if (permanent) {
+		lua_pushnil(L);
+	}
+	else {
+		lua_pushnumber(L, bucket->cur);
+	}
+	lua_setfield(L, -2, "bucket_level");
+
+	lua_pushboolean(L, permanent);
+	lua_setfield(L, -2, "is_permanent");
+
+	if (!isnan(cb_ctx->max_burst)) {
+		lua_pushnumber(L, cb_ctx->max_burst);
+		lua_setfield(L, -2, "max_burst");
+	}
+
+	if (!isnan(cb_ctx->max_rate)) {
+		lua_pushnumber(L, cb_ctx->max_rate);
+		lua_setfield(L, -2, "max_rate");
+	}
+
+	/* How much over the limit the bucket is */
+	if (!permanent && !isnan(cb_ctx->max_burst) &&
+		bucket->cur > cb_ctx->max_burst) {
+		lua_pushnumber(L, bucket->cur - cb_ctx->max_burst);
+		lua_setfield(L, -2, "exceeded_by");
+	}
+
+	lua_pushnumber(L, bucket->last);
+	lua_setfield(L, -2, "last_seen");
+}
+
+/*
+ * Push a table describing the extensions of the session in `cb_ctx`.
+ *
+ * The table is always pushed; it stays empty when there is no session or the
+ * session has no extensions. A source domain extension is stored as the
+ * `domain` string, a source IPv4/IPv6 extension as the `source_ip` field.
+ */
+static void
+rspamd_fuzzy_ratelimit_extensions_tolua(lua_State *L,
+										const struct rspamd_ratelimit_callback_ctx *cb_ctx)
+{
+	struct rspamd_fuzzy_cmd_extension *ext;
+
+	lua_createtable(L, 0, 2);
+
+	if (cb_ctx->session == NULL || cb_ctx->session->extensions == NULL) {
+		return;
+	}
+
+	LL_FOREACH(cb_ctx->session->extensions, ext)
+	{
+		if (ext->ext == RSPAMD_FUZZY_EXT_SOURCE_DOMAIN) {
+			lua_pushlstring(L, (const char *) ext->payload, ext->length);
+			lua_setfield(L, -2, "domain");
+		}
+		else if (ext->ext == RSPAMD_FUZZY_EXT_SOURCE_IP4 ||
+				 ext->ext == RSPAMD_FUZZY_EXT_SOURCE_IP6) {
+			int af = (ext->ext == RSPAMD_FUZZY_EXT_SOURCE_IP4) ? AF_INET : AF_INET6;
+			rspamd_inet_addr_t *src = rspamd_inet_address_new(af, ext->payload);
+
+			/*
+			 * The temporary address is released right after the push
+			 * (presumably rspamd_lua_ip_push keeps its own copy - confirm).
+			 */
+			rspamd_lua_ip_push(L, src);
+			rspamd_inet_address_free(src);
+			lua_setfield(L, -2, "source_ip");
+		}
+	}
+}
+
+/*
+ * Call every Lua handler registered in ctx->lua_blacklist_handlers.
+ *
+ * Each handler receives six arguments:
+ *  1. the client IP;
+ *  2. the block reason string;
+ *  3. the event type: "new", "existing" or "blacklist";
+ *  4. the ratelimit info table, or nil when there is no bucket;
+ *  5. the digest of the session command as a text object, or nil when there
+ *     is no session;
+ *  6. the extensions table.
+ *
+ * A failing handler is logged and does not stop the remaining handlers.
+ * The Lua stack is reset to empty after every call.
+ */
+void rspamd_fuzzy_call_ratelimit_handlers(struct rspamd_fuzzy_storage_ctx *ctx,
+										  const struct rspamd_ratelimit_callback_ctx *cb_ctx)
+{
+	struct rspamd_lua_fuzzy_script *script;
+
+	if (ctx->lua_blacklist_handlers == NULL) {
+		return;
+	}
+
+	LL_FOREACH(ctx->lua_blacklist_handlers, script)
+	{
+		lua_State *L = ctx->cfg->lua_state;
+		const int nargs = 6;
+		const char *event_name = NULL;
+		int err_idx, ret;
+
+		/* The traceback function sits right below the callback */
+		lua_pushcfunction(L, &rspamd_lua_traceback);
+		err_idx = lua_gettop(L);
+		lua_checkstack(L, err_idx + nargs + 2);
+		lua_rawgeti(L, LUA_REGISTRYINDEX, script->cbref);
+
+		/* Arg 1: client IP */
+		rspamd_lua_ip_push(L, cb_ctx->addr);
+
+		/* Arg 2: block reason */
+		lua_pushstring(L, cb_ctx->reason);
+
+		/* Arg 3: event type */
+		switch (cb_ctx->type) {
+		case RATELIMIT_EVENT_NEW:
+			event_name = "new";
+			break;
+		case RATELIMIT_EVENT_EXISTING:
+			event_name = "existing";
+			break;
+		case RATELIMIT_EVENT_BLACKLIST:
+			event_name = "blacklist";
+			break;
+		}
+
+		if (event_name != NULL) {
+			lua_pushstring(L, event_name);
+		}
+
+		/* Arg 4: ratelimit_info table (or nil) */
+		rspamd_fuzzy_bucket_info_tolua(L, cb_ctx);
+
+		/* Arg 5: digest (or nil) */
+		if (cb_ctx->session == NULL) {
+			lua_pushnil(L);
+		}
+		else {
+			(void) lua_new_text(L, (const char *) cb_ctx->session->cmd.basic.digest,
+								sizeof(cb_ctx->session->cmd.basic.digest), FALSE);
+		}
+
+		/* Arg 6: extensions table */
+		rspamd_fuzzy_ratelimit_extensions_tolua(L, cb_ctx);
+
+		ret = lua_pcall(L, nargs, 0, err_idx);
+
+		if (ret != 0) {
+			msg_err("call to lua_blacklist_cbref script failed (%d): %s",
+					ret, lua_tostring(L, -1));
+		}
+
+		lua_settop(L, 0);
+	}
+}
+
+/*
+ * Notify the Lua blacklist handlers, if there are any, about a rejected
+ * client that has no bucket and no session attached.
+ *
+ * The event type is RATELIMIT_EVENT_BLACKLIST when `reason` is exactly
+ * "blacklisted" and RATELIMIT_EVENT_EXISTING for any other reason.
+ */
+void rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx *ctx,
+										 rspamd_inet_addr_t *addr,
+										 const char *reason)
+{
+	if (ctx->lua_blacklist_handlers != NULL) {
+		struct rspamd_ratelimit_callback_ctx cb_ctx = {
+			.addr = addr,
+			.reason = reason,
+			.type = RATELIMIT_EVENT_EXISTING,
+			.bucket = NULL,
+			.max_burst = NAN,
+			.max_rate = NAN,
+			.session = NULL,
+		};
+
+		if (g_strcmp0(reason, "blacklisted") == 0) {
+			cb_ctx.type = RATELIMIT_EVENT_BLACKLIST;
+		}
+
+		rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx);
+	}
+}
+
+/*
+ * Check a client address against the ctx->blocked_ips map.
+ *
+ * Returns FALSE (after notifying the blacklist handlers with the
+ * "blacklisted" reason) when the address matches the map, and TRUE when it
+ * does not match or when no map is configured.
+ */
+gboolean
+rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
+						  rspamd_inet_addr_t *addr)
+{
+	if (ctx->blocked_ips == NULL) {
+		return TRUE;
+	}
+
+	if (rspamd_match_radix_map_addr(ctx->blocked_ips, addr) == NULL) {
+		return TRUE;
+	}
+
+	rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "blacklisted");
+
+	return FALSE;
+}
+
+/*
+ * Destroy a ratelimit bucket passed as an untyped pointer: the address
+ * stored in the bucket is released first, then the bucket itself.
+ */
+void fuzzy_rl_bucket_free(gpointer p)
+{
+	struct rspamd_leaky_bucket_elt *bucket = p;
+
+	rspamd_inet_address_free(bucket->addr);
+	g_free(bucket);
+}
+
+/*
+ * Convert a leaky bucket to a new UCL object with two numeric keys: `cur`
+ * (the current level) and `last` (the last update time). The returned object
+ * belongs to the caller.
+ */
+ucl_object_t *
+rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt)
+{
+	ucl_object_t *bucket_obj = ucl_object_typed_new(UCL_OBJECT);
+	ucl_object_t *cur_obj = ucl_object_fromdouble(p_elt->cur);
+	ucl_object_t *last_obj = ucl_object_fromdouble(p_elt->last);
+
+	ucl_object_insert_key(bucket_obj, cur_obj, "cur", 0, false);
+	ucl_object_insert_key(bucket_obj, last_obj, "last", 0, false);
+
+	return bucket_obj;
+}
+
+/*
+ * Restore ratelimit buckets from RSPAMD_DBDIR/fuzzy_ratelimits.ucl.
+ *
+ * Nothing happens when the file is not readable. The file is expected to
+ * contain an array of objects with the `ip`, `value` and `last` keys; each
+ * valid element becomes a bucket in ctx->ratelimit_buckets keyed by the
+ * parsed address. Invalid elements are logged and skipped. A file that
+ * cannot be parsed is reported with a warning instead of being silently
+ * ignored.
+ */
+void rspamd_fuzzy_maybe_load_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx)
+{
+	char path[PATH_MAX];
+
+	rspamd_snprintf(path, sizeof(path), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl",
+					RSPAMD_DBDIR);
+
+	if (access(path, R_OK) == -1) {
+		/* No saved state (or it is not readable) */
+		return;
+	}
+
+	struct ucl_parser *parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS);
+
+	if (!ucl_parser_add_file(parser, path)) {
+		msg_warn("cannot load ratelimits from %s: %s", path,
+				 ucl_parser_get_error(parser));
+		ucl_parser_free(parser);
+
+		return;
+	}
+
+	ucl_object_t *obj = ucl_parser_get_object(parser);
+	int loaded = 0;
+
+	if (ucl_object_type(obj) == UCL_ARRAY) {
+		ucl_object_iter_t it = NULL;
+		const ucl_object_t *cur;
+
+		while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) {
+			const ucl_object_t *ip, *value, *last;
+			const char *ip_str;
+			double limit_val, last_val;
+
+			ip = ucl_object_find_key(cur, "ip");
+			value = ucl_object_find_key(cur, "value");
+			last = ucl_object_find_key(cur, "last");
+
+			if (ip == NULL || value == NULL || last == NULL) {
+				msg_err("invalid ratelimit object");
+				continue;
+			}
+
+			ip_str = ucl_object_tostring(ip);
+			limit_val = ucl_object_todouble(value);
+			last_val = ucl_object_todouble(last);
+
+			/* A NaN `value` is not rejected here, only a NaN `last` is */
+			if (ip_str == NULL || isnan(last_val)) {
+				msg_err("invalid ratelimit object");
+				continue;
+			}
+
+			rspamd_inet_addr_t *addr;
+			if (rspamd_parse_inet_address(&addr, ip_str, strlen(ip_str),
+										  RSPAMD_INET_ADDRESS_PARSE_NO_UNIX | RSPAMD_INET_ADDRESS_PARSE_NO_PORT)) {
+				struct rspamd_leaky_bucket_elt *elt = g_malloc(sizeof(*elt));
+
+				elt->cur = limit_val;
+				elt->last = last_val;
+				/* The parsed address is both the key and the bucket address */
+				elt->addr = addr;
+				rspamd_lru_hash_insert(ctx->ratelimit_buckets, addr, elt, elt->last, ctx->leaky_bucket_ttl);
+				loaded++;
+			}
+			else {
+				msg_err("invalid ratelimit ip: %s", ip_str);
+				continue;
+			}
+		}
+
+		msg_info("loaded %d ratelimit objects", loaded);
+	}
+
+	ucl_object_unref(obj);
+	ucl_parser_free(parser);
+}
+
+/*
+ * Save all ratelimit buckets to RSPAMD_DBDIR/fuzzy_ratelimits.ucl.
+ *
+ * The buckets are written as a compact JSON array of objects with the
+ * `value`, `last` and `ip` keys into a `.new` file, which is renamed over the
+ * final name only after a successful serialization. All failures are logged
+ * as warnings.
+ */
+void rspamd_fuzzy_maybe_save_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx)
+{
+	char path[PATH_MAX];
+
+	rspamd_snprintf(path, sizeof(path), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl.new",
+					RSPAMD_DBDIR);
+	FILE *f = fopen(path, "w");
+
+	if (f == NULL) {
+		msg_warn("cannot save ratelimit buckets to %s: %s", path, strerror(errno));
+		return;
+	}
+
+	ucl_object_t *top = ucl_object_typed_new(UCL_ARRAY);
+	int it = 0;
+	gpointer k, v;
+
+	ucl_object_reserve(top, rspamd_lru_hash_size(ctx->ratelimit_buckets));
+
+	while ((it = rspamd_lru_hash_foreach(ctx->ratelimit_buckets, it, &k, &v)) != -1) {
+		ucl_object_t *cur = ucl_object_typed_new(UCL_OBJECT);
+		struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *) v;
+
+		ucl_object_insert_key(cur, ucl_object_fromdouble(elt->cur), "value", 0, false);
+		ucl_object_insert_key(cur, ucl_object_fromdouble(elt->last), "last", 0, false);
+		ucl_object_insert_key(cur, ucl_object_fromstring(rspamd_inet_address_to_string(elt->addr)), "ip", 0, false);
+		ucl_array_append(top, cur);
+	}
+
+	/*
+	 * The emitter functions are a separate object that has to be released
+	 * with ucl_object_emit_funcs_free() once the emission is over.
+	 */
+	struct ucl_emitter_functions *emit_funcs = ucl_object_emit_file_funcs(f);
+
+	if (ucl_object_emit_full(top, UCL_EMIT_JSON_COMPACT, emit_funcs, NULL)) {
+		char npath[PATH_MAX];
+
+		fflush(f);
+		rspamd_snprintf(npath, sizeof(npath), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl",
+						RSPAMD_DBDIR);
+
+		if (rename(path, npath) == -1) {
+			msg_warn("cannot rename %s to %s: %s", path, npath, strerror(errno));
+		}
+		else {
+			msg_info("saved %d ratelimits in %s", rspamd_lru_hash_size(ctx->ratelimit_buckets), npath);
+		}
+	}
+	else {
+		msg_warn("cannot serialize ratelimit buckets to %s: %s", path, strerror(errno));
+	}
+
+	ucl_object_emit_funcs_free(emit_funcs);
+	fclose(f);
+	ucl_object_unref(top);
+}
--- /dev/null
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "rspamd.h"
+#include "util.h"
+#include "rspamd_control.h"
+#include "libserver/worker_util.h"
+#include "fuzzy_backend/fuzzy_backend.h"
+#include "fuzzy_storage_internal.h"
+#include "fuzzy_wire.h"
+#include "libcryptobox/keypair.h"
+#include "unix-std.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+
+/*
+ * Build a new UCL object from the counters of a single key statistics
+ * record: `checked`, `matched`, `added`, `deleted` and `errors` are the plain
+ * counters, `checked_per_hour` and `matched_per_hour` are the means of the
+ * corresponding averaging counters. The returned object belongs to the
+ * caller.
+ */
+ucl_object_t *
+rspamd_fuzzy_storage_stat_key(const struct fuzzy_key_stat *key_stat)
+{
+	ucl_object_t *stat_obj = ucl_object_typed_new(UCL_OBJECT);
+
+	/* Checks: total and mean */
+	ucl_object_insert_key(stat_obj, ucl_object_fromint(key_stat->checked),
+						  "checked", 0, false);
+	ucl_object_insert_key(stat_obj,
+						  ucl_object_fromdouble(key_stat->checked_ctr.mean),
+						  "checked_per_hour", 0, false);
+
+	/* Matches: total and mean */
+	ucl_object_insert_key(stat_obj, ucl_object_fromint(key_stat->matched),
+						  "matched", 0, false);
+	ucl_object_insert_key(stat_obj,
+						  ucl_object_fromdouble(key_stat->matched_ctr.mean),
+						  "matched_per_hour", 0, false);
+
+	/* Modifications and errors */
+	ucl_object_insert_key(stat_obj, ucl_object_fromint(key_stat->added),
+						  "added", 0, false);
+	ucl_object_insert_key(stat_obj, ucl_object_fromint(key_stat->deleted),
+						  "deleted", 0, false);
+	ucl_object_insert_key(stat_obj, ucl_object_fromint(key_stat->errors),
+						  "errors", 0, false);
+
+	return stat_obj;
+}
+
+/*
+ * Add the statistics of one fuzzy key to `keys_obj`.
+ *
+ * Keys that have no statistics attached are skipped. Otherwise the entry is
+ * stored under a name produced from `pk_iter` with the "%8bs" format and
+ * contains the key counters plus:
+ *  - `ips`: per-address counters, only when `ip_stat` is set and the key
+ *    tracks recent addresses;
+ *  - `flags`: per-flag counters, keyed by the decimal flag number;
+ *  - `keypair`: the keypair dumped without its secret part;
+ *  - `ratelimit`: the per-key bucket, when the key has one.
+ */
+void rspamd_fuzzy_key_stat_iter(const unsigned char *pk_iter,
+								struct fuzzy_key *fuzzy_key,
+								ucl_object_t *keys_obj,
+								gboolean ip_stat)
+{
+	struct fuzzy_key_stat *key_stat = fuzzy_key->stat;
+	char keyname[17];
+
+	if (key_stat == NULL) {
+		return;
+	}
+
+	rspamd_snprintf(keyname, sizeof(keyname), "%8bs", pk_iter);
+
+	ucl_object_t *key_obj = rspamd_fuzzy_storage_stat_key(key_stat);
+
+	if (key_stat->last_ips && ip_stat) {
+		ucl_object_t *ips_obj = ucl_object_typed_new(UCL_OBJECT);
+		gpointer ip_addr, ip_counters;
+		int pos = 0;
+
+		while ((pos = rspamd_lru_hash_foreach(key_stat->last_ips,
+											  pos, &ip_addr, &ip_counters)) != -1) {
+			/* The address string is copied, as it is not a stable key */
+			ucl_object_insert_key(ips_obj,
+								  rspamd_fuzzy_storage_stat_key(ip_counters),
+								  rspamd_inet_address_to_string(ip_addr), 0, true);
+		}
+
+		ucl_object_insert_key(key_obj, ips_obj, "ips", 0, false);
+	}
+
+	int flag;
+	struct fuzzy_key_stat *flag_stat;
+	ucl_object_t *flags_obj = ucl_object_typed_new(UCL_OBJECT);
+
+	kh_foreach_key_value_ptr(fuzzy_key->flags_stat, flag, flag_stat, {
+		char flag_name[16];
+
+		rspamd_snprintf(flag_name, sizeof(flag_name), "%d", flag);
+		ucl_object_insert_key(flags_obj, rspamd_fuzzy_storage_stat_key(flag_stat),
+							  flag_name, 0, true);
+	});
+
+	ucl_object_insert_key(key_obj, flags_obj, "flags", 0, false);
+
+	ucl_object_t *keypair_obj = rspamd_keypair_to_ucl(fuzzy_key->key,
+													  RSPAMD_KEYPAIR_ENCODING_DEFAULT,
+													  RSPAMD_KEYPAIR_DUMP_NO_SECRET | RSPAMD_KEYPAIR_DUMP_FLATTENED);
+	ucl_object_insert_key(key_obj, keypair_obj, "keypair", 0, false);
+
+	if (fuzzy_key->rl_bucket) {
+		ucl_object_insert_key(key_obj,
+							  rspamd_leaky_bucket_to_ucl(fuzzy_key->rl_bucket),
+							  "ratelimit", 0, false);
+	}
+
+	ucl_object_insert_key(keys_obj, key_obj, keyname, 0, true);
+}
+
+/*
+ * Build the whole fuzzy storage statistics as a new UCL object.
+ *
+ * The object contains, in this order:
+ *  - `keys`: per-key statistics for the static and, when present, the
+ *    dynamic keys;
+ *  - `fuzzy_stored`, `fuzzy_expired`, `invalid_requests`, `delayed_hashes`:
+ *    global counters;
+ *  - `errors_ips`: error counters per address, only when `ip_stat` is set
+ *    and such tracking exists;
+ *  - `fuzzy_checked`, `fuzzy_shingles`, `fuzzy_found`: arrays of counters,
+ *    one element per epoch starting from RSPAMD_FUZZY_EPOCH10.
+ *
+ * The returned object belongs to the caller.
+ */
+ucl_object_t *
+rspamd_fuzzy_stat_to_ucl(struct rspamd_fuzzy_storage_ctx *ctx, gboolean ip_stat)
+{
+	struct fuzzy_key *fuzzy_key;
+	const unsigned char *pk_iter;
+	ucl_object_t *top = ucl_object_typed_new(UCL_OBJECT);
+	ucl_object_t *keys_obj = ucl_object_typed_new(UCL_OBJECT);
+
+	kh_foreach(ctx->keys, pk_iter, fuzzy_key, {
+		rspamd_fuzzy_key_stat_iter(pk_iter, fuzzy_key, keys_obj, ip_stat);
+	});
+
+	if (ctx->dynamic_keys) {
+		kh_foreach(ctx->dynamic_keys, pk_iter, fuzzy_key, {
+			rspamd_fuzzy_key_stat_iter(pk_iter, fuzzy_key, keys_obj, ip_stat);
+		});
+	}
+
+	ucl_object_insert_key(top, keys_obj, "keys", 0, false);
+
+	/* Now generic stats */
+	ucl_object_insert_key(top, ucl_object_fromint(ctx->stat.fuzzy_hashes),
+						  "fuzzy_stored", 0, false);
+	ucl_object_insert_key(top, ucl_object_fromint(ctx->stat.fuzzy_hashes_expired),
+						  "fuzzy_expired", 0, false);
+	ucl_object_insert_key(top, ucl_object_fromint(ctx->stat.invalid_requests),
+						  "invalid_requests", 0, false);
+	ucl_object_insert_key(top, ucl_object_fromint(ctx->stat.delayed_hashes),
+						  "delayed_hashes", 0, false);
+
+	if (ctx->errors_ips && ip_stat) {
+		ucl_object_t *errors_obj = ucl_object_typed_new(UCL_OBJECT);
+		gpointer ip_addr, ip_errors;
+		int pos = 0;
+
+		while ((pos = rspamd_lru_hash_foreach(ctx->errors_ips, pos,
+											  &ip_addr, &ip_errors)) != -1) {
+			ucl_object_insert_key(errors_obj,
+								  ucl_object_fromint(*(uint64_t *) ip_errors),
+								  rspamd_inet_address_to_string(ip_addr), 0, true);
+		}
+
+		ucl_object_insert_key(top, errors_obj, "errors_ips", 0, false);
+	}
+
+	/* Checked, shingles and matched counters, each one split by epoch */
+	const struct {
+		const char *key;
+		const uint64_t *counters;
+	} epoch_stats[] = {
+		{"fuzzy_checked", ctx->stat.fuzzy_hashes_checked},
+		{"fuzzy_shingles", ctx->stat.fuzzy_shingles_checked},
+		{"fuzzy_found", ctx->stat.fuzzy_hashes_found},
+	};
+
+	for (size_t s = 0; s < sizeof(epoch_stats) / sizeof(epoch_stats[0]); s++) {
+		ucl_object_t *epoch_arr = ucl_object_typed_new(UCL_ARRAY);
+
+		for (int epoch = RSPAMD_FUZZY_EPOCH10; epoch < RSPAMD_FUZZY_EPOCH_MAX; epoch++) {
+			ucl_array_append(epoch_arr,
+							 ucl_object_fromint(epoch_stats[s].counters[epoch]));
+		}
+
+		ucl_object_insert_key(top, epoch_arr, epoch_stats[s].key, 0, false);
+	}
+
+	return top;
+}
+
+/*
+ * Control command handler that reports the fuzzy storage statistics.
+ *
+ * The statistics are written as compact JSON to a temporary file in the
+ * configured temp directory; the file is reopened read-only, unlinked and its
+ * descriptor is attached (SCM_RIGHTS) to the reply sent over `fd`. The reply
+ * status is 0 on success, or the errno of the failed temporary file
+ * operation, in which case no descriptor is attached.
+ *
+ * `ud` is the fuzzy storage context; `attached_fd` is not used. Always
+ * returns TRUE.
+ */
+gboolean
+rspamd_fuzzy_storage_stat(struct rspamd_main *rspamd_main,
+						  struct rspamd_worker *worker, int fd,
+						  int attached_fd,
+						  struct rspamd_control_command *cmd,
+						  gpointer ud)
+{
+	struct rspamd_fuzzy_storage_ctx *ctx = ud;
+	struct rspamd_control_reply rep;
+	ucl_object_t *obj;
+	struct ucl_emitter_functions *emit_subr;
+	unsigned char fdspace[CMSG_SPACE(sizeof(int))];
+	struct iovec iov;
+	struct msghdr msg;
+	struct cmsghdr *cmsg;
+
+	int outfd = -1;
+	char tmppath[PATH_MAX];
+
+	memset(&rep, 0, sizeof(rep));
+	rep.type = RSPAMD_CONTROL_FUZZY_STAT;
+	rep.id = cmd->id;
+
+	rspamd_snprintf(tmppath, sizeof(tmppath), "%s%c%s-XXXXXXXXXX",
+					rspamd_main->cfg->temp_dir, G_DIR_SEPARATOR, "fuzzy-stat");
+
+	if ((outfd = mkstemp(tmppath)) == -1) {
+		rep.reply.fuzzy_stat.status = errno;
+		msg_info_main("cannot make temporary stat file for fuzzy stat: %s",
+					  strerror(errno));
+	}
+	else {
+		const char *backend_id;
+
+		rep.reply.fuzzy_stat.status = 0;
+
+		backend_id = rspamd_fuzzy_backend_id(ctx->backend);
+		if (backend_id) {
+			/*
+			 * NOTE(review): this copies sizeof(storage_id) bytes, so the
+			 * backend id is assumed to be at least that long - confirm.
+			 */
+			memcpy(rep.reply.fuzzy_stat.storage_id,
+				   backend_id,
+				   sizeof(rep.reply.fuzzy_stat.storage_id));
+		}
+
+		obj = rspamd_fuzzy_stat_to_ucl(ctx, TRUE);
+		emit_subr = ucl_object_emit_fd_funcs(outfd);
+		ucl_object_emit_full(obj, UCL_EMIT_JSON_COMPACT, emit_subr, NULL);
+		ucl_object_emit_funcs_free(emit_subr);
+		ucl_object_unref(obj);
+		/* Rewind output file */
+		close(outfd);
+		outfd = open(tmppath, O_RDONLY);
+
+		if (outfd == -1) {
+			/*
+			 * Without a descriptor the reply must not claim success, so
+			 * report the failure instead of the zero status set above.
+			 */
+			int open_errno = errno;
+
+			rep.reply.fuzzy_stat.status = open_errno;
+			msg_info_main("cannot reopen temporary stat file for fuzzy stat: %s",
+						  strerror(open_errno));
+		}
+
+		/* The name is not needed anymore, whatever the result is */
+		unlink(tmppath);
+	}
+
+	/* Now we can send outfd and status message */
+	memset(&msg, 0, sizeof(msg));
+
+	/* Attach fd to the message */
+	if (outfd != -1) {
+		memset(fdspace, 0, sizeof(fdspace));
+		msg.msg_control = fdspace;
+		msg.msg_controllen = sizeof(fdspace);
+		cmsg = CMSG_FIRSTHDR(&msg);
+
+		if (cmsg) {
+			cmsg->cmsg_level = SOL_SOCKET;
+			cmsg->cmsg_type = SCM_RIGHTS;
+			cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+			memcpy(CMSG_DATA(cmsg), &outfd, sizeof(int));
+		}
+	}
+
+	iov.iov_base = &rep;
+	iov.iov_len = sizeof(rep);
+	msg.msg_iov = &iov;
+	msg.msg_iovlen = 1;
+
+	if (sendmsg(fd, &msg, 0) == -1) {
+		msg_err_main("cannot send fuzzy stat: %s", strerror(errno));
+	}
+
+	/* Our copy of the descriptor is no longer needed after the send */
+	if (outfd != -1) {
+		close(outfd);
+	}
+
+	return TRUE;
+}