]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] Fuzzy TCP: separate session timeouts from connection failures
authorVsevolod Stakhov <vsevolod@rspamd.com>
Wed, 8 Oct 2025 09:45:04 +0000 (10:45 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Wed, 8 Oct 2025 09:45:04 +0000 (10:45 +0100)
This addresses several timeout handling issues:
- Session timeouts no longer mark entire TCP connection as failed, allowing other sessions to continue
- Made tcp_retry_delay configurable (default: 10.0s)
- Added diagnostic reason strings to all cleanup paths
- Fixed reference counting with proper free_func for connection pool
- Added periodic timeout checks to detect stalled requests
- Unconditional timer cleanup (ev_timer_stop is safe to call)
- Enhanced logging with connection state and elapsed time details

src/plugins/fuzzy_check.c

index d379d8e9869e86d08e8b4cdf0ef8ce32af5bc887..1fb50d14efd26a3bb6ae5c0856a969aa3c0e2624 100644 (file)
@@ -110,11 +110,12 @@ struct fuzzy_rule {
        int lua_id;
 
        /* TCP configuration */
-       gboolean tcp_enabled; /* Explicitly enable TCP */
-       gboolean tcp_auto;    /* Auto-switch to TCP based on request rate */
-       double tcp_threshold; /* Requests/sec threshold for auto-switch (default: 1.0) */
-       double tcp_window;    /* Time window for rate calculation in seconds (default: 1.0) */
-       double tcp_timeout;   /* TCP connection timeout (default: 5.0) */
+       gboolean tcp_enabled;   /* Explicitly enable TCP */
+       gboolean tcp_auto;      /* Auto-switch to TCP based on request rate */
+       double tcp_threshold;   /* Requests/sec threshold for auto-switch (default: 1.0) */
+       double tcp_window;      /* Time window for rate calculation in seconds (default: 1.0) */
+       double tcp_timeout;     /* TCP connection timeout (default: 5.0) */
+       double tcp_retry_delay; /* Delay before retrying failed TCP connection (default: 10.0) */
 
        /* Rate tracking for TCP auto-switch */
        struct {
@@ -523,7 +524,7 @@ static void fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn);
 static gboolean fuzzy_tcp_send_command(struct fuzzy_tcp_connection *conn,
                                                                           GPtrArray *commands,
                                                                           struct fuzzy_client_session *session);
-static void fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn);
+static void fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn, const char *reason);
 static gboolean fuzzy_check_session_is_completed(struct fuzzy_client_session *session);
 
 /* Forward declarations for helper functions */
@@ -538,7 +539,7 @@ static void fuzzy_insert_result(struct fuzzy_client_session *session,
 #define FUZZY_TCP_RELEASE(x) REF_RELEASE(x)
 
 /**
- * Free TCP connection resources
+ * Free TCP connection resources (called by reference counting)
  */
 static void
 fuzzy_tcp_connection_free(struct fuzzy_tcp_connection *conn)
@@ -552,9 +553,23 @@ fuzzy_tcp_connection_free(struct fuzzy_tcp_connection *conn)
                g_queue_free(conn->write_queue);
        }
 
+       msg_debug("fuzzy_tcp: freeing connection to %s for rule %s",
+                         rspamd_upstream_name(conn->server),
+                         conn->rule->name);
+
        g_free(conn);
 }
 
+/**
+ * Wrapper for g_ptr_array free function - releases reference
+ */
+static void
+fuzzy_tcp_connection_unref(gpointer conn_ptr)
+{
+       struct fuzzy_tcp_connection *conn = (struct fuzzy_tcp_connection *) conn_ptr;
+       FUZZY_TCP_RELEASE(conn);
+}
+
 /**
  * Create new TCP connection structure
  */
@@ -646,9 +661,7 @@ fuzzy_tcp_connect_async(struct fuzzy_rule *rule,
                                                   fuzzy_tcp_io_handler, conn);
        rspamd_ev_watcher_start(conn->event_loop, &conn->ev, rule->tcp_timeout);
 
-       FUZZY_TCP_RETAIN(conn);
-
-       /* Store in connection pool array */
+       /* Store in connection pool array - array takes ownership of initial reference */
        g_ptr_array_add(rule->tcp_connections, conn);
 
        msg_info_task("fuzzy_tcp: initiating connection to %s for rule %s",
@@ -685,10 +698,14 @@ fuzzy_tcp_get_or_create_connection(struct fuzzy_rule *rule,
                /* Connection exists - check state */
                if (conn->connected) {
                        /* Ready to use */
+                       msg_debug_task("fuzzy_tcp: reusing established connection to %s for rule %s",
+                                                  rspamd_upstream_name(upstream), rule->name);
                        return conn;
                }
                else if (conn->connecting) {
                        /* Connection in progress - cannot use yet */
+                       msg_debug_task("fuzzy_tcp: connection to %s is still connecting, using UDP fallback",
+                                                  rspamd_upstream_name(upstream));
                        return NULL;
                }
                else if (conn->failed) {
@@ -696,19 +713,22 @@ fuzzy_tcp_get_or_create_connection(struct fuzzy_rule *rule,
                        ev_tstamp now = rspamd_get_calendar_ticks();
                        ev_tstamp time_since_failure = now - conn->last_failure;
 
-                       if (time_since_failure < 10.0) {
+                       if (time_since_failure < rule->tcp_retry_delay) {
                                /* Recent failure - don't retry TCP yet, fallback to UDP */
-                               msg_debug_task("fuzzy_tcp: connection failed %.1fs ago for %s, using UDP fallback",
+                               msg_debug_task("fuzzy_tcp: connection failed %.1fs ago for %s (retry_delay=%.1fs), using UDP fallback",
                                                           time_since_failure,
-                                                          rspamd_upstream_name(upstream));
+                                                          rspamd_upstream_name(upstream),
+                                                          rule->tcp_retry_delay);
                                return NULL;
                        }
                        else {
                                /* Old failure - remove and try reconnecting */
-                               msg_info_task("fuzzy_tcp: connection failed %.1fs ago for %s, retrying TCP",
+                               msg_info_task("fuzzy_tcp: connection failed %.1fs ago for %s (retry_delay=%.1fs), retrying TCP",
                                                          time_since_failure,
-                                                         rspamd_upstream_name(upstream));
+                                                         rspamd_upstream_name(upstream),
+                                                         rule->tcp_retry_delay);
                                g_ptr_array_remove(rule->tcp_connections, conn);
+                               FUZZY_TCP_RELEASE(conn); /* Release reference held by array */
                                conn = NULL;
                        }
                }
@@ -725,9 +745,10 @@ fuzzy_tcp_get_or_create_connection(struct fuzzy_rule *rule,
 /**
  * Cleanup pending requests for a failed connection
  * Removes all pending commands associated with this connection
+ * This is called only for connection-level failures (socket errors, not session timeouts)
  */
 static void
-fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn)
+fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn, const char *reason)
 {
        GHashTableIter iter;
        gpointer key, value;
@@ -741,6 +762,11 @@ fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn)
                return;
        }
 
+       msg_info("fuzzy_tcp: cleaning up connection to %s for rule %s, reason: %s",
+                        rspamd_upstream_name(conn->server),
+                        conn->rule->name,
+                        reason ? reason : "unknown");
+
        /* Collect commands to remove and sessions to check */
        to_remove = g_ptr_array_new();
        sessions_to_check = g_hash_table_new(g_direct_hash, g_direct_equal);
@@ -776,8 +802,8 @@ fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn)
        }
 
        if (to_remove->len > 0 && task) {
-               msg_info_task("fuzzy_tcp: cleaned up %d pending commands for failed connection to %s",
-                                         (int) to_remove->len, rspamd_upstream_name(conn->server));
+               msg_warn_task("fuzzy_tcp: cleaned up %d pending commands due to connection failure: %s",
+                                         (int) to_remove->len, reason ? reason : "unknown");
        }
 
        /* Check session completion for all affected sessions */
@@ -884,15 +910,19 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
        conn->last_activity = rspamd_get_calendar_ticks();
 
        if (what == EV_TIMEOUT) {
-               msg_warn("fuzzy_tcp: connection timeout for rule %s to %s",
+               ev_tstamp elapsed = rspamd_get_calendar_ticks() - conn->connect_start;
+               msg_warn("fuzzy_tcp: connection timeout for rule %s to %s after %.2fs (connecting=%d, connected=%d)",
                                 conn->rule->name,
-                                rspamd_upstream_name(conn->server));
-               fuzzy_tcp_connection_cleanup(conn);
-               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
+                                rspamd_upstream_name(conn->server),
+                                elapsed,
+                                conn->connecting,
+                                conn->connected);
                conn->failed = TRUE;
                conn->last_failure = rspamd_get_calendar_ticks();
                conn->connecting = FALSE;
-               rspamd_upstream_fail(conn->server, TRUE, "timeout");
+               fuzzy_tcp_connection_cleanup(conn, "connection timeout");
+               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
+               rspamd_upstream_fail(conn->server, TRUE, "connection timeout");
                FUZZY_TCP_RELEASE(conn);
                return;
        }
@@ -906,26 +936,28 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
                                                 conn->rule->name,
                                                 rspamd_upstream_name(conn->server),
                                                 strerror(errno));
-                               fuzzy_tcp_connection_cleanup(conn);
-                               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                                conn->failed = TRUE;
                                conn->last_failure = rspamd_get_calendar_ticks();
                                conn->connecting = FALSE;
+                               fuzzy_tcp_connection_cleanup(conn, "getsockopt failed");
+                               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                                rspamd_upstream_fail(conn->server, TRUE, "getsockopt failed");
                                FUZZY_TCP_RELEASE(conn);
                                return;
                        }
 
                        if (so_error != 0) {
+                               char error_buf[128];
+                               rspamd_snprintf(error_buf, sizeof(error_buf), "connect error: %s", strerror(so_error));
                                msg_warn("fuzzy_tcp: connection failed for rule %s to %s: %s",
                                                 conn->rule->name,
                                                 rspamd_upstream_name(conn->server),
                                                 strerror(so_error));
-                               fuzzy_tcp_connection_cleanup(conn);
-                               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                                conn->failed = TRUE;
                                conn->last_failure = rspamd_get_calendar_ticks();
                                conn->connecting = FALSE;
+                               fuzzy_tcp_connection_cleanup(conn, error_buf);
+                               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                                rspamd_upstream_fail(conn->server, TRUE, strerror(so_error));
                                FUZZY_TCP_RELEASE(conn);
                                return;
@@ -934,10 +966,14 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
                        /* Connection established! */
                        conn->connected = TRUE;
                        conn->connecting = FALSE;
+                       ev_tstamp elapsed = rspamd_get_calendar_ticks() - conn->connect_start;
 
-                       msg_info("fuzzy_tcp: connection established to %s for rule %s (fd=%d, ev.io.fd=%d)",
+                       msg_info("fuzzy_tcp: connection established to %s for rule %s in %.3fs (fd=%d, encrypted=%d)",
                                         rspamd_inet_address_to_string_pretty(conn->addr),
-                                        conn->rule->name, conn->fd, (int) conn->ev.io.fd);
+                                        conn->rule->name,
+                                        elapsed,
+                                        conn->fd,
+                                        conn->encrypted);
 
                        rspamd_upstream_ok(conn->server);
 
@@ -1001,24 +1037,26 @@ fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn)
                                /* Cannot write more, wait for next event */
                                return;
                        }
+                       char error_buf[128];
+                       rspamd_snprintf(error_buf, sizeof(error_buf), "write error: %s", strerror(errno));
                        msg_err("fuzzy_tcp: write error for rule %s to %s: %s",
                                        conn->rule->name,
                                        rspamd_upstream_name(conn->server),
                                        strerror(errno));
-                       fuzzy_tcp_connection_cleanup(conn);
-                       rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                        conn->failed = TRUE;
                        conn->last_failure = rspamd_get_calendar_ticks();
+                       fuzzy_tcp_connection_cleanup(conn, error_buf);
+                       rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                        return;
                }
                else if (r == 0) {
                        msg_info("fuzzy_tcp: connection closed by %s for rule %s",
                                         rspamd_upstream_name(conn->server),
                                         conn->rule->name);
-                       fuzzy_tcp_connection_cleanup(conn);
-                       rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                        conn->failed = TRUE;
                        conn->last_failure = rspamd_get_calendar_ticks();
+                       fuzzy_tcp_connection_cleanup(conn, "connection closed by peer");
+                       rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                        return;
                }
 
@@ -1293,10 +1331,10 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn)
        if (available_space == 0) {
                msg_err("fuzzy_tcp: read buffer full for rule %s, closing connection",
                                conn->rule->name);
-               fuzzy_tcp_connection_cleanup(conn);
-               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                conn->failed = TRUE;
                conn->last_failure = rspamd_get_calendar_ticks();
+               fuzzy_tcp_connection_cleanup(conn, "read buffer overflow");
+               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                return;
        }
 
@@ -1306,22 +1344,24 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn)
                if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
                        return; /* Try again later */
                }
+               char error_buf[128];
+               rspamd_snprintf(error_buf, sizeof(error_buf), "read error: %s", strerror(errno));
                msg_err("fuzzy_tcp: read error for rule %s: %s",
                                conn->rule->name, strerror(errno));
-               fuzzy_tcp_connection_cleanup(conn);
-               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                conn->failed = TRUE;
                conn->last_failure = rspamd_get_calendar_ticks();
+               fuzzy_tcp_connection_cleanup(conn, error_buf);
+               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                return;
        }
        else if (r == 0) {
                msg_info("fuzzy_tcp: connection closed by %s for rule %s",
                                 rspamd_upstream_name(conn->server),
                                 conn->rule->name);
-               fuzzy_tcp_connection_cleanup(conn);
-               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                conn->failed = TRUE;
                conn->last_failure = rspamd_get_calendar_ticks();
+               fuzzy_tcp_connection_cleanup(conn, "connection closed by peer");
+               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                return;
        }
 
@@ -1362,13 +1402,15 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn)
                frame_len = conn->cur_frame_state & 0x3FFF;
 
                if (frame_len > sizeof(struct rspamd_fuzzy_encrypted_reply)) {
+                       char error_buf[128];
+                       rspamd_snprintf(error_buf, sizeof(error_buf), "invalid frame length: %d", (int) frame_len);
                        msg_err("fuzzy_tcp: invalid frame length %d from %s, closing",
                                        (int) frame_len,
                                        rspamd_upstream_name(conn->server));
-                       fuzzy_tcp_connection_cleanup(conn);
-                       rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                        conn->failed = TRUE;
                        conn->last_failure = rspamd_get_calendar_ticks();
+                       fuzzy_tcp_connection_cleanup(conn, error_buf);
+                       rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                        return;
                }
 
@@ -1582,6 +1624,13 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj,
                        else {
                                rule->tcp_timeout = 5.0; /* Default: 5 seconds */
                        }
+
+                       if ((tcp_obj = ucl_object_lookup(value, "retry_delay")) != NULL) {
+                               rule->tcp_retry_delay = ucl_object_todouble(tcp_obj);
+                       }
+                       else {
+                               rule->tcp_retry_delay = 10.0; /* Default: 10 seconds */
+                       }
                }
        }
        else {
@@ -1591,6 +1640,7 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj,
                rule->tcp_threshold = 1.0;
                rule->tcp_window = 1.0;
                rule->tcp_timeout = 5.0;
+               rule->tcp_retry_delay = 10.0;
        }
 
 
@@ -1883,8 +1933,8 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj,
        rule->rate_tracker.requests_count = 0;
        rule->rate_tracker.window_start = 0;
 
-       /* Initialize TCP connection pool - array of connections */
-       rule->tcp_connections = g_ptr_array_new();
+       /* Initialize TCP connection pool - array of connections with proper free function */
+       rule->tcp_connections = g_ptr_array_new_with_free_func(fuzzy_tcp_connection_unref);
 
        /* Initialize global pending requests pool - keyed by tag */
        rule->pending_requests = g_hash_table_new_full(g_direct_hash, g_direct_equal,
@@ -2660,10 +2710,8 @@ fuzzy_io_fin(void *ud)
        if (session->fd == -1) {
                /* TCP session - cleanup pending requests and stop timer */
                fuzzy_tcp_session_cleanup(session);
-               /* Stop pure timer (no IO) */
-               if (ev_is_active(&session->timer_ev.tm)) {
-                       ev_timer_stop(session->event_loop, &session->timer_ev.tm);
-               }
+               /* Stop pure timer (no IO) - safe to call even if not active */
+               ev_timer_stop(session->event_loop, &session->timer_ev.tm);
        }
 
        if (session->commands) {
@@ -4219,7 +4267,7 @@ fuzzy_tcp_timer_libev_cb(EV_P_ struct ev_timer *w, int revents)
        ev->cb(-1, EV_TIMER, ev->ud);
 }
 
-/* TCP timeout callback - no retransmits needed, connection is established */
+/* TCP timeout callback - session-level timeout, does NOT mark connection as failed */
 static void
 fuzzy_tcp_timer_callback(int fd, short what, void *arg)
 {
@@ -4227,6 +4275,10 @@ fuzzy_tcp_timer_callback(int fd, short what, void *arg)
        struct rspamd_task *task = session->task;
        struct fuzzy_cmd_io *io;
        unsigned int i, nreplied = 0;
+       ev_tstamp now = rspamd_get_calendar_ticks();
+
+       /* Check pending timeouts for all requests in this rule (periodic check) */
+       fuzzy_tcp_check_pending_timeouts(session->rule, now);
 
        /* Check if all commands have been replied */
        for (i = 0; i < session->commands->len; i++) {
@@ -4243,8 +4295,8 @@ fuzzy_tcp_timer_callback(int fd, short what, void *arg)
                return;
        }
 
-       /* Timeout - just fail the request, don't retry via UDP */
-       msg_warn_task("fuzzy_tcp: timeout waiting for replies from %s (%d/%d replied), giving up",
+       /* Session timeout - mark unreplied commands as failed for this session only */
+       msg_warn_task("fuzzy_tcp: session timeout waiting for replies from %s (%d/%d replied)",
                                  rspamd_upstream_name(session->server),
                                  nreplied, (int) session->commands->len);
 
@@ -4256,29 +4308,19 @@ fuzzy_tcp_timer_callback(int fd, short what, void *arg)
                }
        }
 
-       rspamd_upstream_fail(session->server, TRUE, "timeout");
+       /* Report upstream issue but don't mark as completely failed */
+       rspamd_upstream_fail(session->server, FALSE, "session timeout");
 
-       /* Mark TCP connection as failed so future requests use UDP for ~10 seconds */
-       for (i = 0; i < session->rule->tcp_connections->len; i++) {
-               struct fuzzy_tcp_connection *conn = g_ptr_array_index(session->rule->tcp_connections, i);
-               if (conn->server == session->server) {
-                       conn->failed = TRUE;
-                       conn->last_failure = rspamd_get_calendar_ticks();
-                       conn->connected = FALSE;
-                       msg_info_task("fuzzy_tcp: marked connection to %s as failed, switching to UDP for 10s",
-                                                 rspamd_upstream_name(session->server));
-                       break;
-               }
-       }
+       /* NOTE: We do NOT mark connection->failed = TRUE here!
+        * Session timeout is not a connection failure - other sessions may still succeed.
+        * Connection failures (socket errors) are handled in IO handlers.
+        */
 
-       /* Clean up TCP session - stop timer and remove event */
-       /* Remove any pending TCP requests for this session */
+       /* Clean up pending requests for THIS session only */
        fuzzy_tcp_session_cleanup(session);
 
-       /* Stop pure timer (no IO) */
-       if (ev_is_active(&session->timer_ev.tm)) {
-               ev_timer_stop(session->event_loop, &session->timer_ev.tm);
-       }
+       /* Stop pure timer (no IO) - safe to call even if not active */
+       ev_timer_stop(session->event_loop, &session->timer_ev.tm);
 
        /* Decrement async counter for TCP session */
        if (session->item) {