From: Vsevolod Stakhov Date: Thu, 5 Feb 2026 15:38:07 +0000 (+0000) Subject: [Refactor] fuzzy storage: split helper code (#5875) X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;ds=sidebyside;p=thirdparty%2Frspamd.git [Refactor] fuzzy storage: split helper code (#5875) --- diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 0aab908e5..1b649de3c 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -19,6 +19,7 @@ #include "config.h" #include "libserver/fuzzy_wire.h" +#include "libserver/fuzzy_storage_internal.h" #include "util.h" #include "rspamd.h" #include "libserver/maps/map.h" @@ -75,176 +76,8 @@ worker_t fuzzy_worker = { RSPAMD_WORKER_VER /* Version info */ }; -struct fuzzy_global_stat { - uint64_t fuzzy_hashes; - /**< number of fuzzy hashes stored */ - uint64_t fuzzy_hashes_expired; - /**< number of fuzzy hashes expired */ - uint64_t fuzzy_hashes_checked[RSPAMD_FUZZY_EPOCH_MAX]; - /**< amount of check requests for each epoch */ - uint64_t fuzzy_shingles_checked[RSPAMD_FUZZY_EPOCH_MAX]; - /**< amount of shingle check requests for each epoch */ - uint64_t fuzzy_hashes_found[RSPAMD_FUZZY_EPOCH_MAX]; - /**< amount of invalid requests */ - uint64_t invalid_requests; - /**< amount of delayed hashes found */ - uint64_t delayed_hashes; -}; - -struct fuzzy_key_stat { - uint64_t checked; - uint64_t matched; - uint64_t added; - uint64_t deleted; - uint64_t errors; - /* Store averages for checked/matched per minute */ - struct rspamd_counter_data checked_ctr; - struct rspamd_counter_data matched_ctr; - double last_checked_time; - uint64_t last_checked_count; - uint64_t last_matched_count; - struct rspamd_cryptobox_keypair *keypair; - rspamd_lru_hash_t *last_ips; - - ref_entry_t ref; -}; - -struct rspamd_leaky_bucket_elt { - rspamd_inet_addr_t *addr; - double last; - double cur; -}; - static const uint64_t rspamd_fuzzy_storage_magic = 0x291a3253eb1b3ea5ULL; -static int64_t -fuzzy_kp_hash(const unsigned char *p) -{ - int64_t res; - - 
memcpy(&res, p, sizeof(res)); - return res; -} -static bool -fuzzy_kp_equal(gconstpointer a, gconstpointer b) -{ - const unsigned char *pa = a, *pb = b; - - return (memcmp(pa, pb, RSPAMD_FUZZY_KEYLEN) == 0); -} - -enum fuzzy_key_op { - FUZZY_KEY_READ = 0x1u << 0, - FUZZY_KEY_WRITE = 0x1u << 1, - FUZZY_KEY_DELETE = 0x1u << 2, -}; -KHASH_SET_INIT_INT(fuzzy_key_ids_set); -KHASH_INIT(fuzzy_key_flag_stat, int, struct fuzzy_key_stat, 1, kh_int_hash_func, - kh_int_hash_equal); -struct fuzzy_key { - char *name; - struct rspamd_cryptobox_keypair *key; - struct rspamd_cryptobox_pubkey *pk; - struct fuzzy_key_stat *stat; - khash_t(fuzzy_key_flag_stat) * flags_stat; - khash_t(fuzzy_key_ids_set) * forbidden_ids; - struct rspamd_leaky_bucket_elt *rl_bucket; - ucl_object_t *extensions; - double burst; - double rate; - ev_tstamp expire; - bool expired; - int flags; /* enum fuzzy_key_op */ - ref_entry_t ref; -}; - -KHASH_INIT(rspamd_fuzzy_keys_hash, - const unsigned char *, struct fuzzy_key *, 1, - fuzzy_kp_hash, fuzzy_kp_equal); - -struct rspamd_lua_fuzzy_script { - int cbref; - struct rspamd_lua_fuzzy_script *next; -}; - -struct rspamd_fuzzy_storage_ctx { - uint64_t magic; - /* Events base */ - struct ev_loop *event_loop; - /* DNS resolver */ - struct rspamd_dns_resolver *resolver; - /* Config */ - struct rspamd_config *cfg; - /* END OF COMMON PART */ - struct fuzzy_global_stat stat; - double expire; - double sync_timeout; - double delay; - double tcp_timeout; - struct rspamd_radix_map_helper *update_ips; - struct rspamd_hash_map_helper *update_keys; - struct rspamd_radix_map_helper *blocked_ips; - struct rspamd_radix_map_helper *ratelimit_whitelist; - struct rspamd_radix_map_helper *delay_whitelist; - - const ucl_object_t *update_map; - const ucl_object_t *update_keys_map; - const ucl_object_t *delay_whitelist_map; - const ucl_object_t *blocked_map; - const ucl_object_t *ratelimit_whitelist_map; - const ucl_object_t *dynamic_keys_map; - - unsigned int keypair_cache_size; - 
ev_timer stat_ev; - ev_io peer_ev; - - /* Local keypair */ - struct rspamd_cryptobox_keypair *default_keypair; /* Bad clash, need for parse keypair */ - struct fuzzy_key *default_key; - khash_t(rspamd_fuzzy_keys_hash) * keys; - /* Those are loaded via map */ - khash_t(rspamd_fuzzy_keys_hash) * dynamic_keys; - - gboolean encrypted_only; - gboolean read_only; - gboolean dedicated_update_worker; - struct rspamd_keypair_cache *keypair_cache; - struct rspamd_http_context *http_ctx; - rspamd_lru_hash_t *errors_ips; - rspamd_lru_hash_t *ratelimit_buckets; - struct rspamd_fuzzy_backend *backend; - GArray *updates_pending; - unsigned int updates_failed; - unsigned int updates_maxfail; - /* Used to send data between workers */ - int peer_fd; - - /* Ratelimits */ - unsigned int leaky_bucket_ttl; - unsigned int leaky_bucket_mask; - unsigned int max_buckets; - gboolean ratelimit_log_only; - double leaky_bucket_burst; - double leaky_bucket_rate; - - struct rspamd_worker *worker; - const ucl_object_t *skip_map; - struct rspamd_hash_map_helper *skip_hashes; - struct rspamd_lua_fuzzy_script *lua_pre_handlers; - struct rspamd_lua_fuzzy_script *lua_post_handlers; - struct rspamd_lua_fuzzy_script *lua_blacklist_handlers; - khash_t(fuzzy_key_ids_set) * default_forbidden_ids; - /* Ids that should not override other ids */ - khash_t(fuzzy_key_ids_set) * weak_ids; -}; - -enum fuzzy_cmd_type { - CMD_NORMAL, - CMD_SHINGLE, - CMD_ENCRYPTED_NORMAL, - CMD_ENCRYPTED_SHINGLE -}; - struct rspamd_fuzzy_tcp_frame { uint16_t size_hdr; /* We have to write this as well */ struct rspamd_fuzzy_encrypted_reply payload; /* Payload */ @@ -301,30 +134,6 @@ struct fuzzy_udp_session { ref_entry_t ref; }; -/* Legacy structure name for compatibility during refactoring */ -struct fuzzy_session { - struct rspamd_worker *worker; - rspamd_inet_addr_t *addr; - struct rspamd_fuzzy_storage_ctx *ctx; - - struct rspamd_fuzzy_shingle_cmd cmd; /* Can handle both shingles and non-shingles */ - struct 
rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */ - struct fuzzy_key_stat *ip_stat; - - enum rspamd_fuzzy_epoch epoch; - enum fuzzy_cmd_type cmd_type; - int fd; - ev_tstamp timestamp; - struct ev_io io; - ref_entry_t ref; - struct fuzzy_key *key; - struct rspamd_fuzzy_cmd_extension *extensions; - unsigned char nm[rspamd_cryptobox_MAX_NMBYTES]; - - /* If this is a TCP session, this pointer will be set */ - struct fuzzy_tcp_session *tcp_session; -}; - struct fuzzy_peer_request { ev_io io_ev; struct fuzzy_peer_cmd cmd; @@ -337,598 +146,15 @@ struct rspamd_updates_cbdata { gboolean final; }; -enum rspamd_ratelimit_event_type { - RATELIMIT_EVENT_NEW, - RATELIMIT_EVENT_EXISTING, - RATELIMIT_EVENT_BLACKLIST, -}; - -struct rspamd_ratelimit_callback_ctx { - rspamd_inet_addr_t *addr; /* Client IP */ - const char *reason; /* "ratelimit" or "blacklisted" */ - enum rspamd_ratelimit_event_type type; /* new, existing, blacklist */ - - /* Rate limit bucket state (optional) */ - struct rspamd_leaky_bucket_elt *bucket; - double max_burst; - double max_rate; - - /* Session context (optional - only for per-key limits) */ - struct fuzzy_session *session; -}; - static void rspamd_fuzzy_write_reply(struct fuzzy_session *session); static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session); static bool rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session, struct fuzzy_tcp_reply_queue_elt *reply); static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx, const char *source, gboolean final); -static gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx, - rspamd_inet_addr_t *addr); -static void rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx *ctx, - rspamd_inet_addr_t *addr, - const char *reason); -static void rspamd_fuzzy_call_ratelimit_handlers(struct rspamd_fuzzy_storage_ctx *ctx, - const struct rspamd_ratelimit_callback_ctx *cb_ctx); -static struct fuzzy_key 
*fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg, - const ucl_object_t *obj, - khash_t(rspamd_fuzzy_keys_hash) * target); static void rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents); static void accept_tcp_socket(EV_P_ ev_io *w, int revents); -static ucl_object_t *rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt); -struct fuzzy_keymap_ucl_buf { - rspamd_fstring_t *buf; - struct rspamd_fuzzy_storage_ctx *ctx; -}; - -/* Callbacks for reading json dynamic rules */ -static char * -ucl_keymap_read_cb(char *chunk, - int len, - struct map_cb_data *data, - gboolean final) -{ - struct fuzzy_keymap_ucl_buf *jb, *pd; - - pd = data->prev_data; - - g_assert(pd != NULL); - - if (data->cur_data == NULL) { - jb = g_malloc0(sizeof(*jb)); - jb->ctx = pd->ctx; - data->cur_data = jb; - } - else { - jb = data->cur_data; - } - - if (jb->buf == NULL) { - /* Allocate memory for buffer */ - jb->buf = rspamd_fstring_sized_new(MAX(len, 4096)); - } - - jb->buf = rspamd_fstring_append(jb->buf, chunk, len); - - return NULL; -} - -static void -ucl_keymap_fin_cb(struct map_cb_data *data, void **target) -{ - struct fuzzy_keymap_ucl_buf *jb; - ucl_object_t *top; - struct ucl_parser *parser; - struct rspamd_config *cfg; - - /* Now parse ucl */ - if (data->cur_data) { - jb = data->cur_data; - cfg = jb->ctx->cfg; - } - else { - msg_err("no cur data in the map! 
might be a bug"); - return; - } - - if (jb->buf->len == 0) { - msg_err_config("no data read"); - - return; - } - - parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS); - - if (!ucl_parser_add_chunk(parser, jb->buf->str, jb->buf->len)) { - msg_err_config("cannot load ucl data: parse error %s", - ucl_parser_get_error(parser)); - ucl_parser_free(parser); - return; - } - - top = ucl_parser_get_object(parser); - ucl_parser_free(parser); - - if (ucl_object_type(top) != UCL_ARRAY) { - ucl_object_unref(top); - msg_err_config("loaded ucl is not an array"); - return; - } - - if (target) { - *target = data->cur_data; - } - - if (data->prev_data) { - jb = data->prev_data; - /* Clean prev data */ - if (jb->buf) { - rspamd_fstring_free(jb->buf); - } - - /* Clean the existing keys */ - struct fuzzy_key *key; - kh_foreach_value(jb->ctx->dynamic_keys, key, { - REF_RELEASE(key); - }); - kh_clear(rspamd_fuzzy_keys_hash, jb->ctx->dynamic_keys); - - /* Insert new keys */ - const ucl_object_t *cur; - ucl_object_iter_t it = NULL; - int success = 0; - - while ((cur = ucl_object_iterate(top, &it, true)) != NULL) { - struct fuzzy_key *nk; - - nk = fuzzy_add_keypair_from_ucl(cfg, cur, jb->ctx->dynamic_keys); - - if (nk == NULL) { - msg_warn_config("cannot add dynamic keypair"); - } - success++; - } - - msg_info_config("loaded %d dynamic keypairs", success); - - g_free(jb); - } - - ucl_object_unref(top); -} - -static void -ucl_keymap_dtor_cb(struct map_cb_data *data) -{ - struct fuzzy_keymap_ucl_buf *jb; - - if (data->cur_data) { - jb = data->cur_data; - /* Clean prev data */ - if (jb->buf) { - rspamd_fstring_free(jb->buf); - } - - struct fuzzy_key *key; - kh_foreach_value(jb->ctx->dynamic_keys, key, { - REF_RELEASE(key); - }); - /* Clear hash content but don't destroy - mempool destructor will handle it */ - kh_clear(rspamd_fuzzy_keys_hash, jb->ctx->dynamic_keys); - - g_free(jb); - } -} - -enum rspamd_ratelimit_check_result { - ratelimit_pass, - ratelimit_new, - ratelimit_existing, -}; - -enum 
rspamd_ratelimit_check_policy { - ratelimit_policy_permanent, - ratelimit_policy_normal, -}; - -static enum rspamd_ratelimit_check_result -rspamd_fuzzy_check_ratelimit_bucket(struct rspamd_fuzzy_storage_ctx *ctx, - rspamd_inet_addr_t *addr, - ev_tstamp timestamp, - struct rspamd_leaky_bucket_elt *elt, - enum rspamd_ratelimit_check_policy policy, - double max_burst, double max_rate) -{ - gboolean ratelimited = FALSE, new_ratelimit = FALSE; - - /* Nothing to check */ - if (isnan(max_burst) || isnan(max_rate)) { - return ratelimit_pass; - } - - if (isnan(elt->cur)) { - /* There is an issue with the previous logic: the TTL is updated each time - * we see that new bucket. Hence, we need to check the `last` and act accordingly - */ - if (elt->last < timestamp && timestamp - elt->last >= ctx->leaky_bucket_ttl) { - /* - * We reset bucket to it's 90% capacity to allow some requests - * This should cope with the issue when we block an IP network for some burst and never unblock it - */ - elt->cur = max_burst * 0.9; - elt->last = timestamp; - } - else { - ratelimited = TRUE; - } - } - else { - /* Update bucket: leak some elements */ - if (elt->last < timestamp) { - elt->cur -= max_rate * (timestamp - elt->last); - elt->last = timestamp; - - if (elt->cur < 0) { - elt->cur = 0; - } - } - else { - elt->last = timestamp; - } - - /* Check the bucket */ - if (elt->cur >= max_burst) { - - if (policy == ratelimit_policy_permanent) { - elt->cur = NAN; - } - new_ratelimit = TRUE; - ratelimited = TRUE; - } - else { - elt->cur++; /* Allow one more request */ - } - } - - /* Note: Caller is responsible for calling the ratelimit handlers with - * proper context (new vs existing, bucket info, session info, etc.) - */ - - if (new_ratelimit) { - return ratelimit_new; - } - - return ratelimited ? 
ratelimit_existing : ratelimit_pass; -} - -static gboolean -rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx, - rspamd_inet_addr_t *addr, - struct rspamd_worker *worker, - ev_tstamp timestamp) -{ - rspamd_inet_addr_t *masked; - struct rspamd_leaky_bucket_elt *elt; - - if (!addr) { - return TRUE; - } - - if (ctx->ratelimit_whitelist != NULL) { - if (rspamd_match_radix_map_addr(ctx->ratelimit_whitelist, - addr) != NULL) { - return TRUE; - } - } - - /* Skip ratelimit for local addresses */ - if (rspamd_inet_address_is_local(addr)) { - return TRUE; - } - - masked = rspamd_inet_address_copy(addr, NULL); - - if (rspamd_inet_address_get_af(masked) == AF_INET) { - rspamd_inet_address_apply_mask(masked, - MIN(ctx->leaky_bucket_mask, 32)); - } - else { - /* Must be at least /64 */ - rspamd_inet_address_apply_mask(masked, - MIN(MAX(ctx->leaky_bucket_mask * 4, 64), 128)); - } - - elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, masked, - (time_t) timestamp); - - if (elt) { - enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(ctx, addr, - timestamp, elt, - ratelimit_policy_permanent, - ctx->leaky_bucket_burst, - ctx->leaky_bucket_rate); - - if (res == ratelimit_new) { - msg_info("ratelimiting %s (%s), %.1f max elts", - rspamd_inet_address_to_string(addr), - rspamd_inet_address_to_string(masked), - ctx->leaky_bucket_burst); - - struct rspamd_srv_command srv_cmd; - - srv_cmd.type = RSPAMD_SRV_FUZZY_BLOCKED; - srv_cmd.cmd.fuzzy_blocked.af = rspamd_inet_address_get_af(masked); - - if (srv_cmd.cmd.fuzzy_blocked.af == AF_INET || srv_cmd.cmd.fuzzy_blocked.af == AF_INET6) { - socklen_t slen; - struct sockaddr *sa = rspamd_inet_address_get_sa(masked, &slen); - - if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) { - memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen); - msg_debug("propagating blocked address to other workers"); - rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL); - } - else { - msg_err("bad address 
length: %d, expected to be %d", - (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr)); - } - } - - if (ctx->lua_blacklist_handlers) { - struct rspamd_ratelimit_callback_ctx cb_ctx = { - .addr = addr, - .reason = "ratelimit", - .type = RATELIMIT_EVENT_NEW, - .bucket = elt, - .max_burst = ctx->leaky_bucket_burst, - .max_rate = ctx->leaky_bucket_rate, - .session = NULL, - }; - rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx); - } - } - else if (res == ratelimit_existing) { - if (ctx->lua_blacklist_handlers) { - struct rspamd_ratelimit_callback_ctx cb_ctx = { - .addr = addr, - .reason = "ratelimit", - .type = RATELIMIT_EVENT_EXISTING, - .bucket = elt, - .max_burst = ctx->leaky_bucket_burst, - .max_rate = ctx->leaky_bucket_rate, - .session = NULL, - }; - rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx); - } - } - - rspamd_inet_address_free(masked); - - return res == ratelimit_pass; - } - else { - /* New bucket */ - elt = g_malloc(sizeof(*elt)); - elt->addr = masked; /* transfer ownership */ - elt->cur = 1; - elt->last = timestamp; - - rspamd_lru_hash_insert(ctx->ratelimit_buckets, - masked, - elt, - timestamp, - ctx->leaky_bucket_ttl); - } - - return TRUE; -} - -/* - * Push bucket info as a Lua table - */ -static void -rspamd_fuzzy_bucket_info_tolua(lua_State *L, - const struct rspamd_ratelimit_callback_ctx *cb_ctx) -{ - if (!cb_ctx->bucket) { - lua_pushnil(L); - return; - } - - lua_createtable(L, 0, 6); - - /* bucket_level - current fill level (nil if permanently blocked) */ - if (isnan(cb_ctx->bucket->cur)) { - lua_pushnil(L); - lua_setfield(L, -2, "bucket_level"); - lua_pushboolean(L, TRUE); - lua_setfield(L, -2, "is_permanent"); - } - else { - lua_pushnumber(L, cb_ctx->bucket->cur); - lua_setfield(L, -2, "bucket_level"); - lua_pushboolean(L, FALSE); - lua_setfield(L, -2, "is_permanent"); - } - - /* max_burst */ - if (!isnan(cb_ctx->max_burst)) { - lua_pushnumber(L, cb_ctx->max_burst); - lua_setfield(L, -2, "max_burst"); - } - - /* max_rate */ - if 
(!isnan(cb_ctx->max_rate)) { - lua_pushnumber(L, cb_ctx->max_rate); - lua_setfield(L, -2, "max_rate"); - } - - /* exceeded_by - how much over the limit */ - if (!isnan(cb_ctx->bucket->cur) && !isnan(cb_ctx->max_burst) && - cb_ctx->bucket->cur > cb_ctx->max_burst) { - lua_pushnumber(L, cb_ctx->bucket->cur - cb_ctx->max_burst); - lua_setfield(L, -2, "exceeded_by"); - } - - /* last_seen */ - lua_pushnumber(L, cb_ctx->bucket->last); - lua_setfield(L, -2, "last_seen"); -} - -/* - * Push extensions from session or callback context as a Lua table - */ -static void -rspamd_fuzzy_ratelimit_extensions_tolua(lua_State *L, - const struct rspamd_ratelimit_callback_ctx *cb_ctx) -{ - struct rspamd_fuzzy_cmd_extension *ext; - rspamd_inet_addr_t *addr; - - lua_createtable(L, 0, 2); - - if (!cb_ctx->session || !cb_ctx->session->extensions) { - return; - } - - LL_FOREACH(cb_ctx->session->extensions, ext) - { - switch (ext->ext) { - case RSPAMD_FUZZY_EXT_SOURCE_DOMAIN: - lua_pushlstring(L, (const char *) ext->payload, ext->length); - lua_setfield(L, -2, "domain"); - break; - case RSPAMD_FUZZY_EXT_SOURCE_IP4: - addr = rspamd_inet_address_new(AF_INET, ext->payload); - rspamd_lua_ip_push(L, addr); - rspamd_inet_address_free(addr); - lua_setfield(L, -2, "source_ip"); - break; - case RSPAMD_FUZZY_EXT_SOURCE_IP6: - addr = rspamd_inet_address_new(AF_INET6, ext->payload); - rspamd_lua_ip_push(L, addr); - rspamd_inet_address_free(addr); - lua_setfield(L, -2, "source_ip"); - break; - } - } -} - -/* - * Enhanced Lua callback for ratelimit/blacklist events - * Passes 7 arguments to Lua: - * 1. ip (rspamd_ip) - Client IP address - * 2. reason (string) - "ratelimit" or "blacklisted" - * 3. event_type (string) - "new", "existing", or "blacklist" - * 4. ratelimit_info (table/nil) - Bucket state details - * 5. digest (rspamd_text/nil) - Hash if session available - * 6. 
extensions (table) - Domain, source IP from extensions - */ -static void -rspamd_fuzzy_call_ratelimit_handlers(struct rspamd_fuzzy_storage_ctx *ctx, - const struct rspamd_ratelimit_callback_ctx *cb_ctx) -{ - if (ctx->lua_blacklist_handlers == NULL) { - return; - } - - struct rspamd_lua_fuzzy_script *cur; - LL_FOREACH(ctx->lua_blacklist_handlers, cur) - { - lua_State *L = ctx->cfg->lua_state; - int err_idx, ret; - const int nargs = 6; - - lua_pushcfunction(L, &rspamd_lua_traceback); - err_idx = lua_gettop(L); - lua_checkstack(L, err_idx + nargs + 2); - lua_rawgeti(L, LUA_REGISTRYINDEX, cur->cbref); - - /* Arg 1: client IP */ - rspamd_lua_ip_push(L, cb_ctx->addr); - - /* Arg 2: block reason */ - lua_pushstring(L, cb_ctx->reason); - - /* Arg 3: event type */ - switch (cb_ctx->type) { - case RATELIMIT_EVENT_NEW: - lua_pushliteral(L, "new"); - break; - case RATELIMIT_EVENT_EXISTING: - lua_pushliteral(L, "existing"); - break; - case RATELIMIT_EVENT_BLACKLIST: - lua_pushliteral(L, "blacklist"); - break; - } - - /* Arg 4: ratelimit_info table (or nil) */ - rspamd_fuzzy_bucket_info_tolua(L, cb_ctx); - - /* Arg 5: digest (or nil) */ - if (cb_ctx->session) { - (void) lua_new_text(L, (const char *) cb_ctx->session->cmd.basic.digest, - sizeof(cb_ctx->session->cmd.basic.digest), FALSE); - } - else { - lua_pushnil(L); - } - - /* Arg 6: extensions table */ - rspamd_fuzzy_ratelimit_extensions_tolua(L, cb_ctx); - - if ((ret = lua_pcall(L, nargs, 0, err_idx)) != 0) { - msg_err("call to lua_blacklist_cbref " - "script failed (%d): %s", - ret, lua_tostring(L, -1)); - } - - lua_settop(L, 0); - } -} - -/* - * Backwards-compatible wrapper that calls the enhanced handler - */ -static void -rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx *ctx, - rspamd_inet_addr_t *addr, - const char *reason) -{ - if (ctx->lua_blacklist_handlers == NULL) { - return; - } - - struct rspamd_ratelimit_callback_ctx cb_ctx = { - .addr = addr, - .reason = reason, - .type = g_strcmp0(reason, 
"blacklisted") == 0 ? RATELIMIT_EVENT_BLACKLIST : RATELIMIT_EVENT_EXISTING, - .bucket = NULL, - .max_burst = NAN, - .max_rate = NAN, - .session = NULL, - }; - rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx); -} - -static gboolean -rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx, - rspamd_inet_addr_t *addr) -{ - if (ctx->blocked_ips != NULL) { - if (rspamd_match_radix_map_addr(ctx->blocked_ips, - addr) != NULL) { - - rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "blacklisted"); - return FALSE; - } - } - - return TRUE; -} - static gboolean rspamd_fuzzy_check_write(struct rspamd_fuzzy_storage_ctx *ctx, rspamd_inet_addr_t *addr, @@ -973,88 +199,15 @@ rspamd_fuzzy_check_write(struct rspamd_fuzzy_storage_ctx *ctx, } if (key) { - if (cmd == FUZZY_WRITE && key->flags & FUZZY_KEY_WRITE) { - return TRUE; - } - else if (cmd == FUZZY_DEL && key->flags & FUZZY_KEY_DELETE) { - return TRUE; - } - } - - return FALSE; -} - -static void -fuzzy_key_stat_dtor(gpointer p) -{ - struct fuzzy_key_stat *st = p; - - if (st->last_ips) { - rspamd_lru_hash_destroy(st->last_ips); - } - - if (st->keypair) { - rspamd_keypair_unref(st->keypair); - } - - g_free(st); -} - -static void -fuzzy_key_stat_unref(gpointer p) -{ - struct fuzzy_key_stat *st = p; - - REF_RELEASE(st); -} - -static void -fuzzy_key_dtor(gpointer p) -{ - struct fuzzy_key *key = p; - - if (key) { - if (key->key) { - rspamd_keypair_unref(key->key); - } - - if (key->stat) { - REF_RELEASE(key->stat); - } - - if (key->flags_stat) { - kh_destroy(fuzzy_key_flag_stat, key->flags_stat); - } - - if (key->forbidden_ids) { - kh_destroy(fuzzy_key_ids_set, key->forbidden_ids); - } - - if (key->rl_bucket) { - /* TODO: save bucket stats */ - g_free(key->rl_bucket); - } - - if (key->name) { - g_free(key->name); + if (cmd == FUZZY_WRITE && key->flags & FUZZY_KEY_WRITE) { + return TRUE; } - - if (key->extensions) { - ucl_object_unref(key->extensions); + else if (cmd == FUZZY_DEL && key->flags & FUZZY_KEY_DELETE) { + return TRUE; 
} - - g_free(key); } -} -static void -fuzzy_hash_table_dtor(khash_t(rspamd_fuzzy_keys_hash) * hash) -{ - struct fuzzy_key *key; - kh_foreach_value(hash, key, { - REF_RELEASE(key); - }); - kh_destroy(rspamd_fuzzy_keys_hash, hash); + return FALSE; } static void @@ -1065,15 +218,6 @@ fuzzy_count_callback(uint64_t count, void *ud) ctx->stat.fuzzy_hashes = count; } -static void -fuzzy_rl_bucket_free(gpointer p) -{ - struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *) p; - - rspamd_inet_address_free(elt->addr); - g_free(elt); -} - static void fuzzy_stat_count_callback(uint64_t count, void *ud) { @@ -3292,313 +2436,6 @@ rspamd_fuzzy_storage_reload(struct rspamd_main *rspamd_main, return TRUE; } -static ucl_object_t * -rspamd_fuzzy_storage_stat_key(const struct fuzzy_key_stat *key_stat) -{ - ucl_object_t *res; - - res = ucl_object_typed_new(UCL_OBJECT); - - ucl_object_insert_key(res, ucl_object_fromint(key_stat->checked), - "checked", 0, false); - ucl_object_insert_key(res, ucl_object_fromdouble(key_stat->checked_ctr.mean), - "checked_per_hour", 0, false); - ucl_object_insert_key(res, ucl_object_fromint(key_stat->matched), - "matched", 0, false); - ucl_object_insert_key(res, ucl_object_fromdouble(key_stat->matched_ctr.mean), - "matched_per_hour", 0, false); - ucl_object_insert_key(res, ucl_object_fromint(key_stat->added), - "added", 0, false); - ucl_object_insert_key(res, ucl_object_fromint(key_stat->deleted), - "deleted", 0, false); - ucl_object_insert_key(res, ucl_object_fromint(key_stat->errors), - "errors", 0, false); - - return res; -} - -static void -rspamd_fuzzy_key_stat_iter(const unsigned char *pk_iter, struct fuzzy_key *fuzzy_key, ucl_object_t *keys_obj, gboolean ip_stat) -{ - struct fuzzy_key_stat *key_stat = fuzzy_key->stat; - char keyname[17]; - - if (key_stat) { - rspamd_snprintf(keyname, sizeof(keyname), "%8bs", pk_iter); - - ucl_object_t *elt = rspamd_fuzzy_storage_stat_key(key_stat); - - if (key_stat->last_ips && ip_stat) { - int i = 
0; - ucl_object_t *ip_elt = ucl_object_typed_new(UCL_OBJECT); - gpointer k, v; - - while ((i = rspamd_lru_hash_foreach(key_stat->last_ips, - i, &k, &v)) != -1) { - ucl_object_t *ip_cur = rspamd_fuzzy_storage_stat_key(v); - ucl_object_insert_key(ip_elt, ip_cur, - rspamd_inet_address_to_string(k), 0, true); - } - ucl_object_insert_key(elt, ip_elt, "ips", 0, false); - } - - int flag; - struct fuzzy_key_stat *flag_stat; - ucl_object_t *flags_ucl = ucl_object_typed_new(UCL_OBJECT); - - kh_foreach_key_value_ptr(fuzzy_key->flags_stat, flag, flag_stat, { - char intbuf[16]; - rspamd_snprintf(intbuf, sizeof(intbuf), "%d", flag); - ucl_object_insert_key(flags_ucl, rspamd_fuzzy_storage_stat_key(flag_stat), - intbuf, 0, true); - }); - - ucl_object_insert_key(elt, flags_ucl, "flags", 0, false); - - ucl_object_insert_key(elt, - rspamd_keypair_to_ucl(fuzzy_key->key, RSPAMD_KEYPAIR_ENCODING_DEFAULT, - RSPAMD_KEYPAIR_DUMP_NO_SECRET | RSPAMD_KEYPAIR_DUMP_FLATTENED), - "keypair", 0, false); - - if (fuzzy_key->rl_bucket) { - ucl_object_insert_key(elt, - rspamd_leaky_bucket_to_ucl(fuzzy_key->rl_bucket), - "ratelimit", 0, false); - } - - ucl_object_insert_key(keys_obj, elt, keyname, 0, true); - } -} -static ucl_object_t * -rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt) -{ - ucl_object_t *res; - - res = ucl_object_typed_new(UCL_OBJECT); - - ucl_object_insert_key(res, ucl_object_fromdouble(p_elt->cur), "cur", 0, false); - ucl_object_insert_key(res, ucl_object_fromdouble(p_elt->last), "last", 0, false); - - return res; -} - -static ucl_object_t * -rspamd_fuzzy_stat_to_ucl(struct rspamd_fuzzy_storage_ctx *ctx, gboolean ip_stat) -{ - struct fuzzy_key *fuzzy_key; - ucl_object_t *obj, *keys_obj, *elt, *ip_elt; - const unsigned char *pk_iter; - - obj = ucl_object_typed_new(UCL_OBJECT); - - keys_obj = ucl_object_typed_new(UCL_OBJECT); - - kh_foreach(ctx->keys, pk_iter, fuzzy_key, { - rspamd_fuzzy_key_stat_iter(pk_iter, fuzzy_key, keys_obj, ip_stat); - }); - - if 
(ctx->dynamic_keys) { - kh_foreach(ctx->dynamic_keys, pk_iter, fuzzy_key, { - rspamd_fuzzy_key_stat_iter(pk_iter, fuzzy_key, keys_obj, ip_stat); - }); - } - - ucl_object_insert_key(obj, keys_obj, "keys", 0, false); - - /* Now generic stats */ - ucl_object_insert_key(obj, - ucl_object_fromint(ctx->stat.fuzzy_hashes), - "fuzzy_stored", - 0, - false); - ucl_object_insert_key(obj, - ucl_object_fromint(ctx->stat.fuzzy_hashes_expired), - "fuzzy_expired", - 0, - false); - ucl_object_insert_key(obj, - ucl_object_fromint(ctx->stat.invalid_requests), - "invalid_requests", - 0, - false); - ucl_object_insert_key(obj, - ucl_object_fromint(ctx->stat.delayed_hashes), - "delayed_hashes", - 0, - false); - - if (ctx->errors_ips && ip_stat) { - gpointer k, v; - int i = 0; - ip_elt = ucl_object_typed_new(UCL_OBJECT); - - while ((i = rspamd_lru_hash_foreach(ctx->errors_ips, i, &k, &v)) != -1) { - ucl_object_insert_key(ip_elt, - ucl_object_fromint(*(uint64_t *) v), - rspamd_inet_address_to_string(k), 0, true); - } - - ucl_object_insert_key(obj, - ip_elt, - "errors_ips", - 0, - false); - } - - /* Checked by epoch */ - elt = ucl_object_typed_new(UCL_ARRAY); - - for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) { - ucl_array_append(elt, - ucl_object_fromint(ctx->stat.fuzzy_hashes_checked[i])); - } - - ucl_object_insert_key(obj, elt, "fuzzy_checked", 0, false); - - /* Shingles by epoch */ - elt = ucl_object_typed_new(UCL_ARRAY); - - for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) { - ucl_array_append(elt, - ucl_object_fromint(ctx->stat.fuzzy_shingles_checked[i])); - } - - ucl_object_insert_key(obj, elt, "fuzzy_shingles", 0, false); - - /* Matched by epoch */ - elt = ucl_object_typed_new(UCL_ARRAY); - - for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) { - ucl_array_append(elt, - ucl_object_fromint(ctx->stat.fuzzy_hashes_found[i])); - } - - ucl_object_insert_key(obj, elt, "fuzzy_found", 0, false); - - - return obj; -} - -static void 
-rspamd_fuzzy_maybe_load_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx) -{ - char path[PATH_MAX]; - - rspamd_snprintf(path, sizeof(path), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl", - RSPAMD_DBDIR); - - if (access(path, R_OK) != -1) { - struct ucl_parser *parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS); - if (ucl_parser_add_file(parser, path)) { - ucl_object_t *obj = ucl_parser_get_object(parser); - int loaded = 0; - - if (ucl_object_type(obj) == UCL_ARRAY) { - ucl_object_iter_t it = NULL; - const ucl_object_t *cur; - - while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) { - const ucl_object_t *ip, *value, *last; - const char *ip_str; - double limit_val, last_val; - - ip = ucl_object_find_key(cur, "ip"); - value = ucl_object_find_key(cur, "value"); - last = ucl_object_find_key(cur, "last"); - - if (ip == NULL || value == NULL || last == NULL) { - msg_err("invalid ratelimit object"); - continue; - } - - ip_str = ucl_object_tostring(ip); - limit_val = ucl_object_todouble(value); - last_val = ucl_object_todouble(last); - - if (ip_str == NULL || isnan(last_val)) { - msg_err("invalid ratelimit object"); - continue; - } - - rspamd_inet_addr_t *addr; - if (rspamd_parse_inet_address(&addr, ip_str, strlen(ip_str), - RSPAMD_INET_ADDRESS_PARSE_NO_UNIX | RSPAMD_INET_ADDRESS_PARSE_NO_PORT)) { - struct rspamd_leaky_bucket_elt *elt = g_malloc(sizeof(*elt)); - - elt->cur = limit_val; - elt->last = last_val; - elt->addr = addr; - rspamd_lru_hash_insert(ctx->ratelimit_buckets, addr, elt, elt->last, ctx->leaky_bucket_ttl); - loaded++; - } - else { - msg_err("invalid ratelimit ip: %s", ip_str); - continue; - } - } - - msg_info("loaded %d ratelimit objects", loaded); - } - - ucl_object_unref(obj); - } - - ucl_parser_free(parser); - } -} - -static void -rspamd_fuzzy_maybe_save_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx) -{ - char path[PATH_MAX]; - - rspamd_snprintf(path, sizeof(path), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl.new", - RSPAMD_DBDIR); - FILE *f = 
fopen(path, "w"); - - if (f != NULL) { - ucl_object_t *top = ucl_object_typed_new(UCL_ARRAY); - int it = 0; - gpointer k, v; - - ucl_object_reserve(top, rspamd_lru_hash_size(ctx->ratelimit_buckets)); - - while ((it = rspamd_lru_hash_foreach(ctx->ratelimit_buckets, it, &k, &v)) != -1) { - ucl_object_t *cur = ucl_object_typed_new(UCL_OBJECT); - struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *) v; - - ucl_object_insert_key(cur, ucl_object_fromdouble(elt->cur), "value", 0, false); - ucl_object_insert_key(cur, ucl_object_fromdouble(elt->last), "last", 0, false); - ucl_object_insert_key(cur, ucl_object_fromstring(rspamd_inet_address_to_string(elt->addr)), "ip", 0, false); - ucl_array_append(top, cur); - } - - if (ucl_object_emit_full(top, UCL_EMIT_JSON_COMPACT, ucl_object_emit_file_funcs(f), NULL)) { - char npath[PATH_MAX]; - - fflush(f); - rspamd_snprintf(npath, sizeof(npath), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl", - RSPAMD_DBDIR); - - if (rename(path, npath) == -1) { - msg_warn("cannot rename %s to %s: %s", path, npath, strerror(errno)); - } - else { - msg_info("saved %d ratelimits in %s", rspamd_lru_hash_size(ctx->ratelimit_buckets), npath); - } - } - else { - msg_warn("cannot serialize ratelimit buckets to %s: %s", path, strerror(errno)); - } - - fclose(f); - ucl_object_unref(top); - } - else { - msg_warn("cannot save ratelimit buckets to %s: %s", path, strerror(errno)); - } -} - static int lua_fuzzy_add_pre_handler(lua_State *L) { @@ -3687,373 +2524,6 @@ lua_fuzzy_add_blacklist_handler(lua_State *L) return 0; } -static gboolean -rspamd_fuzzy_storage_stat(struct rspamd_main *rspamd_main, - struct rspamd_worker *worker, int fd, - int attached_fd, - struct rspamd_control_command *cmd, - gpointer ud) -{ - struct rspamd_fuzzy_storage_ctx *ctx = ud; - struct rspamd_control_reply rep; - ucl_object_t *obj; - struct ucl_emitter_functions *emit_subr; - unsigned char fdspace[CMSG_SPACE(sizeof(int))]; - struct iovec iov; - struct msghdr msg; - 
struct cmsghdr *cmsg; - - int outfd = -1; - char tmppath[PATH_MAX]; - - memset(&rep, 0, sizeof(rep)); - rep.type = RSPAMD_CONTROL_FUZZY_STAT; - rep.id = cmd->id; - - rspamd_snprintf(tmppath, sizeof(tmppath), "%s%c%s-XXXXXXXXXX", - rspamd_main->cfg->temp_dir, G_DIR_SEPARATOR, "fuzzy-stat"); - - if ((outfd = mkstemp(tmppath)) == -1) { - rep.reply.fuzzy_stat.status = errno; - msg_info_main("cannot make temporary stat file for fuzzy stat: %s", - strerror(errno)); - } - else { - const char *backend_id; - - rep.reply.fuzzy_stat.status = 0; - - backend_id = rspamd_fuzzy_backend_id(ctx->backend); - if (backend_id) { - memcpy(rep.reply.fuzzy_stat.storage_id, - backend_id, - sizeof(rep.reply.fuzzy_stat.storage_id)); - } - - obj = rspamd_fuzzy_stat_to_ucl(ctx, TRUE); - emit_subr = ucl_object_emit_fd_funcs(outfd); - ucl_object_emit_full(obj, UCL_EMIT_JSON_COMPACT, emit_subr, NULL); - ucl_object_emit_funcs_free(emit_subr); - ucl_object_unref(obj); - /* Rewind output file */ - close(outfd); - outfd = open(tmppath, O_RDONLY); - unlink(tmppath); - } - - /* Now we can send outfd and status message */ - memset(&msg, 0, sizeof(msg)); - - /* Attach fd to the message */ - if (outfd != -1) { - memset(fdspace, 0, sizeof(fdspace)); - msg.msg_control = fdspace; - msg.msg_controllen = sizeof(fdspace); - cmsg = CMSG_FIRSTHDR(&msg); - - if (cmsg) { - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_RIGHTS; - cmsg->cmsg_len = CMSG_LEN(sizeof(int)); - memcpy(CMSG_DATA(cmsg), &outfd, sizeof(int)); - } - } - - iov.iov_base = &rep; - iov.iov_len = sizeof(rep); - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - - if (sendmsg(fd, &msg, 0) == -1) { - msg_err_main("cannot send fuzzy stat: %s", strerror(errno)); - } - - if (outfd != -1) { - close(outfd); - } - - return TRUE; -} - -static gboolean -fuzzy_parse_ids(rspamd_mempool_t *pool, - const ucl_object_t *obj, - gpointer ud, - struct rspamd_rcl_section *section, - GError **err) -{ - struct rspamd_rcl_struct_parser *pd = (struct 
rspamd_rcl_struct_parser *) ud; - khash_t(fuzzy_key_ids_set) * target; - - target = *(khash_t(fuzzy_key_ids_set) **) ((char *) pd->user_struct + pd->offset); - - if (ucl_object_type(obj) == UCL_ARRAY) { - const ucl_object_t *cur; - ucl_object_iter_t it = NULL; - uint64_t id; - - while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) { - if (ucl_object_toint_safe(cur, &id)) { - int r; - - kh_put(fuzzy_key_ids_set, target, id, &r); - } - else { - return FALSE; - } - } - - return TRUE; - } - else if (ucl_object_type(obj) == UCL_INT) { - int r; - kh_put(fuzzy_key_ids_set, target, ucl_object_toint(obj), &r); - - return TRUE; - } - - return FALSE; -} - -static struct fuzzy_key * -fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg, const ucl_object_t *obj, - khash_t(rspamd_fuzzy_keys_hash) * target) -{ - struct rspamd_cryptobox_keypair *kp = rspamd_keypair_from_ucl(obj); - - if (kp == NULL) { - return NULL; - } - - if (rspamd_keypair_type(kp) != RSPAMD_KEYPAIR_KEX) { - rspamd_keypair_unref(kp); - return FALSE; - } - - struct fuzzy_key *key = g_malloc0(sizeof(*key)); - REF_INIT_RETAIN(key, fuzzy_key_dtor); - key->key = kp; - struct fuzzy_key_stat *keystat = g_malloc0(sizeof(*keystat)); - REF_INIT_RETAIN(keystat, fuzzy_key_stat_dtor); - /* Hash of ip -> fuzzy_key_stat */ - keystat->last_ips = rspamd_lru_hash_new_full(1024, - (GDestroyNotify) rspamd_inet_address_free, - fuzzy_key_stat_unref, - rspamd_inet_address_hash, rspamd_inet_address_equal); - key->stat = keystat; - key->flags_stat = kh_init(fuzzy_key_flag_stat); - key->burst = NAN; - key->rate = NAN; - key->expire = NAN; - key->rl_bucket = NULL; - /* Allow read by default */ - key->flags = FUZZY_KEY_READ; - /* Preallocate some space for flags */ - kh_resize(fuzzy_key_flag_stat, key->flags_stat, 8); - const unsigned char *pk = rspamd_keypair_component(kp, RSPAMD_KEYPAIR_COMPONENT_PK, - NULL); - keystat->keypair = rspamd_keypair_ref(kp); - /* We map entries by pubkey in binary form for faster lookup */ - khiter_t 
k; - int r; - - k = kh_put(rspamd_fuzzy_keys_hash, target, pk, &r); - - if (r == 0) { - msg_err("duplicate keypair found: pk=%*bs", - 32, pk); - REF_RELEASE(key); - - return FALSE; - } - else if (r == -1) { - msg_err("hash insertion error: pk=%*bs", - 32, pk); - REF_RELEASE(key); - - return FALSE; - } - - kh_val(target, k) = key; - - const ucl_object_t *extensions = rspamd_keypair_get_extensions(kp); - - if (extensions) { - key->extensions = ucl_object_ref(extensions); - lua_State *L = RSPAMD_LUA_CFG_STATE(cfg); - const ucl_object_t *forbidden_ids = ucl_object_lookup(extensions, "forbidden_ids"); - - if (forbidden_ids && ucl_object_type(forbidden_ids) == UCL_ARRAY) { - key->forbidden_ids = kh_init(fuzzy_key_ids_set); - const ucl_object_t *cur; - ucl_object_iter_t it = NULL; - - while ((cur = ucl_object_iterate(forbidden_ids, &it, true)) != NULL) { - if (ucl_object_type(cur) == UCL_INT || ucl_object_type(cur) == UCL_FLOAT) { - int id = ucl_object_toint(cur); - int r; - - kh_put(fuzzy_key_ids_set, key->forbidden_ids, id, &r); - } - } - } - - const ucl_object_t *ratelimit = ucl_object_lookup(extensions, "ratelimit"); - - static int ratelimit_lua_id = -1; - - if (ratelimit_lua_id == -1) { - /* Load ratelimit parsing function */ - if (!rspamd_lua_require_function(L, "plugins/ratelimit", "parse_limit")) { - msg_err_config("cannot load ratelimit parser from ratelimit plugin"); - } - else { - ratelimit_lua_id = luaL_ref(L, LUA_REGISTRYINDEX); - } - } - - if (ratelimit && ratelimit_lua_id != -1) { - lua_rawgeti(L, LUA_REGISTRYINDEX, ratelimit_lua_id); - lua_pushstring(L, "fuzzy_key_ratelimit"); - ucl_object_push_lua(L, ratelimit, false); - - if (lua_pcall(L, 2, 1, 0) != 0) { - msg_err_config("cannot call ratelimit parser from ratelimit plugin"); - } - else { - if (lua_type(L, -1) == LUA_TTABLE) { - /* - * The returned table is in form { rate = xx, burst = yy } - */ - lua_getfield(L, -1, "rate"); - key->rate = lua_tonumber(L, -1); - lua_pop(L, 1); - - lua_getfield(L, -1, 
"burst"); - key->burst = lua_tonumber(L, -1); - lua_pop(L, 1); - - key->rl_bucket = g_malloc0(sizeof(*key->rl_bucket)); - } - } - - lua_settop(L, 0); - } - - const ucl_object_t *expire = ucl_object_lookup(extensions, "expire"); - if (expire && ucl_object_type(expire) == UCL_STRING) { - struct tm tm; - - /* DD-MM-YYYY */ - char *end = strptime(ucl_object_tostring(expire), "%d-%m-%Y", &tm); - - if (end != NULL && *end != '\0') { - msg_err_config("cannot parse expire date: %s", ucl_object_tostring(expire)); - } - else { - key->expire = mktime(&tm); - } - } - - const ucl_object_t *name = ucl_object_lookup(extensions, "name"); - if (name && ucl_object_type(name) == UCL_STRING) { - key->name = g_strdup(ucl_object_tostring(name)); - } - - /* Check permissions */ - const ucl_object_t *read_only = ucl_object_lookup(extensions, "read_only"); - if (read_only && ucl_object_type(read_only) == UCL_BOOLEAN) { - if (ucl_object_toboolean(read_only)) { - key->flags &= ~(FUZZY_KEY_WRITE | FUZZY_KEY_DELETE); - } - else { - key->flags |= (FUZZY_KEY_WRITE | FUZZY_KEY_DELETE); - } - } - - const ucl_object_t *allowed_ops = ucl_object_lookup(extensions, "allowed_ops"); - if (allowed_ops && ucl_object_type(allowed_ops) == UCL_ARRAY) { - const ucl_object_t *cur; - ucl_object_iter_t it = NULL; - /* Reset to only allowed */ - key->flags = 0; - - while ((cur = ucl_object_iterate(allowed_ops, &it, true)) != NULL) { - if (ucl_object_type(cur) == UCL_STRING) { - const char *op = ucl_object_tostring(cur); - - if (g_ascii_strcasecmp(op, "read") == 0) { - key->flags |= FUZZY_KEY_READ; - } - else if (g_ascii_strcasecmp(op, "write") == 0) { - key->flags |= FUZZY_KEY_WRITE; - } - else if (g_ascii_strcasecmp(op, "delete") == 0) { - key->flags |= FUZZY_KEY_DELETE; - } - else { - msg_warn_config("invalid operation: %s", op); - } - } - } - } - } - - msg_debug("loaded keypair %*bs; expire=%f; rate=%f; burst=%f; name=%s", - (int) crypto_box_publickeybytes(), pk, - key->expire, key->rate, key->burst, 
key->name); - - return key; -} - - -static gboolean -fuzzy_parse_keypair(rspamd_mempool_t *pool, - const ucl_object_t *obj, - gpointer ud, - struct rspamd_rcl_section *section, - GError **err) -{ - struct rspamd_rcl_struct_parser *pd = ud; - struct rspamd_fuzzy_storage_ctx *ctx; - struct fuzzy_key *key; - const ucl_object_t *cur; - ucl_object_iter_t it = NULL; - gboolean ret; - - ctx = pd->user_struct; - pd->offset = G_STRUCT_OFFSET(struct rspamd_fuzzy_storage_ctx, default_keypair); - - /* - * Single key - */ - if (ucl_object_type(obj) == UCL_STRING || ucl_object_type(obj) == UCL_OBJECT) { - ret = rspamd_rcl_parse_struct_keypair(pool, obj, pd, section, err); - - if (!ret) { - return ret; - } - - key = fuzzy_add_keypair_from_ucl(ctx->cfg, obj, ctx->keys); - - if (key == NULL) { - return FALSE; - } - - /* Use the last one ? */ - ctx->default_key = key; - } - else if (ucl_object_type(obj) == UCL_ARRAY) { - while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) { - if (!fuzzy_parse_keypair(pool, cur, pd, section, err)) { - msg_err_pool("cannot parse keypair"); - } - } - } - - return TRUE; -} - gpointer init_fuzzy(struct rspamd_config *cfg) { diff --git a/src/libserver/CMakeLists.txt b/src/libserver/CMakeLists.txt index ad9df344d..8cc673f13 100644 --- a/src/libserver/CMakeLists.txt +++ b/src/libserver/CMakeLists.txt @@ -13,6 +13,9 @@ SET(LIBRSPAMDSERVERSRC ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend/fuzzy_backend_sqlite.c ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend/fuzzy_backend_redis.c ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend/fuzzy_backend_noop.c + ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_storage_keys.c + ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_storage_ratelimit.c + ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_storage_stat.c ${CMAKE_CURRENT_SOURCE_DIR}/milter.c ${CMAKE_CURRENT_SOURCE_DIR}/monitored.c ${CMAKE_CURRENT_SOURCE_DIR}/protocol.c diff --git a/src/libserver/fuzzy_storage_internal.h b/src/libserver/fuzzy_storage_internal.h new file mode 100644 index 000000000..7b3779ae0 --- /dev/null 
+++ b/src/libserver/fuzzy_storage_internal.h @@ -0,0 +1,323 @@ +/* + * Copyright 2026 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RSPAMD_FUZZY_STORAGE_INTERNAL_H +#define RSPAMD_FUZZY_STORAGE_INTERNAL_H + +#include "config.h" + +#include "rspamd.h" +#include "ref.h" +#include "libserver/fuzzy_wire.h" +#include "libutil/hash.h" +#include "contrib/libev/ev.h" +#include "contrib/libucl/khash.h" + +#include + +struct map_cb_data; +struct rspamd_rcl_section; +struct rspamd_rcl_struct_parser; +struct rspamd_control_command; + +struct rspamd_main; +struct rspamd_worker; +struct rspamd_dns_resolver; +struct rspamd_radix_map_helper; +struct rspamd_hash_map_helper; +struct rspamd_keypair_cache; +struct rspamd_http_context; +struct rspamd_fuzzy_backend; + +struct rspamd_cryptobox_keypair; +struct rspamd_cryptobox_pubkey; + +struct fuzzy_tcp_session; + +struct fuzzy_global_stat { + uint64_t fuzzy_hashes; + uint64_t fuzzy_hashes_expired; + uint64_t fuzzy_hashes_checked[RSPAMD_FUZZY_EPOCH_MAX]; + uint64_t fuzzy_shingles_checked[RSPAMD_FUZZY_EPOCH_MAX]; + uint64_t fuzzy_hashes_found[RSPAMD_FUZZY_EPOCH_MAX]; + uint64_t invalid_requests; + uint64_t delayed_hashes; +}; + +struct fuzzy_key_stat { + uint64_t checked; + uint64_t matched; + uint64_t added; + uint64_t deleted; + uint64_t errors; + struct rspamd_counter_data checked_ctr; + struct rspamd_counter_data matched_ctr; + double last_checked_time; + uint64_t last_checked_count; + 
uint64_t last_matched_count; + struct rspamd_cryptobox_keypair *keypair; + rspamd_lru_hash_t *last_ips; + + ref_entry_t ref; +}; + +struct rspamd_leaky_bucket_elt { + rspamd_inet_addr_t *addr; + double last; + double cur; +}; + +static inline int64_t +fuzzy_kp_hash(const unsigned char *p) +{ + int64_t res; + + memcpy(&res, p, sizeof(res)); + return res; +} + +static inline bool +fuzzy_kp_equal(gconstpointer a, gconstpointer b) +{ + const unsigned char *pa = a, *pb = b; + + return (memcmp(pa, pb, RSPAMD_FUZZY_KEYLEN) == 0); +} + +enum fuzzy_key_op { + FUZZY_KEY_READ = 0x1u << 0, + FUZZY_KEY_WRITE = 0x1u << 1, + FUZZY_KEY_DELETE = 0x1u << 2, +}; + +KHASH_SET_INIT_INT(fuzzy_key_ids_set); +KHASH_INIT(fuzzy_key_flag_stat, int, struct fuzzy_key_stat, 1, kh_int_hash_func, + kh_int_hash_equal); + +struct fuzzy_key { + char *name; + struct rspamd_cryptobox_keypair *key; + struct rspamd_cryptobox_pubkey *pk; + struct fuzzy_key_stat *stat; + khash_t(fuzzy_key_flag_stat) * flags_stat; + khash_t(fuzzy_key_ids_set) * forbidden_ids; + struct rspamd_leaky_bucket_elt *rl_bucket; + ucl_object_t *extensions; + double burst; + double rate; + ev_tstamp expire; + bool expired; + int flags; /* enum fuzzy_key_op */ + ref_entry_t ref; +}; + +KHASH_INIT(rspamd_fuzzy_keys_hash, + const unsigned char *, struct fuzzy_key *, 1, + fuzzy_kp_hash, fuzzy_kp_equal); + +struct rspamd_lua_fuzzy_script { + int cbref; + struct rspamd_lua_fuzzy_script *next; +}; + +struct rspamd_fuzzy_storage_ctx { + uint64_t magic; + struct ev_loop *event_loop; + struct rspamd_dns_resolver *resolver; + struct rspamd_config *cfg; + struct fuzzy_global_stat stat; + double expire; + double sync_timeout; + double delay; + double tcp_timeout; + struct rspamd_radix_map_helper *update_ips; + struct rspamd_hash_map_helper *update_keys; + struct rspamd_radix_map_helper *blocked_ips; + struct rspamd_radix_map_helper *ratelimit_whitelist; + struct rspamd_radix_map_helper *delay_whitelist; + + const ucl_object_t *update_map; + 
const ucl_object_t *update_keys_map; + const ucl_object_t *delay_whitelist_map; + const ucl_object_t *blocked_map; + const ucl_object_t *ratelimit_whitelist_map; + const ucl_object_t *dynamic_keys_map; + + unsigned int keypair_cache_size; + ev_timer stat_ev; + ev_io peer_ev; + + struct rspamd_cryptobox_keypair *default_keypair; + struct fuzzy_key *default_key; + khash_t(rspamd_fuzzy_keys_hash) * keys; + khash_t(rspamd_fuzzy_keys_hash) * dynamic_keys; + + gboolean encrypted_only; + gboolean read_only; + gboolean dedicated_update_worker; + struct rspamd_keypair_cache *keypair_cache; + struct rspamd_http_context *http_ctx; + rspamd_lru_hash_t *errors_ips; + rspamd_lru_hash_t *ratelimit_buckets; + struct rspamd_fuzzy_backend *backend; + GArray *updates_pending; + unsigned int updates_failed; + unsigned int updates_maxfail; + int peer_fd; + + unsigned int leaky_bucket_ttl; + unsigned int leaky_bucket_mask; + unsigned int max_buckets; + gboolean ratelimit_log_only; + double leaky_bucket_burst; + double leaky_bucket_rate; + + struct rspamd_worker *worker; + const ucl_object_t *skip_map; + struct rspamd_hash_map_helper *skip_hashes; + struct rspamd_lua_fuzzy_script *lua_pre_handlers; + struct rspamd_lua_fuzzy_script *lua_post_handlers; + struct rspamd_lua_fuzzy_script *lua_blacklist_handlers; + khash_t(fuzzy_key_ids_set) * default_forbidden_ids; + khash_t(fuzzy_key_ids_set) * weak_ids; +}; + +enum fuzzy_cmd_type { + CMD_NORMAL, + CMD_SHINGLE, + CMD_ENCRYPTED_NORMAL, + CMD_ENCRYPTED_SHINGLE +}; + +/* Legacy structure name for compatibility during refactoring */ +struct fuzzy_session { + struct rspamd_worker *worker; + rspamd_inet_addr_t *addr; + struct rspamd_fuzzy_storage_ctx *ctx; + + struct rspamd_fuzzy_shingle_cmd cmd; /* Can handle both shingles and non-shingles */ + struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */ + struct fuzzy_key_stat *ip_stat; + + enum rspamd_fuzzy_epoch epoch; + enum fuzzy_cmd_type cmd_type; + int fd; + ev_tstamp 
timestamp; + struct ev_io io; + ref_entry_t ref; + struct fuzzy_key *key; + struct rspamd_fuzzy_cmd_extension *extensions; + unsigned char nm[rspamd_cryptobox_MAX_NMBYTES]; + + /* If this is a TCP session, this pointer will be set */ + struct fuzzy_tcp_session *tcp_session; +}; + +enum rspamd_ratelimit_event_type { + RATELIMIT_EVENT_NEW, + RATELIMIT_EVENT_EXISTING, + RATELIMIT_EVENT_BLACKLIST, +}; + +struct rspamd_ratelimit_callback_ctx { + rspamd_inet_addr_t *addr; + const char *reason; + enum rspamd_ratelimit_event_type type; + + struct rspamd_leaky_bucket_elt *bucket; + double max_burst; + double max_rate; + + struct fuzzy_session *session; +}; + +struct fuzzy_keymap_ucl_buf { + rspamd_fstring_t *buf; + struct rspamd_fuzzy_storage_ctx *ctx; +}; + +enum rspamd_ratelimit_check_result { + ratelimit_pass, + ratelimit_new, + ratelimit_existing, +}; + +enum rspamd_ratelimit_check_policy { + ratelimit_policy_permanent, + ratelimit_policy_normal, +}; + +/* Keys/dynamic map */ +char *ucl_keymap_read_cb(char *chunk, int len, struct map_cb_data *data, gboolean final); +void ucl_keymap_fin_cb(struct map_cb_data *data, void **target); +void ucl_keymap_dtor_cb(struct map_cb_data *data); + +void fuzzy_key_stat_dtor(gpointer p); +void fuzzy_key_stat_unref(gpointer p); +void fuzzy_key_dtor(gpointer p); +void fuzzy_hash_table_dtor(khash_t(rspamd_fuzzy_keys_hash) * hash); + +gboolean fuzzy_parse_ids(rspamd_mempool_t *pool, const ucl_object_t *obj, + gpointer ud, struct rspamd_rcl_section *section, GError **err); +struct fuzzy_key *fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg, const ucl_object_t *obj, + khash_t(rspamd_fuzzy_keys_hash) * target); +gboolean fuzzy_parse_keypair(rspamd_mempool_t *pool, const ucl_object_t *obj, + gpointer ud, struct rspamd_rcl_section *section, GError **err); + +/* Ratelimit */ +void fuzzy_rl_bucket_free(gpointer p); + +enum rspamd_ratelimit_check_result rspamd_fuzzy_check_ratelimit_bucket( + struct rspamd_fuzzy_storage_ctx *ctx, + 
rspamd_inet_addr_t *addr, + ev_tstamp timestamp, + struct rspamd_leaky_bucket_elt *elt, + enum rspamd_ratelimit_check_policy policy, + double max_burst, double max_rate); + +gboolean rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx, + rspamd_inet_addr_t *addr, + struct rspamd_worker *worker, + ev_tstamp timestamp); + +void rspamd_fuzzy_call_ratelimit_handlers(struct rspamd_fuzzy_storage_ctx *ctx, + const struct rspamd_ratelimit_callback_ctx *cb_ctx); +void rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx *ctx, + rspamd_inet_addr_t *addr, + const char *reason); + +gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx, + rspamd_inet_addr_t *addr); + +ucl_object_t *rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt); +void rspamd_fuzzy_maybe_load_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx); +void rspamd_fuzzy_maybe_save_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx); + +/* Stats / controller */ +ucl_object_t *rspamd_fuzzy_storage_stat_key(const struct fuzzy_key_stat *key_stat); +void rspamd_fuzzy_key_stat_iter(const unsigned char *pk_iter, + struct fuzzy_key *fuzzy_key, + ucl_object_t *keys_obj, + gboolean ip_stat); +ucl_object_t *rspamd_fuzzy_stat_to_ucl(struct rspamd_fuzzy_storage_ctx *ctx, gboolean ip_stat); + +gboolean rspamd_fuzzy_storage_stat(struct rspamd_main *rspamd_main, + struct rspamd_worker *worker, int fd, + int attached_fd, + struct rspamd_control_command *cmd, + gpointer ud); + +#endif diff --git a/src/libserver/fuzzy_storage_keys.c b/src/libserver/fuzzy_storage_keys.c new file mode 100644 index 000000000..8c12de2e0 --- /dev/null +++ b/src/libserver/fuzzy_storage_keys.c @@ -0,0 +1,513 @@ +/* + * Copyright 2026 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Rspamd fuzzy storage server: keys and dynamic keymaps + */ + +#include "config.h" + +#include "fuzzy_storage_internal.h" + +#include "cfg_rcl.h" +#include "libcryptobox/cryptobox.h" +#include "libcryptobox/keypair.h" +#include "libserver/maps/map.h" +#include "lua/lua_common.h" +#include "unix-std.h" + +#include +#include +#include + +char * +ucl_keymap_read_cb(char *chunk, int len, + struct map_cb_data *data, gboolean final) +{ + struct fuzzy_keymap_ucl_buf *jb, *pd; + + pd = data->prev_data; + + g_assert(pd != NULL); + + if (data->cur_data == NULL) { + jb = g_malloc0(sizeof(*jb)); + jb->ctx = pd->ctx; + data->cur_data = jb; + } + else { + jb = data->cur_data; + } + + if (jb->buf == NULL) { + /* Allocate memory for buffer */ + jb->buf = rspamd_fstring_sized_new(MAX(len, 4096)); + } + + jb->buf = rspamd_fstring_append(jb->buf, chunk, len); + + return NULL; +} + +void ucl_keymap_fin_cb(struct map_cb_data *data, void **target) +{ + struct fuzzy_keymap_ucl_buf *jb; + ucl_object_t *top; + struct ucl_parser *parser; + struct rspamd_config *cfg; + + /* Now parse ucl */ + if (data->cur_data) { + jb = data->cur_data; + cfg = jb->ctx->cfg; + } + else { + msg_err("no cur data in the map! 
might be a bug"); + return; + } + + if (jb->buf->len == 0) { + msg_err_config("no data read"); + + return; + } + + parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS); + + if (!ucl_parser_add_chunk(parser, jb->buf->str, jb->buf->len)) { + msg_err_config("cannot load ucl data: parse error %s", + ucl_parser_get_error(parser)); + ucl_parser_free(parser); + return; + } + + top = ucl_parser_get_object(parser); + ucl_parser_free(parser); + + if (ucl_object_type(top) != UCL_ARRAY) { + ucl_object_unref(top); + msg_err_config("loaded ucl is not an array"); + return; + } + + if (target) { + *target = data->cur_data; + } + + if (data->prev_data) { + jb = data->prev_data; + /* Clean prev data */ + if (jb->buf) { + rspamd_fstring_free(jb->buf); + } + + /* Clean the existing keys */ + struct fuzzy_key *key; + kh_foreach_value(jb->ctx->dynamic_keys, key, { + REF_RELEASE(key); + }); + kh_clear(rspamd_fuzzy_keys_hash, jb->ctx->dynamic_keys); + + /* Insert new keys */ + const ucl_object_t *cur; + ucl_object_iter_t it = NULL; + int success = 0; + + while ((cur = ucl_object_iterate(top, &it, true)) != NULL) { + struct fuzzy_key *nk; + + nk = fuzzy_add_keypair_from_ucl(cfg, cur, jb->ctx->dynamic_keys); + + if (nk == NULL) { + msg_warn_config("cannot add dynamic keypair"); + } + else { + /* Count only the keypairs that were really inserted, so the log below is accurate */ + success++; + } + } + + msg_info_config("loaded %d dynamic keypairs", success); + + g_free(jb); + } + + ucl_object_unref(top); +} + +void ucl_keymap_dtor_cb(struct map_cb_data *data) +{ + struct fuzzy_keymap_ucl_buf *jb; + + if (data->cur_data) { + jb = data->cur_data; + /* Clean prev data */ + if (jb->buf) { + rspamd_fstring_free(jb->buf); + } + + struct fuzzy_key *key; + kh_foreach_value(jb->ctx->dynamic_keys, key, { + REF_RELEASE(key); + }); + /* Clear hash content but don't destroy - mempool destructor will handle it */ + kh_clear(rspamd_fuzzy_keys_hash, jb->ctx->dynamic_keys); + + g_free(jb); + } +} + +void fuzzy_key_stat_dtor(gpointer p) +{ + struct fuzzy_key_stat *st = p; + + if (st->last_ips) { + 
rspamd_lru_hash_destroy(st->last_ips); + } + + if (st->keypair) { + rspamd_keypair_unref(st->keypair); + } + + g_free(st); +} + +void fuzzy_key_stat_unref(gpointer p) +{ + struct fuzzy_key_stat *st = p; + + REF_RELEASE(st); +} + +void fuzzy_key_dtor(gpointer p) +{ + struct fuzzy_key *key = p; + + if (key) { + if (key->key) { + rspamd_keypair_unref(key->key); + } + + if (key->stat) { + REF_RELEASE(key->stat); + } + + if (key->flags_stat) { + kh_destroy(fuzzy_key_flag_stat, key->flags_stat); + } + + if (key->forbidden_ids) { + kh_destroy(fuzzy_key_ids_set, key->forbidden_ids); + } + + if (key->rl_bucket) { + /* TODO: save bucket stats */ + g_free(key->rl_bucket); + } + + if (key->name) { + g_free(key->name); + } + + if (key->extensions) { + ucl_object_unref(key->extensions); + } + + g_free(key); + } +} + +void fuzzy_hash_table_dtor(khash_t(rspamd_fuzzy_keys_hash) * hash) +{ + struct fuzzy_key *key; + kh_foreach_value(hash, key, { + REF_RELEASE(key); + }); + kh_destroy(rspamd_fuzzy_keys_hash, hash); +} + +gboolean +fuzzy_parse_ids(rspamd_mempool_t *pool, + const ucl_object_t *obj, + gpointer ud, + struct rspamd_rcl_section *section, + GError **err) +{ + struct rspamd_rcl_struct_parser *pd = (struct rspamd_rcl_struct_parser *) ud; + khash_t(fuzzy_key_ids_set) * target; + + target = *(khash_t(fuzzy_key_ids_set) **) ((char *) pd->user_struct + pd->offset); + + if (ucl_object_type(obj) == UCL_ARRAY) { + const ucl_object_t *cur; + ucl_object_iter_t it = NULL; + uint64_t id; + + while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) { + if (ucl_object_toint_safe(cur, &id)) { + int r; + + kh_put(fuzzy_key_ids_set, target, id, &r); + } + else { + return FALSE; + } + } + + return TRUE; + } + else if (ucl_object_type(obj) == UCL_INT) { + int r; + kh_put(fuzzy_key_ids_set, target, ucl_object_toint(obj), &r); + + return TRUE; + } + + return FALSE; +} + +struct fuzzy_key * +fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg, + const ucl_object_t *obj, + 
khash_t(rspamd_fuzzy_keys_hash) * target) +{ + struct rspamd_cryptobox_keypair *kp = rspamd_keypair_from_ucl(obj); + + if (kp == NULL) { + return NULL; + } + + if (rspamd_keypair_type(kp) != RSPAMD_KEYPAIR_KEX) { + rspamd_keypair_unref(kp); + return NULL; + } + + struct fuzzy_key *key = g_malloc0(sizeof(*key)); + REF_INIT_RETAIN(key, fuzzy_key_dtor); + key->key = kp; + struct fuzzy_key_stat *keystat = g_malloc0(sizeof(*keystat)); + REF_INIT_RETAIN(keystat, fuzzy_key_stat_dtor); + /* Hash of ip -> fuzzy_key_stat */ + keystat->last_ips = rspamd_lru_hash_new_full(1024, + (GDestroyNotify) rspamd_inet_address_free, + fuzzy_key_stat_unref, + rspamd_inet_address_hash, rspamd_inet_address_equal); + key->stat = keystat; + key->flags_stat = kh_init(fuzzy_key_flag_stat); + key->burst = NAN; + key->rate = NAN; + key->expire = NAN; + key->rl_bucket = NULL; + /* Allow read by default */ + key->flags = FUZZY_KEY_READ; + /* Preallocate some space for flags */ + kh_resize(fuzzy_key_flag_stat, key->flags_stat, 8); + const unsigned char *pk = rspamd_keypair_component(kp, RSPAMD_KEYPAIR_COMPONENT_PK, + NULL); + keystat->keypair = rspamd_keypair_ref(kp); + /* We map entries by pubkey in binary form for faster lookup */ + khiter_t k; + int r; + + k = kh_put(rspamd_fuzzy_keys_hash, target, pk, &r); + + if (r == 0) { + msg_err("duplicate keypair found: pk=%*bs", + 32, pk); + REF_RELEASE(key); + + return NULL; + } + else if (r == -1) { + msg_err("hash insertion error: pk=%*bs", + 32, pk); + REF_RELEASE(key); + + return NULL; + } + + kh_val(target, k) = key; + + const ucl_object_t *extensions = rspamd_keypair_get_extensions(kp); + + if (extensions) { + key->extensions = ucl_object_ref(extensions); + lua_State *L = RSPAMD_LUA_CFG_STATE(cfg); + const ucl_object_t *forbidden_ids = ucl_object_lookup(extensions, "forbidden_ids"); + + if (forbidden_ids && ucl_object_type(forbidden_ids) == UCL_ARRAY) { + key->forbidden_ids = kh_init(fuzzy_key_ids_set); + const ucl_object_t *cur; + 
ucl_object_iter_t it = NULL; + + while ((cur = ucl_object_iterate(forbidden_ids, &it, true)) != NULL) { + if (ucl_object_type(cur) == UCL_INT || ucl_object_type(cur) == UCL_FLOAT) { + int id = ucl_object_toint(cur); + int ids_r; + + kh_put(fuzzy_key_ids_set, key->forbidden_ids, id, &ids_r); + } + } + } + + const ucl_object_t *ratelimit = ucl_object_lookup(extensions, "ratelimit"); + + static int ratelimit_lua_id = -1; + + if (ratelimit_lua_id == -1) { + /* Load ratelimit parsing function */ + if (!rspamd_lua_require_function(L, "plugins/ratelimit", "parse_limit")) { + msg_err_config("cannot load ratelimit parser from ratelimit plugin"); + } + else { + ratelimit_lua_id = luaL_ref(L, LUA_REGISTRYINDEX); + } + } + + if (ratelimit && ratelimit_lua_id != -1) { + lua_rawgeti(L, LUA_REGISTRYINDEX, ratelimit_lua_id); + lua_pushstring(L, "fuzzy_key_ratelimit"); + ucl_object_push_lua(L, ratelimit, false); + + if (lua_pcall(L, 2, 1, 0) != 0) { + msg_err_config("cannot call ratelimit parser from ratelimit plugin"); + } + else { + if (lua_type(L, -1) == LUA_TTABLE) { + /* The returned table is in form { rate = xx, burst = yy } */ + lua_getfield(L, -1, "rate"); + key->rate = lua_tonumber(L, -1); + lua_pop(L, 1); + + lua_getfield(L, -1, "burst"); + key->burst = lua_tonumber(L, -1); + lua_pop(L, 1); + + key->rl_bucket = g_malloc0(sizeof(*key->rl_bucket)); + } + } + + lua_settop(L, 0); + } + + const ucl_object_t *expire = ucl_object_lookup(extensions, "expire"); + if (expire && ucl_object_type(expire) == UCL_STRING) { + struct tm tm; + + /* The format below fills only the date fields: zero the rest so mktime() never reads indeterminate values */ + memset(&tm, 0, sizeof(tm)); + /* Negative tm_isdst asks mktime() to work out itself whether DST is in effect */ + tm.tm_isdst = -1; + + /* DD-MM-YYYY */ + char *end = strptime(ucl_object_tostring(expire), "%d-%m-%Y", &tm); + + /* strptime() returns NULL when the input does not match; unparsed trailing characters are an error too */ + if (end == NULL || *end != '\0') { + msg_err_config("cannot parse expire date: %s", ucl_object_tostring(expire)); + } + else { + key->expire = mktime(&tm); + } + } + + const ucl_object_t *name = ucl_object_lookup(extensions, "name"); + if (name && ucl_object_type(name) == UCL_STRING) { + key->name = g_strdup(ucl_object_tostring(name)); + } + + /* Check 
permissions */ + const ucl_object_t *read_only = ucl_object_lookup(extensions, "read_only"); + if (read_only && ucl_object_type(read_only) == UCL_BOOLEAN) { + if (ucl_object_toboolean(read_only)) { + key->flags &= ~(FUZZY_KEY_WRITE | FUZZY_KEY_DELETE); + } + else { + key->flags |= (FUZZY_KEY_WRITE | FUZZY_KEY_DELETE); + } + } + + const ucl_object_t *allowed_ops = ucl_object_lookup(extensions, "allowed_ops"); + if (allowed_ops && ucl_object_type(allowed_ops) == UCL_ARRAY) { + const ucl_object_t *cur; + ucl_object_iter_t it = NULL; + /* Reset to only allowed */ + key->flags = 0; + + while ((cur = ucl_object_iterate(allowed_ops, &it, true)) != NULL) { + if (ucl_object_type(cur) == UCL_STRING) { + const char *op = ucl_object_tostring(cur); + + if (g_ascii_strcasecmp(op, "read") == 0) { + key->flags |= FUZZY_KEY_READ; + } + else if (g_ascii_strcasecmp(op, "write") == 0) { + key->flags |= FUZZY_KEY_WRITE; + } + else if (g_ascii_strcasecmp(op, "delete") == 0) { + key->flags |= FUZZY_KEY_DELETE; + } + else { + msg_warn_config("invalid operation: %s", op); + } + } + } + } + } + + msg_debug("loaded keypair %*bs; expire=%f; rate=%f; burst=%f; name=%s", + (int) crypto_box_publickeybytes(), pk, + key->expire, key->rate, key->burst, key->name); + + return key; +} + +gboolean +fuzzy_parse_keypair(rspamd_mempool_t *pool, + const ucl_object_t *obj, + gpointer ud, + struct rspamd_rcl_section *section, + GError **err) +{ + struct rspamd_rcl_struct_parser *pd = ud; + struct rspamd_fuzzy_storage_ctx *ctx; + struct fuzzy_key *key; + const ucl_object_t *cur; + ucl_object_iter_t it = NULL; + gboolean ret; + + ctx = pd->user_struct; + pd->offset = G_STRUCT_OFFSET(struct rspamd_fuzzy_storage_ctx, default_keypair); + + /* + * Single key + */ + if (ucl_object_type(obj) == UCL_STRING || ucl_object_type(obj) == UCL_OBJECT) { + ret = rspamd_rcl_parse_struct_keypair(pool, obj, pd, section, err); + + if (!ret) { + return ret; + } + + key = fuzzy_add_keypair_from_ucl(ctx->cfg, obj, ctx->keys); + + 
if (key == NULL) { + return FALSE; + } + + /* Use the last one ? */ + ctx->default_key = key; + } + else if (ucl_object_type(obj) == UCL_ARRAY) { + while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) { + if (!fuzzy_parse_keypair(pool, cur, pd, section, err)) { + msg_err_pool("cannot parse keypair"); + } + } + } + + return TRUE; +} diff --git a/src/libserver/fuzzy_storage_ratelimit.c b/src/libserver/fuzzy_storage_ratelimit.c new file mode 100644 index 000000000..1f7c1e8fa --- /dev/null +++ b/src/libserver/fuzzy_storage_ratelimit.c @@ -0,0 +1,549 @@ +/* + * Copyright 2026 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+/*
+ * Rspamd fuzzy storage server: ratelimits
+ */
+
+#include "config.h"
+
+#include "fuzzy_storage_internal.h"
+
+#include "maps/map_helpers.h"
+#include "rspamd_control.h"
+#include "lua/lua_common.h"
+#include "contrib/uthash/utlist.h"
+
+/*
+ * NOTE(review): the names of the six system headers were stripped from this
+ * dump; they are reconstructed from the symbols used below (errno, PATH_MAX,
+ * isnan/NAN, fopen/rename, strlen/memset/strerror, access) - confirm against
+ * the upstream file.
+ */
+#include <errno.h>
+#include <limits.h>
+#include <math.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+
+/**
+ * Account one request in a leaky bucket and decide whether it is ratelimited.
+ *
+ * A bucket whose level (`cur`) is NaN is treated as blocked. It is reopened at
+ * 90% of `max_burst` once at least `ctx->leaky_bucket_ttl` has passed since
+ * `last`; until then the request is reported as ratelimit_existing.
+ * Any other bucket first leaks `max_rate` per unit of `timestamp` elapsed since
+ * `last` (clamped at zero), then either overflows (`cur >= max_burst`) or is
+ * incremented by one for the accepted request.
+ *
+ * Ratelimit handlers are not called here; that is the caller's job.
+ *
+ * @param ctx storage context (only leaky_bucket_ttl is read)
+ * @param addr client address (not used by this function)
+ * @param timestamp current time
+ * @param elt bucket to check, updated in place
+ * @param policy with ratelimit_policy_permanent an overflowing bucket is
+ *        switched to the blocked (NaN) state
+ * @param max_burst bucket capacity; NaN disables the check
+ * @param max_rate leak rate; NaN disables the check
+ * @return ratelimit_pass when the request is allowed, ratelimit_new when this
+ *         call found the bucket overflowing, ratelimit_existing when the
+ *         bucket was already blocked
+ */
+enum rspamd_ratelimit_check_result
+rspamd_fuzzy_check_ratelimit_bucket(struct rspamd_fuzzy_storage_ctx *ctx,
+									rspamd_inet_addr_t *addr,
+									ev_tstamp timestamp,
+									struct rspamd_leaky_bucket_elt *elt,
+									enum rspamd_ratelimit_check_policy policy,
+									double max_burst, double max_rate)
+{
+	gboolean ratelimited = FALSE, new_ratelimit = FALSE;
+
+	/* Nothing to check */
+	if (isnan(max_burst) || isnan(max_rate)) {
+		return ratelimit_pass;
+	}
+
+	if (isnan(elt->cur)) {
+		/* There is an issue with the previous logic: the TTL is updated each time
+		 * we see that new bucket. Hence, we need to check the `last` and act accordingly
+		 */
+		if (elt->last < timestamp && timestamp - elt->last >= ctx->leaky_bucket_ttl) {
+			/*
+			 * We reset the bucket to 90% of its capacity to allow some requests.
+			 * This copes with the case when an IP network is blocked for some
+			 * burst and would otherwise never be unblocked.
+			 */
+			elt->cur = max_burst * 0.9;
+			elt->last = timestamp;
+		}
+		else {
+			ratelimited = TRUE;
+		}
+	}
+	else {
+		/* Update bucket: leak some elements */
+		if (elt->last < timestamp) {
+			elt->cur -= max_rate * (timestamp - elt->last);
+			elt->last = timestamp;
+
+			if (elt->cur < 0) {
+				elt->cur = 0;
+			}
+		}
+		else {
+			elt->last = timestamp;
+		}
+
+		/* Check the bucket */
+		if (elt->cur >= max_burst) {
+
+			if (policy == ratelimit_policy_permanent) {
+				elt->cur = NAN;
+			}
+			new_ratelimit = TRUE;
+			ratelimited = TRUE;
+		}
+		else {
+			elt->cur++; /* Allow one more request */
+		}
+	}
+
+	/* Note: Caller is responsible for calling the ratelimit handlers with
+	 * proper context (new vs existing, bucket info, session info, etc.)
+	 */
+
+	if (new_ratelimit) {
+		return ratelimit_new;
+	}
+
+	return ratelimited ? ratelimit_existing : ratelimit_pass;
+}
+
+/**
+ * Per-network ratelimit check for a client address.
+ *
+ * Requests without an address, from whitelisted addresses and from local
+ * addresses always pass. Otherwise the address is masked (IPv4: at most /32;
+ * other families: `leaky_bucket_mask * 4` clamped to the 64..128 range) and
+ * the bucket of the masked network is looked up in `ctx->ratelimit_buckets`.
+ * A missing bucket is created with a level of 1 and the request passes.
+ *
+ * When a bucket becomes blocked by this call, the masked address is sent to
+ * the main process as RSPAMD_SRV_FUZZY_BLOCKED. Lua handlers, if any, are
+ * called for both newly and already blocked buckets.
+ *
+ * @param ctx storage context
+ * @param addr client address, may be NULL
+ * @param worker worker used to send the srv command
+ * @param timestamp current time
+ * @return TRUE if the request is allowed, FALSE if it is ratelimited
+ */
+gboolean
+rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx,
+							 rspamd_inet_addr_t *addr,
+							 struct rspamd_worker *worker,
+							 ev_tstamp timestamp)
+{
+	rspamd_inet_addr_t *masked;
+	struct rspamd_leaky_bucket_elt *elt;
+
+	if (!addr) {
+		return TRUE;
+	}
+
+	if (ctx->ratelimit_whitelist != NULL) {
+		if (rspamd_match_radix_map_addr(ctx->ratelimit_whitelist,
+										addr) != NULL) {
+			return TRUE;
+		}
+	}
+
+	/* Skip ratelimit for local addresses */
+	if (rspamd_inet_address_is_local(addr)) {
+		return TRUE;
+	}
+
+	masked = rspamd_inet_address_copy(addr, NULL);
+
+	if (rspamd_inet_address_get_af(masked) == AF_INET) {
+		rspamd_inet_address_apply_mask(masked,
+									   MIN(ctx->leaky_bucket_mask, 32));
+	}
+	else {
+		/* Must be at least /64 */
+		rspamd_inet_address_apply_mask(masked,
+									   MIN(MAX(ctx->leaky_bucket_mask * 4, 64), 128));
+	}
+
+	elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, masked,
+								 (time_t) timestamp);
+
+	if (elt) {
+		enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(ctx, addr,
+																					  timestamp, elt,
+																					  ratelimit_policy_permanent,
+																					  ctx->leaky_bucket_burst,
+																					  ctx->leaky_bucket_rate);
+
+		if (res == ratelimit_new) {
+			msg_info("ratelimiting %s (%s), %.1f max elts",
+					 rspamd_inet_address_to_string(addr),
+					 rspamd_inet_address_to_string(masked),
+					 ctx->leaky_bucket_burst);
+
+			struct rspamd_srv_command srv_cmd;
+
+			/* The whole structure is sent to another process: do not leak stack garbage */
+			memset(&srv_cmd, 0, sizeof(srv_cmd));
+			srv_cmd.type = RSPAMD_SRV_FUZZY_BLOCKED;
+			srv_cmd.cmd.fuzzy_blocked.af = rspamd_inet_address_get_af(masked);
+
+			if (srv_cmd.cmd.fuzzy_blocked.af == AF_INET || srv_cmd.cmd.fuzzy_blocked.af == AF_INET6) {
+				socklen_t slen;
+				struct sockaddr *sa = rspamd_inet_address_get_sa(masked, &slen);
+
+				if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
+					memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
+					msg_debug("propagating blocked address to other workers");
+					rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+				}
+				else {
+					msg_err("bad address length: %d, expected to be %d",
+							(int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
+				}
+			}
+		}
+
+		/* Both new and existing blocks are reported to the Lua handlers */
+		if (res != ratelimit_pass && ctx->lua_blacklist_handlers) {
+			struct rspamd_ratelimit_callback_ctx cb_ctx = {
+				.addr = addr,
+				.reason = "ratelimit",
+				.type = (res == ratelimit_new) ? RATELIMIT_EVENT_NEW : RATELIMIT_EVENT_EXISTING,
+				.bucket = elt,
+				.max_burst = ctx->leaky_bucket_burst,
+				.max_rate = ctx->leaky_bucket_rate,
+				.session = NULL,
+			};
+			rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx);
+		}
+
+		/* The existing bucket keeps its own copy of the address */
+		rspamd_inet_address_free(masked);
+
+		return res == ratelimit_pass;
+	}
+	else {
+		/* New bucket */
+		elt = g_malloc(sizeof(*elt));
+		elt->addr = masked; /* transfer ownership */
+		elt->cur = 1;
+		elt->last = timestamp;
+
+		rspamd_lru_hash_insert(ctx->ratelimit_buckets,
+							   masked,
+							   elt,
+							   timestamp,
+							   ctx->leaky_bucket_ttl);
+	}
+
+	return TRUE;
+}
+
+/**
+ * Push the ratelimit info for a callback onto the Lua stack.
+ *
+ * Pushes nil when the callback context has no bucket. Otherwise pushes a table
+ * with `bucket_level` (nil for a blocked bucket), `is_permanent`, `last_seen`
+ * and, when available, `max_burst`, `max_rate` and `exceeded_by`.
+ */
+static void
+rspamd_fuzzy_bucket_info_tolua(lua_State *L,
+							   const struct rspamd_ratelimit_callback_ctx *cb_ctx)
+{
+	if (!cb_ctx->bucket) {
+		lua_pushnil(L);
+		return;
+	}
+
+	lua_createtable(L, 0, 6);
+
+	/* bucket_level - current fill level (nil if permanently blocked) */
+	if (isnan(cb_ctx->bucket->cur)) {
+		lua_pushnil(L);
+		lua_setfield(L, -2, "bucket_level");
+		lua_pushboolean(L, TRUE);
+		lua_setfield(L, -2, "is_permanent");
+	}
+	else {
+		lua_pushnumber(L, cb_ctx->bucket->cur);
+		lua_setfield(L, -2, "bucket_level");
+		lua_pushboolean(L, FALSE);
+		lua_setfield(L, -2, "is_permanent");
+	}
+
+	/* max_burst */
+	if (!isnan(cb_ctx->max_burst)) {
+		lua_pushnumber(L, cb_ctx->max_burst);
+		lua_setfield(L, -2, "max_burst");
+	}
+
+	/* max_rate */
+	if (!isnan(cb_ctx->max_rate)) {
+		lua_pushnumber(L, cb_ctx->max_rate);
+		lua_setfield(L, -2, "max_rate");
+	}
+
+	/* exceeded_by - how much over the limit */
+	if (!isnan(cb_ctx->bucket->cur) && !isnan(cb_ctx->max_burst) &&
+		cb_ctx->bucket->cur > cb_ctx->max_burst) {
+		lua_pushnumber(L, cb_ctx->bucket->cur - cb_ctx->max_burst);
+		lua_setfield(L, -2, "exceeded_by");
+	}
+
+	/* last_seen */
+	lua_pushnumber(L, cb_ctx->bucket->last);
+	lua_setfield(L, -2, "last_seen");
+}
+
+/**
+ * Push a table describing the session extensions onto the Lua stack.
+ *
+ * The table is always pushed; it stays empty when there is no session or the
+ * session has no extensions. Known extensions fill `domain` and `source_ip`.
+ */
+static void
+rspamd_fuzzy_ratelimit_extensions_tolua(lua_State *L,
+										const struct rspamd_ratelimit_callback_ctx *cb_ctx)
+{
+	struct rspamd_fuzzy_cmd_extension *ext;
+	rspamd_inet_addr_t *addr;
+
+	lua_createtable(L, 0, 2);
+
+	if (!cb_ctx->session || !cb_ctx->session->extensions) {
+		return;
+	}
+
+	LL_FOREACH(cb_ctx->session->extensions, ext)
+	{
+		switch (ext->ext) {
+		case RSPAMD_FUZZY_EXT_SOURCE_DOMAIN:
+			lua_pushlstring(L, (const char *) ext->payload, ext->length);
+			lua_setfield(L, -2, "domain");
+			break;
+		case RSPAMD_FUZZY_EXT_SOURCE_IP4:
+			addr = rspamd_inet_address_new(AF_INET, ext->payload);
+			rspamd_lua_ip_push(L, addr);
+			rspamd_inet_address_free(addr);
+			lua_setfield(L, -2, "source_ip");
+			break;
+		case RSPAMD_FUZZY_EXT_SOURCE_IP6:
+			addr = rspamd_inet_address_new(AF_INET6, ext->payload);
+			rspamd_lua_ip_push(L, addr);
+			rspamd_inet_address_free(addr);
+			lua_setfield(L, -2, "source_ip");
+			break;
+		default:
+			/* Other extensions are not exported to Lua */
+			break;
+		}
+	}
+}
+
+/**
+ * Call every registered Lua blacklist/ratelimit handler.
+ *
+ * Each handler receives six arguments: client IP, reason string, event type
+ * ("new", "existing" or "blacklist"), ratelimit info table or nil, digest text
+ * or nil, and the extensions table. Handler errors are logged and do not stop
+ * the remaining handlers.
+ */
+void rspamd_fuzzy_call_ratelimit_handlers(struct rspamd_fuzzy_storage_ctx *ctx,
+										  const struct rspamd_ratelimit_callback_ctx *cb_ctx)
+{
+	if (ctx->lua_blacklist_handlers == NULL) {
+		return;
+	}
+
+	struct rspamd_lua_fuzzy_script *cur;
+	LL_FOREACH(ctx->lua_blacklist_handlers, cur)
+	{
+		lua_State *L = ctx->cfg->lua_state;
+		int err_idx, ret;
+		const int nargs = 6;
+
+		lua_pushcfunction(L, &rspamd_lua_traceback);
+		err_idx = lua_gettop(L);
+		lua_checkstack(L, err_idx + nargs + 2);
+		lua_rawgeti(L, LUA_REGISTRYINDEX, cur->cbref);
+
+		/* Arg 1: client IP */
+		rspamd_lua_ip_push(L, cb_ctx->addr);
+
+		/* Arg 2: block reason */
+		lua_pushstring(L, cb_ctx->reason);
+
+		/* Arg 3: event type */
+		switch (cb_ctx->type) {
+		case RATELIMIT_EVENT_NEW:
+			lua_pushliteral(L, "new");
+			break;
+		case RATELIMIT_EVENT_EXISTING:
+			lua_pushliteral(L, "existing");
+			break;
+		case RATELIMIT_EVENT_BLACKLIST:
+			lua_pushliteral(L, "blacklist");
+			break;
+		default:
+			/* Always push something, otherwise lua_pcall gets 5 args instead of 6 */
+			lua_pushliteral(L, "unknown");
+			break;
+		}
+
+		/* Arg 4: ratelimit_info table (or nil) */
+		rspamd_fuzzy_bucket_info_tolua(L, cb_ctx);
+
+		/* Arg 5: digest (or nil) */
+		if (cb_ctx->session) {
+			(void) lua_new_text(L, (const char *) cb_ctx->session->cmd.basic.digest,
+								sizeof(cb_ctx->session->cmd.basic.digest), FALSE);
+		}
+		else {
+			lua_pushnil(L);
+		}
+
+		/* Arg 6: extensions table */
+		rspamd_fuzzy_ratelimit_extensions_tolua(L, cb_ctx);
+
+		if ((ret = lua_pcall(L, nargs, 0, err_idx)) != 0) {
+			msg_err("call to lua_blacklist_cbref "
+					"script failed (%d): %s",
+					ret, lua_tostring(L, -1));
+		}
+
+		/* Drop the traceback function and any error message, keep the caller's stack */
+		lua_settop(L, err_idx - 1);
+	}
+}
+
+/**
+ * Notify the Lua handlers about a client rejected outside of bucket checks.
+ *
+ * The event type is "blacklist" when `reason` equals "blacklisted" and
+ * "existing" otherwise; no bucket or session information is attached.
+ */
+void rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx *ctx,
+										 rspamd_inet_addr_t *addr,
+										 const char *reason)
+{
+	if (ctx->lua_blacklist_handlers == NULL) {
+		return;
+	}
+
+	struct rspamd_ratelimit_callback_ctx cb_ctx = {
+		.addr = addr,
+		.reason = reason,
+		.type = g_strcmp0(reason, "blacklisted") == 0 ? RATELIMIT_EVENT_BLACKLIST : RATELIMIT_EVENT_EXISTING,
+		.bucket = NULL,
+		.max_burst = NAN,
+		.max_rate = NAN,
+		.session = NULL,
+	};
+	rspamd_fuzzy_call_ratelimit_handlers(ctx, &cb_ctx);
+}
+
+/**
+ * Check a client address against the blocked IPs map.
+ *
+ * @return FALSE (after notifying the Lua handlers) when the address matches
+ *         `ctx->blocked_ips`, TRUE otherwise or when no map is configured
+ */
+gboolean
+rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
+						  rspamd_inet_addr_t *addr)
+{
+	if (ctx->blocked_ips != NULL) {
+		if (rspamd_match_radix_map_addr(ctx->blocked_ips,
+										addr) != NULL) {
+
+			rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "blacklisted");
+			return FALSE;
+		}
+	}
+
+	return TRUE;
+}
+
+/**
+ * Destructor for a leaky bucket element: frees the owned address and the
+ * element itself.
+ */
+void fuzzy_rl_bucket_free(gpointer p)
+{
+	struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *) p;
+
+	rspamd_inet_address_free(elt->addr);
+	g_free(elt);
+}
+
+/**
+ * Export a bucket as a new UCL object with the `cur` and `last` fields.
+ */
+ucl_object_t *
+rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt)
+{
+	ucl_object_t *res;
+
+	res = ucl_object_typed_new(UCL_OBJECT);
+
+	ucl_object_insert_key(res, ucl_object_fromdouble(p_elt->cur), "cur", 0, false);
+	ucl_object_insert_key(res, ucl_object_fromdouble(p_elt->last), "last", 0, false);
+
+	return res;
+}
+
+/**
+ * Load saved buckets from `RSPAMD_DBDIR/fuzzy_ratelimits.ucl`, if readable.
+ *
+ * The file must contain an array of objects with the `ip`, `value` and `last`
+ * keys; malformed entries are logged and skipped. Every valid entry is
+ * inserted into `ctx->ratelimit_buckets`.
+ */
+void rspamd_fuzzy_maybe_load_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx)
+{
+	char path[PATH_MAX];
+
+	rspamd_snprintf(path, sizeof(path), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl",
+					RSPAMD_DBDIR);
+
+	if (access(path, R_OK) != -1) {
+		struct ucl_parser *parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS);
+		if (ucl_parser_add_file(parser, path)) {
+			ucl_object_t *obj = ucl_parser_get_object(parser);
+			int loaded = 0;
+
+			if (ucl_object_type(obj) == UCL_ARRAY) {
+				ucl_object_iter_t it = NULL;
+				const ucl_object_t *cur;
+
+				while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) {
+					const ucl_object_t *ip, *value, *last;
+					const char *ip_str;
+					double limit_val, last_val;
+
+					ip = ucl_object_find_key(cur, "ip");
+					value = ucl_object_find_key(cur, "value");
+					last = ucl_object_find_key(cur, "last");
+
+					if (ip == NULL || value == NULL || last == NULL) {
+						msg_err("invalid ratelimit object");
+						continue;
+					}
+
+					ip_str = ucl_object_tostring(ip);
+					limit_val = ucl_object_todouble(value);
+					last_val = ucl_object_todouble(last);
+
+					if (ip_str == NULL || isnan(last_val)) {
+						msg_err("invalid ratelimit object");
+						continue;
+					}
+
+					rspamd_inet_addr_t *addr;
+					if (rspamd_parse_inet_address(&addr, ip_str, strlen(ip_str),
+												  RSPAMD_INET_ADDRESS_PARSE_NO_UNIX | RSPAMD_INET_ADDRESS_PARSE_NO_PORT)) {
+						struct rspamd_leaky_bucket_elt *elt = g_malloc(sizeof(*elt));
+
+						elt->cur = limit_val;
+						elt->last = last_val;
+						elt->addr = addr; /* owned by the bucket from now on */
+						rspamd_lru_hash_insert(ctx->ratelimit_buckets, addr, elt, elt->last, ctx->leaky_bucket_ttl);
+						loaded++;
+					}
+					else {
+						msg_err("invalid ratelimit ip: %s", ip_str);
+						continue;
+					}
+				}
+
+				msg_info("loaded %d ratelimit objects", loaded);
+			}
+
+			ucl_object_unref(obj);
+		}
+		else {
+			/* Do not silently ignore a corrupted state file */
+			msg_err("cannot parse ratelimits from %s: %s", path,
+					ucl_parser_get_error(parser));
+		}
+
+		ucl_parser_free(parser);
+	}
+}
+
+/**
+ * Save all buckets to `RSPAMD_DBDIR/fuzzy_ratelimits.ucl`.
+ *
+ * The data is written as compact JSON to a `.new` file first and then renamed
+ * over the final path. Failures are logged; nothing is reported to the caller.
+ */
+void rspamd_fuzzy_maybe_save_ratelimits(struct rspamd_fuzzy_storage_ctx *ctx)
+{
+	char path[PATH_MAX];
+
+	rspamd_snprintf(path, sizeof(path), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl.new",
+					RSPAMD_DBDIR);
+	FILE *f = fopen(path, "w");
+
+	if (f != NULL) {
+		ucl_object_t *top = ucl_object_typed_new(UCL_ARRAY);
+		struct ucl_emitter_functions *emit_subr = ucl_object_emit_file_funcs(f);
+		int it = 0;
+		gpointer k, v;
+
+		ucl_object_reserve(top, rspamd_lru_hash_size(ctx->ratelimit_buckets));
+
+		while ((it = rspamd_lru_hash_foreach(ctx->ratelimit_buckets, it, &k, &v)) != -1) {
+			ucl_object_t *cur = ucl_object_typed_new(UCL_OBJECT);
+			struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *) v;
+
+			ucl_object_insert_key(cur, ucl_object_fromdouble(elt->cur), "value", 0, false);
+			ucl_object_insert_key(cur, ucl_object_fromdouble(elt->last), "last", 0, false);
+			ucl_object_insert_key(cur, ucl_object_fromstring(rspamd_inet_address_to_string(elt->addr)), "ip", 0, false);
+			ucl_array_append(top, cur);
+		}
+
+		if (ucl_object_emit_full(top, UCL_EMIT_JSON_COMPACT, emit_subr, NULL)) {
+			char npath[PATH_MAX];
+
+			/* Make sure the data has left the stdio buffer before the rename */
+			fflush(f);
+			rspamd_snprintf(npath, sizeof(npath), "%s" G_DIR_SEPARATOR_S "fuzzy_ratelimits.ucl",
+							RSPAMD_DBDIR);
+
+			if (rename(path, npath) == -1) {
+				msg_warn("cannot rename %s to %s: %s", path, npath, strerror(errno));
+			}
+			else {
+				msg_info("saved %d ratelimits in %s", rspamd_lru_hash_size(ctx->ratelimit_buckets), npath);
+			}
+		}
+		else {
+			msg_warn("cannot serialize ratelimit buckets to %s: %s", path, strerror(errno));
+		}
+
+		/* The emitter functions object is allocated by libucl and must be released */
+		ucl_object_emit_funcs_free(emit_subr);
+		fclose(f);
+		ucl_object_unref(top);
+	}
+	else {
+		msg_warn("cannot save ratelimit buckets to %s: %s", path, strerror(errno));
+	}
+}
diff --git a/src/libserver/fuzzy_storage_stat.c b/src/libserver/fuzzy_storage_stat.c
new file mode 100644
index 000000000..ef61001c7
--- /dev/null
+++ b/src/libserver/fuzzy_storage_stat.c
@@ -0,0 +1,300 @@
+/*
+ * Copyright 2026 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "rspamd.h"
+#include "util.h"
+#include "rspamd_control.h"
+#include "libserver/worker_util.h"
+#include "fuzzy_backend/fuzzy_backend.h"
+#include "fuzzy_storage_internal.h"
+#include "fuzzy_wire.h"
+#include "libcryptobox/keypair.h"
+#include "unix-std.h"
+
+/*
+ * NOTE(review): the names of the eight system headers were stripped from this
+ * dump; they are reconstructed from the symbols used below (errno, open,
+ * PATH_MAX, mkstemp, memset/strnlen/strerror, sendmsg/CMSG_*, struct iovec,
+ * close/unlink) - confirm against the upstream file.
+ */
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+/**
+ * Export one set of per-key counters as a new UCL object.
+ *
+ * The object holds `checked`, `matched`, `added`, `deleted` and `errors`
+ * counters plus the means of the checked/matched rate counters.
+ */
+ucl_object_t *
+rspamd_fuzzy_storage_stat_key(const struct fuzzy_key_stat *key_stat)
+{
+	ucl_object_t *res;
+
+	res = ucl_object_typed_new(UCL_OBJECT);
+
+	ucl_object_insert_key(res, ucl_object_fromint(key_stat->checked),
+						  "checked", 0, false);
+	ucl_object_insert_key(res, ucl_object_fromdouble(key_stat->checked_ctr.mean),
+						  "checked_per_hour", 0, false);
+	ucl_object_insert_key(res, ucl_object_fromint(key_stat->matched),
+						  "matched", 0, false);
+	ucl_object_insert_key(res, ucl_object_fromdouble(key_stat->matched_ctr.mean),
+						  "matched_per_hour", 0, false);
+	ucl_object_insert_key(res, ucl_object_fromint(key_stat->added),
+						  "added", 0, false);
+	ucl_object_insert_key(res, ucl_object_fromint(key_stat->deleted),
+						  "deleted", 0, false);
+	ucl_object_insert_key(res, ucl_object_fromint(key_stat->errors),
+						  "errors", 0, false);
+
+	return res;
+}
+
+/**
+ * Add the statistics of one key to `keys_obj`.
+ *
+ * Keys without a stat structure are skipped. The entry is named after the
+ * formatted public key prefix and contains the key counters, optional per-IP
+ * counters (`ips`, only when `ip_stat` is set), per-flag counters (`flags`),
+ * the keypair dumped without its secret part and, if present, the key's
+ * ratelimit bucket.
+ */
+void rspamd_fuzzy_key_stat_iter(const unsigned char *pk_iter,
+								struct fuzzy_key *fuzzy_key,
+								ucl_object_t *keys_obj,
+								gboolean ip_stat)
+{
+	struct fuzzy_key_stat *key_stat = fuzzy_key->stat;
+	char keyname[17];
+
+	if (key_stat) {
+		rspamd_snprintf(keyname, sizeof(keyname), "%8bs", pk_iter);
+
+		ucl_object_t *elt = rspamd_fuzzy_storage_stat_key(key_stat);
+
+		if (key_stat->last_ips && ip_stat) {
+			int i = 0;
+			ucl_object_t *ip_elt = ucl_object_typed_new(UCL_OBJECT);
+			gpointer k, v;
+
+			while ((i = rspamd_lru_hash_foreach(key_stat->last_ips,
+												i, &k, &v)) != -1) {
+				ucl_object_t *ip_cur = rspamd_fuzzy_storage_stat_key(v);
+				ucl_object_insert_key(ip_elt, ip_cur,
+									  rspamd_inet_address_to_string(k), 0, true);
+			}
+			ucl_object_insert_key(elt, ip_elt, "ips", 0, false);
+		}
+
+		int flag;
+		struct fuzzy_key_stat *flag_stat;
+		ucl_object_t *flags_ucl = ucl_object_typed_new(UCL_OBJECT);
+
+		kh_foreach_key_value_ptr(fuzzy_key->flags_stat, flag, flag_stat, {
+			char intbuf[16];
+			rspamd_snprintf(intbuf, sizeof(intbuf), "%d", flag);
+			ucl_object_insert_key(flags_ucl, rspamd_fuzzy_storage_stat_key(flag_stat),
+								  intbuf, 0, true);
+		});
+
+		ucl_object_insert_key(elt, flags_ucl, "flags", 0, false);
+
+		ucl_object_insert_key(elt,
+							  rspamd_keypair_to_ucl(fuzzy_key->key, RSPAMD_KEYPAIR_ENCODING_DEFAULT,
+													RSPAMD_KEYPAIR_DUMP_NO_SECRET | RSPAMD_KEYPAIR_DUMP_FLATTENED),
+							  "keypair", 0, false);
+
+		if (fuzzy_key->rl_bucket) {
+			ucl_object_insert_key(elt,
+								  rspamd_leaky_bucket_to_ucl(fuzzy_key->rl_bucket),
+								  "ratelimit", 0, false);
+		}
+
+		ucl_object_insert_key(keys_obj, elt, keyname, 0, true);
+	}
+}
+
+/**
+ * Build the complete statistics object of the storage.
+ *
+ * The result contains per-key statistics for static and dynamic keys (`keys`),
+ * the global counters, the per-IP error counters (`errors_ips`, only when
+ * `ip_stat` is set and the table exists) and three per-epoch arrays:
+ * `fuzzy_checked`, `fuzzy_shingles` and `fuzzy_found`.
+ *
+ * @return a new UCL object; the caller is expected to unref it
+ */
+ucl_object_t *
+rspamd_fuzzy_stat_to_ucl(struct rspamd_fuzzy_storage_ctx *ctx, gboolean ip_stat)
+{
+	struct fuzzy_key *fuzzy_key;
+	ucl_object_t *obj, *keys_obj, *elt, *ip_elt;
+	const unsigned char *pk_iter;
+
+	obj = ucl_object_typed_new(UCL_OBJECT);
+
+	keys_obj = ucl_object_typed_new(UCL_OBJECT);
+
+	kh_foreach(ctx->keys, pk_iter, fuzzy_key, {
+		rspamd_fuzzy_key_stat_iter(pk_iter, fuzzy_key, keys_obj, ip_stat);
+	});
+
+	if (ctx->dynamic_keys) {
+		kh_foreach(ctx->dynamic_keys, pk_iter, fuzzy_key, {
+			rspamd_fuzzy_key_stat_iter(pk_iter, fuzzy_key, keys_obj, ip_stat);
+		});
+	}
+
+	ucl_object_insert_key(obj, keys_obj, "keys", 0, false);
+
+	/* Now generic stats */
+	ucl_object_insert_key(obj,
+						  ucl_object_fromint(ctx->stat.fuzzy_hashes),
+						  "fuzzy_stored",
+						  0,
+						  false);
+	ucl_object_insert_key(obj,
+						  ucl_object_fromint(ctx->stat.fuzzy_hashes_expired),
+						  "fuzzy_expired",
+						  0,
+						  false);
+	ucl_object_insert_key(obj,
+						  ucl_object_fromint(ctx->stat.invalid_requests),
+						  "invalid_requests",
+						  0,
+						  false);
+	ucl_object_insert_key(obj,
+						  ucl_object_fromint(ctx->stat.delayed_hashes),
+						  "delayed_hashes",
+						  0,
+						  false);
+
+	if (ctx->errors_ips && ip_stat) {
+		gpointer k, v;
+		int i = 0;
+		ip_elt = ucl_object_typed_new(UCL_OBJECT);
+
+		while ((i = rspamd_lru_hash_foreach(ctx->errors_ips, i, &k, &v)) != -1) {
+			ucl_object_insert_key(ip_elt,
+								  ucl_object_fromint(*(uint64_t *) v),
+								  rspamd_inet_address_to_string(k), 0, true);
+		}
+
+		ucl_object_insert_key(obj,
+							  ip_elt,
+							  "errors_ips",
+							  0,
+							  false);
+	}
+
+	/* Checked by epoch */
+	elt = ucl_object_typed_new(UCL_ARRAY);
+
+	for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
+		ucl_array_append(elt,
+						 ucl_object_fromint(ctx->stat.fuzzy_hashes_checked[i]));
+	}
+
+	ucl_object_insert_key(obj, elt, "fuzzy_checked", 0, false);
+
+	/* Shingles by epoch */
+	elt = ucl_object_typed_new(UCL_ARRAY);
+
+	for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
+		ucl_array_append(elt,
+						 ucl_object_fromint(ctx->stat.fuzzy_shingles_checked[i]));
+	}
+
+	ucl_object_insert_key(obj, elt, "fuzzy_shingles", 0, false);
+
+	/* Matched by epoch */
+	elt = ucl_object_typed_new(UCL_ARRAY);
+
+	for (int i = RSPAMD_FUZZY_EPOCH10; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
+		ucl_array_append(elt,
+						 ucl_object_fromint(ctx->stat.fuzzy_hashes_found[i]));
+	}
+
+	ucl_object_insert_key(obj, elt, "fuzzy_found", 0, false);
+
+
+	return obj;
+}
+
+/**
+ * Control command handler that replies with the fuzzy storage statistics.
+ *
+ * The statistics are written as compact JSON to a temporary file in the
+ * configured temp directory; the file is reopened read-only, unlinked, and its
+ * descriptor is attached (SCM_RIGHTS) to the reply sent over `fd`. When the
+ * file cannot be created or reopened, the reply carries the errno value in
+ * `status` and no descriptor.
+ *
+ * @param rspamd_main main structure (temp dir and logging)
+ * @param worker not used by this function
+ * @param fd control socket to reply on
+ * @param attached_fd not used by this function
+ * @param cmd incoming command; its id is echoed in the reply
+ * @param ud struct rspamd_fuzzy_storage_ctx
+ * @return always TRUE
+ */
+gboolean
+rspamd_fuzzy_storage_stat(struct rspamd_main *rspamd_main,
+						  struct rspamd_worker *worker, int fd,
+						  int attached_fd,
+						  struct rspamd_control_command *cmd,
+						  gpointer ud)
+{
+	struct rspamd_fuzzy_storage_ctx *ctx = ud;
+	struct rspamd_control_reply rep;
+	ucl_object_t *obj;
+	struct ucl_emitter_functions *emit_subr;
+	unsigned char fdspace[CMSG_SPACE(sizeof(int))];
+	struct iovec iov;
+	struct msghdr msg;
+	struct cmsghdr *cmsg;
+
+	int outfd = -1;
+	char tmppath[PATH_MAX];
+
+	memset(&rep, 0, sizeof(rep));
+	rep.type = RSPAMD_CONTROL_FUZZY_STAT;
+	rep.id = cmd->id;
+
+	rspamd_snprintf(tmppath, sizeof(tmppath), "%s%c%s-XXXXXXXXXX",
+					rspamd_main->cfg->temp_dir, G_DIR_SEPARATOR, "fuzzy-stat");
+
+	if ((outfd = mkstemp(tmppath)) == -1) {
+		rep.reply.fuzzy_stat.status = errno;
+		msg_info_main("cannot make temporary stat file for fuzzy stat: %s",
+					  strerror(errno));
+	}
+	else {
+		const char *backend_id;
+
+		rep.reply.fuzzy_stat.status = 0;
+
+		backend_id = rspamd_fuzzy_backend_id(ctx->backend);
+		if (backend_id) {
+			/*
+			 * Copy no more than the id actually holds: a short id string must
+			 * not be read past its terminator. `rep` is already zeroed, so the
+			 * remainder of storage_id stays zero-filled.
+			 */
+			memcpy(rep.reply.fuzzy_stat.storage_id,
+				   backend_id,
+				   strnlen(backend_id, sizeof(rep.reply.fuzzy_stat.storage_id)));
+		}
+
+		obj = rspamd_fuzzy_stat_to_ucl(ctx, TRUE);
+		emit_subr = ucl_object_emit_fd_funcs(outfd);
+		ucl_object_emit_full(obj, UCL_EMIT_JSON_COMPACT, emit_subr, NULL);
+		ucl_object_emit_funcs_free(emit_subr);
+		ucl_object_unref(obj);
+		/* Rewind output file */
+		close(outfd);
+		outfd = open(tmppath, O_RDONLY);
+
+		if (outfd == -1) {
+			/* Report the failure instead of a successful reply without data */
+			rep.reply.fuzzy_stat.status = errno;
+			msg_info_main("cannot reopen temporary stat file %s: %s",
+						  tmppath, strerror(errno));
+		}
+
+		unlink(tmppath);
+	}
+
+	/* Now we can send outfd and status message */
+	memset(&msg, 0, sizeof(msg));
+
+	/* Attach fd to the message */
+	if (outfd != -1) {
+		memset(fdspace, 0, sizeof(fdspace));
+		msg.msg_control = fdspace;
+		msg.msg_controllen = sizeof(fdspace);
+		cmsg = CMSG_FIRSTHDR(&msg);
+
+		if (cmsg) {
+			cmsg->cmsg_level = SOL_SOCKET;
+			cmsg->cmsg_type = SCM_RIGHTS;
+			cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+			memcpy(CMSG_DATA(cmsg), &outfd, sizeof(int));
+		}
+	}
+
+	iov.iov_base = &rep;
+	iov.iov_len = sizeof(rep);
+	msg.msg_iov = &iov;
+	msg.msg_iovlen = 1;
+
+	if (sendmsg(fd, &msg, 0) == -1) {
+		msg_err_main("cannot send fuzzy stat: %s", strerror(errno));
+	}
+
+	/* Our copy of the descriptor is no longer needed once the message is sent */
+	if (outfd != -1) {
+		close(outfd);
+	}
+
+	return TRUE;
+}