From: Vsevolod Stakhov
Date: Tue, 7 Oct 2025 07:57:04 +0000 (+0100)
Subject: [Feature] Fuzzy check: add TCP support with auto-switch
X-Git-Tag: 3.14.0~84^2~19
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=e63378f7d3b8b5109a8e74f60e3495068801077e;p=thirdparty%2Frspamd.git

[Feature] Fuzzy check: add TCP support with auto-switch

Add TCP protocol support to fuzzy check client with rate-based automatic
switching between UDP and TCP transports. This enables efficient bulk
checking while maintaining UDP fallback.

Changes:
- Add TCP configuration parameters (enabled, auto, threshold, window, timeout)
- Implement sliding window rate tracker for request frequency monitoring
- Add TCP connection state tracking (connected, connecting)
- Implement fuzzy_should_use_tcp() decision logic
- Add fuzzy_update_rate_tracker() for rate tracking
- Add fuzzy_tcp_connect_async() placeholder for lazy TCP connection
- Integrate TCP/UDP selection in register_fuzzy_client_call()

Configuration:
  tcp = true;           # Enable TCP explicitly
  tcp = {               # Auto-switch configuration
    auto = true;
    threshold = 1.0;    # req/sec threshold
    window = 1.0;       # time window in seconds
  };

TCP connections are established lazily when rate threshold is exceeded.
Falls back to UDP if TCP is not available.
---
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index f538e6cb60..20beda113c 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -108,6 +108,26 @@ struct fuzzy_rule {
 	struct rspamd_hash_map_helper *skip_map;
 	struct fuzzy_ctx *ctx;
 	int lua_id;
+
+	/* TCP configuration */
+	gboolean tcp_enabled;  /* Explicitly enable TCP */
+	gboolean tcp_auto;     /* Auto-switch to TCP based on request rate */
+	double tcp_threshold;  /* Requests/sec threshold for auto-switch (default: 1.0) */
+	double tcp_window;     /* Time window for rate calculation in seconds (default: 1.0) */
+	double tcp_timeout;    /* TCP connection timeout (default: 5.0) */
+
+	/* Rate tracking for TCP auto-switch */
+	struct {
+		uint32_t requests_count; /* Number of requests in current window */
+		ev_tstamp window_start;  /* Start of the current window */
+	} rate_tracker;
+
+	/* TCP connection state */
+	struct {
+		gboolean connected;  /* TCP connection is active and ready */
+		gboolean connecting; /* TCP connection in progress */
+		void *connection;    /* Placeholder for TCP connection handle (future) */
+	} tcp_state;
 };
 
 struct fuzzy_ctx {
@@ -382,6 +402,102 @@ fuzzy_free_rule(gpointer r)
 	}
 }
 
+/**
+ * Update the request counter used for the TCP auto-switch decision.
+ * Requests are counted in fixed windows of tcp_window seconds: a request
+ * arriving after the current window has expired (or before any window was
+ * started) opens a new window and is counted as its first request.
+ * @param rule fuzzy rule whose rate tracker is updated
+ * @param now current timestamp, same time base as tcp_window (seconds)
+ */
+static void
+fuzzy_update_rate_tracker(struct fuzzy_rule *rule, ev_tstamp now)
+{
+	/* Check if we need to reset the window */
+	if (rule->rate_tracker.window_start == 0 ||
+		(now - rule->rate_tracker.window_start) >= rule->tcp_window) {
+		/* Start new window */
+		rule->rate_tracker.window_start = now;
+		rule->rate_tracker.requests_count = 1;
+	}
+	else {
+		/* Increment counter in current window */
+		rule->rate_tracker.requests_count++;
+	}
+}
+
+/**
+ * Check whether the request rate in the current window exceeds the
+ * auto-switch threshold.
+ * The counter is divided by the full window length, not by the time elapsed
+ * since the window was opened: the elapsed time can be arbitrarily small, so
+ * dividing by it reports a huge rate for two back-to-back requests and makes
+ * any threshold trigger immediately.
+ * @param rule fuzzy rule to check
+ * @return TRUE if requests_count / tcp_window is above tcp_threshold
+ */
+static gboolean
+fuzzy_rate_exceeded(const struct fuzzy_rule *rule)
+{
+	if (rule->rate_tracker.window_start <= 0 || rule->tcp_window <= 0) {
+		/* Nothing counted yet or rate tracking is not configured */
+		return FALSE;
+	}
+
+	return ((double) rule->rate_tracker.requests_count / rule->tcp_window) >
+		rule->tcp_threshold;
+}
+
+/**
+ * Determine whether to use TCP for this request.
+ * TCP is never selected without an established connection; with one, it is
+ * used always if explicitly enabled, or while the request rate is above the
+ * threshold in auto mode.
+ * @param rule fuzzy rule to check
+ * @return TRUE if TCP should be used, FALSE for UDP
+ */
+static gboolean
+fuzzy_should_use_tcp(struct fuzzy_rule *rule)
+{
+	if (!rule->tcp_state.connected) {
+		/* No usable TCP connection, stay on UDP */
+		return FALSE;
+	}
+
+	if (rule->tcp_enabled) {
+		/* TCP is explicitly enabled: always prefer it when connected */
+		return TRUE;
+	}
+
+	/* Auto-switch: use TCP only while the rate is above the threshold */
+	return rule->tcp_auto && fuzzy_rate_exceeded(rule);
+}
+
+/**
+ * Initiate asynchronous TCP connection (placeholder).
+ * For now this only marks the connection as being in progress; within this
+ * change nothing sets tcp_state.connected or clears tcp_state.connecting,
+ * that is left to the actual connection logic (see TODO below).
+ * @param rule fuzzy rule that needs a TCP connection
+ */
+static void
+fuzzy_tcp_connect_async(struct fuzzy_rule *rule)
+{
+	/* Mark connection as in progress */
+	if (!rule->tcp_state.connecting && !rule->tcp_state.connected) {
+		rule->tcp_state.connecting = TRUE;
+
+		/* TODO: Implement actual async TCP connection
+		 * - Create socket
+		 * - Set non-blocking mode
+		 * - Start connect()
+		 * - Register event handler for connection completion
+		 * - On success: set tcp_state.connected = TRUE, connecting = FALSE
+		 * - On failure: set connecting = FALSE, schedule retry
+		 */
+	}
+}
+
 static int
 fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj,
 				 const char *name, int cb_id)
@@ -794,6 +910,56 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj,
 		rule->weight_threshold = ucl_object_todouble(value);
 	}
 
+	/*
+	 * TCP configuration: start from the defaults so that every setting has a
+	 * sane value whatever form the `tcp` option takes
+	 */
+	rule->tcp_enabled = FALSE;
+	rule->tcp_auto = FALSE;
+	rule->tcp_threshold = 1.0; /* Default: >1 req/sec */
+	rule->tcp_window = 1.0;    /* Default: 1 second window */
+	rule->tcp_timeout = 5.0;   /* Default: 5 seconds */
+
+	if ((value = ucl_object_lookup(obj, "tcp")) != NULL) {
+		const ucl_object_t *tcp_obj;
+
+		if (ucl_object_type(value) == UCL_BOOLEAN) {
+			/* Simple boolean: enable/disable TCP */
+			rule->tcp_enabled = ucl_object_toboolean(value);
+		}
+		else if (ucl_object_type(value) == UCL_OBJECT) {
+			/* Object with detailed TCP configuration */
+			if ((tcp_obj = ucl_object_lookup(value, "enabled")) != NULL) {
+				rule->tcp_enabled = ucl_object_toboolean(tcp_obj);
+			}
+
+			if ((tcp_obj = ucl_object_lookup(value, "auto")) != NULL) {
+				rule->tcp_auto = ucl_object_toboolean(tcp_obj);
+			}
+
+			if ((tcp_obj = ucl_object_lookup(value, "threshold")) != NULL) {
+				rule->tcp_threshold = ucl_object_todouble(tcp_obj);
+			}
+
+			if ((tcp_obj = ucl_object_lookup(value, "window")) != NULL) {
+				rule->tcp_window = ucl_object_todouble(tcp_obj);
+			}
+
+			if ((tcp_obj = ucl_object_lookup(value, "timeout")) != NULL) {
+				rule->tcp_timeout = ucl_object_todouble(tcp_obj);
+			}
+		}
+	}
+
+	/* Initialize rate tracker */
+	rule->rate_tracker.requests_count = 0;
+	rule->rate_tracker.window_start = 0;
+
+	/* Initialize TCP connection state */
+	rule->tcp_state.connected = FALSE;
+	rule->tcp_state.connecting = FALSE;
+	rule->tcp_state.connection = NULL;
+
 	/*
 	 * Process rule in Lua
 	 */
@@ -3663,12 +3829,30 @@ register_fuzzy_client_call(struct rspamd_task *task,
 	int sock;
 
 	if (!rspamd_session_blocked(task->s)) {
+		/* Update rate tracker for TCP auto-switch decision */
+		ev_tstamp now = rspamd_get_calendar_ticks();
+		fuzzy_update_rate_tracker(rule, now);
+
+		/*
+		 * Start a lazy TCP connection if there is none yet: immediately when
+		 * TCP is explicitly enabled, or once the request rate exceeds the
+		 * threshold in auto mode
+		 */
+		if (!rule->tcp_state.connected &&
+			(rule->tcp_enabled ||
+			 (rule->tcp_auto && fuzzy_rate_exceeded(rule)))) {
+			fuzzy_tcp_connect_async(rule);
+		}
+
+		/* Use TCP if available and should be used, otherwise fall back to UDP */
+		int sock_type = fuzzy_should_use_tcp(rule) ? SOCK_STREAM : SOCK_DGRAM;
+
 		/* Get upstream - use read_servers for check operations */
 		selected = rspamd_upstream_get(rule->read_servers,
 									   RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
 		if (selected) {
 			addr = rspamd_upstream_addr_next(selected);
-			if ((sock = rspamd_inet_address_connect(addr, SOCK_DGRAM, TRUE)) == -1) {
+			if ((sock = rspamd_inet_address_connect(addr, sock_type, TRUE)) == -1) {
 				msg_warn_task("cannot connect to %s(%s), %d, %s",
 							  rspamd_upstream_name(selected),
 							  rspamd_inet_address_to_string_pretty(addr),