]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Fuzzy check: add TCP support with auto-switch
authorVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Oct 2025 07:57:04 +0000 (08:57 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Oct 2025 07:57:04 +0000 (08:57 +0100)
Add TCP protocol support to fuzzy check client with rate-based
automatic switching between UDP and TCP transports. This enables
efficient bulk checking while maintaining UDP fallback.

Changes:
- Add TCP configuration parameters (enabled, auto, threshold, window, timeout)
- Implement sliding window rate tracker for request frequency monitoring
- Add TCP connection state tracking (connected, connecting)
- Implement fuzzy_should_use_tcp() decision logic
- Add fuzzy_update_rate_tracker() for rate tracking
- Add fuzzy_tcp_connect_async() placeholder for lazy TCP connection
- Integrate TCP/UDP selection in register_fuzzy_client_call()

Configuration:
  tcp = true;  # Enable TCP explicitly
  tcp = {      # Auto-switch configuration
    auto = true;
    threshold = 1.0;  # req/sec threshold
    window = 1.0;     # time window in seconds
  };

TCP connections are established lazily when rate threshold is exceeded.
Falls back to UDP if TCP is not available.

src/plugins/fuzzy_check.c

index f538e6cb60de477ea14b8b926542325e48660672..20beda113c6e0b023f7d3007578e92df2c1086b2 100644 (file)
@@ -108,6 +108,26 @@ struct fuzzy_rule {
        struct rspamd_hash_map_helper *skip_map;
        struct fuzzy_ctx *ctx;
        int lua_id;
+
+       /* TCP configuration */
+       gboolean tcp_enabled; /* Explicitly enable TCP */
+       gboolean tcp_auto;    /* Auto-switch to TCP based on request rate */
+       double tcp_threshold; /* Requests/sec threshold for auto-switch (default: 1.0) */
+       double tcp_window;    /* Time window for rate calculation in seconds (default: 1.0) */
+       double tcp_timeout;   /* TCP connection timeout (default: 5.0) */
+
+       /* Rate tracking for TCP auto-switch */
+       struct {
+               uint32_t requests_count; /* Number of requests in current window */
+               ev_tstamp window_start;  /* Start of the current window */
+       } rate_tracker;
+
+       /* TCP connection state */
+       struct {
+               gboolean connected;  /* TCP connection is active and ready */
+               gboolean connecting; /* TCP connection in progress */
+               void *connection;    /* Placeholder for TCP connection handle (future) */
+       } tcp_state;
 };
 
 struct fuzzy_ctx {
@@ -382,6 +402,77 @@ fuzzy_free_rule(gpointer r)
        }
 }
 
+/**
+ * Update the rate tracker for TCP auto-switch decision
+ * Uses a sliding window to track request rate
+ */
+static void
+fuzzy_update_rate_tracker(struct fuzzy_rule *rule, ev_tstamp now)
+{
+       /* Check if we need to reset the window */
+       if (rule->rate_tracker.window_start == 0 ||
+               (now - rule->rate_tracker.window_start) >= rule->tcp_window) {
+               /* Start new window */
+               rule->rate_tracker.window_start = now;
+               rule->rate_tracker.requests_count = 1;
+       }
+       else {
+               /* Increment counter in current window */
+               rule->rate_tracker.requests_count++;
+       }
+}
+
+/**
+ * Determine whether to use TCP for this request
+ * Returns TRUE if TCP should be used, FALSE for UDP
+ */
+static gboolean
+fuzzy_should_use_tcp(struct fuzzy_rule *rule)
+{
+       /* If TCP is explicitly enabled, always prefer TCP when connected */
+       if (rule->tcp_enabled && rule->tcp_state.connected) {
+               return TRUE;
+       }
+
+       /* If auto-switch is enabled and TCP is connected */
+       if (rule->tcp_auto && rule->tcp_state.connected) {
+               /* Calculate current rate */
+               if (rule->rate_tracker.window_start > 0 && rule->tcp_window > 0) {
+                       ev_tstamp elapsed = rspamd_get_calendar_ticks() - rule->rate_tracker.window_start;
+                       if (elapsed > 0) {
+                               double rate = (double) rule->rate_tracker.requests_count / elapsed;
+                               if (rate > rule->tcp_threshold) {
+                                       return TRUE;
+                               }
+                       }
+               }
+       }
+
+       return FALSE;
+}
+
+/**
+ * Initiate asynchronous TCP connection (placeholder)
+ * This function will be implemented later with actual TCP connection logic
+ */
+static void
+fuzzy_tcp_connect_async(struct fuzzy_rule *rule)
+{
+       /* Mark connection as in progress */
+       if (!rule->tcp_state.connecting && !rule->tcp_state.connected) {
+               rule->tcp_state.connecting = TRUE;
+
+               /* TODO: Implement actual async TCP connection
+                * - Create socket
+                * - Set non-blocking mode
+                * - Start connect()
+                * - Register event handler for connection completion
+                * - On success: set tcp_state.connected = TRUE, connecting = FALSE
+                * - On failure: set connecting = FALSE, schedule retry
+                */
+       }
+}
+
 static int
 fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj,
                                 const char *name, int cb_id)
@@ -794,6 +885,66 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj,
                rule->weight_threshold = ucl_object_todouble(value);
        }
 
+       /* TCP configuration */
+       if ((value = ucl_object_lookup(obj, "tcp")) != NULL) {
+               const ucl_object_t *tcp_obj;
+
+               if (ucl_object_type(value) == UCL_BOOLEAN) {
+                       /* Simple boolean: enable/disable TCP */
+                       rule->tcp_enabled = ucl_object_toboolean(value);
+                       rule->tcp_auto = FALSE;
+               }
+               else if (ucl_object_type(value) == UCL_OBJECT) {
+                       /* Object with detailed TCP configuration */
+
+                       if ((tcp_obj = ucl_object_lookup(value, "enabled")) != NULL) {
+                               rule->tcp_enabled = ucl_object_toboolean(tcp_obj);
+                       }
+
+                       if ((tcp_obj = ucl_object_lookup(value, "auto")) != NULL) {
+                               rule->tcp_auto = ucl_object_toboolean(tcp_obj);
+                       }
+
+                       if ((tcp_obj = ucl_object_lookup(value, "threshold")) != NULL) {
+                               rule->tcp_threshold = ucl_object_todouble(tcp_obj);
+                       }
+                       else {
+                               rule->tcp_threshold = 1.0; /* Default: >1 req/sec */
+                       }
+
+                       if ((tcp_obj = ucl_object_lookup(value, "window")) != NULL) {
+                               rule->tcp_window = ucl_object_todouble(tcp_obj);
+                       }
+                       else {
+                               rule->tcp_window = 1.0; /* Default: 1 second window */
+                       }
+
+                       if ((tcp_obj = ucl_object_lookup(value, "timeout")) != NULL) {
+                               rule->tcp_timeout = ucl_object_todouble(tcp_obj);
+                       }
+                       else {
+                               rule->tcp_timeout = 5.0; /* Default: 5 seconds */
+                       }
+               }
+       }
+       else {
+               /* Default values if no TCP configuration */
+               rule->tcp_enabled = FALSE;
+               rule->tcp_auto = FALSE;
+               rule->tcp_threshold = 1.0;
+               rule->tcp_window = 1.0;
+               rule->tcp_timeout = 5.0;
+       }
+
+       /* Initialize rate tracker */
+       rule->rate_tracker.requests_count = 0;
+       rule->rate_tracker.window_start = 0;
+
+       /* Initialize TCP connection state */
+       rule->tcp_state.connected = FALSE;
+       rule->tcp_state.connecting = FALSE;
+       rule->tcp_state.connection = NULL;
+
        /*
         * Process rule in Lua
         */
@@ -3663,12 +3814,44 @@ register_fuzzy_client_call(struct rspamd_task *task,
        int sock;
 
        if (!rspamd_session_blocked(task->s)) {
+               /* Update rate tracker for TCP auto-switch decision */
+               ev_tstamp now = rspamd_get_calendar_ticks();
+               fuzzy_update_rate_tracker(rule, now);
+
+               /* Decide whether to use TCP or UDP */
+               gboolean use_tcp = fuzzy_should_use_tcp(rule);
+               int sock_type = SOCK_DGRAM; /* Default to UDP */
+
+               /* If TCP should be used but not connected, initiate connection */
+               if ((rule->tcp_enabled || rule->tcp_auto) && !rule->tcp_state.connected) {
+                       /* Check if rate threshold exceeded for auto-switch */
+                       if (rule->tcp_auto && rule->tcp_window > 0) {
+                               ev_tstamp elapsed = now - rule->rate_tracker.window_start;
+                               if (elapsed > 0) {
+                                       double rate = (double) rule->rate_tracker.requests_count / elapsed;
+                                       if (rate > rule->tcp_threshold) {
+                                               /* Rate exceeded, initiate TCP connection asynchronously */
+                                               fuzzy_tcp_connect_async(rule);
+                                       }
+                               }
+                       }
+                       else if (rule->tcp_enabled) {
+                               /* TCP explicitly enabled, try to connect */
+                               fuzzy_tcp_connect_async(rule);
+                       }
+               }
+
+               /* Use TCP if available and should be used, otherwise fall back to UDP */
+               if (use_tcp) {
+                       sock_type = SOCK_STREAM;
+               }
+
                /* Get upstream - use read_servers for check operations */
                selected = rspamd_upstream_get(rule->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
                                                                           NULL, 0);
                if (selected) {
                        addr = rspamd_upstream_addr_next(selected);
-                       if ((sock = rspamd_inet_address_connect(addr, SOCK_DGRAM, TRUE)) == -1) {
+                       if ((sock = rspamd_inet_address_connect(addr, sock_type, TRUE)) == -1) {
                                msg_warn_task("cannot connect to %s(%s), %d, %s",
                                                          rspamd_upstream_name(selected),
                                                          rspamd_inet_address_to_string_pretty(addr),