From: Vsevolod Stakhov Date: Wed, 8 Oct 2025 09:45:04 +0000 (+0100) Subject: [Fix] Fuzzy TCP: separate session timeouts from connection failures X-Git-Tag: 3.14.0~84^2~6 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=1c5d88228634284629be2eb5be08632ae8f88910;p=thirdparty%2Frspamd.git [Fix] Fuzzy TCP: separate session timeouts from connection failures This addresses several timeout handling issues: - Session timeouts no longer mark entire TCP connection as failed, allowing other sessions to continue - Made tcp_retry_delay configurable (default: 10.0s) - Added diagnostic reason strings to all cleanup paths - Fixed reference counting with proper free_func for connection pool - Added periodic timeout checks to detect stalled requests - Unconditional timer cleanup (ev_timer_stop is safe to call) - Enhanced logging with connection state and elapsed time details --- diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index d379d8e986..1fb50d14ef 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -110,11 +110,12 @@ struct fuzzy_rule { int lua_id; /* TCP configuration */ - gboolean tcp_enabled; /* Explicitly enable TCP */ - gboolean tcp_auto; /* Auto-switch to TCP based on request rate */ - double tcp_threshold; /* Requests/sec threshold for auto-switch (default: 1.0) */ - double tcp_window; /* Time window for rate calculation in seconds (default: 1.0) */ - double tcp_timeout; /* TCP connection timeout (default: 5.0) */ + gboolean tcp_enabled; /* Explicitly enable TCP */ + gboolean tcp_auto; /* Auto-switch to TCP based on request rate */ + double tcp_threshold; /* Requests/sec threshold for auto-switch (default: 1.0) */ + double tcp_window; /* Time window for rate calculation in seconds (default: 1.0) */ + double tcp_timeout; /* TCP connection timeout (default: 5.0) */ + double tcp_retry_delay; /* Delay before retrying failed TCP connection (default: 10.0) */ /* Rate tracking for TCP auto-switch */ struct { @@ -523,7 +524,7 @@ static void fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn); static gboolean fuzzy_tcp_send_command(struct fuzzy_tcp_connection *conn, GPtrArray *commands, struct fuzzy_client_session *session); -static void fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn); +static void fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn, const char *reason); static gboolean fuzzy_check_session_is_completed(struct fuzzy_client_session *session); /* Forward declarations for helper functions */ @@ -538,7 +539,7 @@ static void fuzzy_insert_result(struct fuzzy_client_session *session, #define FUZZY_TCP_RELEASE(x) REF_RELEASE(x) /** - * Free TCP connection resources + * Free TCP connection resources (called by reference counting) */ static void fuzzy_tcp_connection_free(struct fuzzy_tcp_connection *conn) @@ -552,9 +553,23 @@ fuzzy_tcp_connection_free(struct fuzzy_tcp_connection *conn) g_queue_free(conn->write_queue); } + msg_debug("fuzzy_tcp: freeing connection to %s for rule %s", + rspamd_upstream_name(conn->server), + conn->rule->name); + g_free(conn); } +/** + * Wrapper for g_ptr_array free function - releases reference + */ +static void +fuzzy_tcp_connection_unref(gpointer conn_ptr) +{ + struct fuzzy_tcp_connection *conn = (struct fuzzy_tcp_connection *) conn_ptr; + FUZZY_TCP_RELEASE(conn); +} + /** * Create new TCP connection structure */ @@ -646,9 +661,7 @@ fuzzy_tcp_connect_async(struct fuzzy_rule *rule, fuzzy_tcp_io_handler, conn); rspamd_ev_watcher_start(conn->event_loop, &conn->ev, rule->tcp_timeout); - FUZZY_TCP_RETAIN(conn); - - /* Store in connection pool array */ + /* Store in connection pool array - array takes ownership of initial reference */ g_ptr_array_add(rule->tcp_connections, conn); msg_info_task("fuzzy_tcp: initiating connection to %s for rule %s", @@ -685,10 +698,14 @@ fuzzy_tcp_get_or_create_connection(struct fuzzy_rule *rule, /* Connection exists - check state */ if (conn->connected) { /* Ready to use */ + msg_debug_task("fuzzy_tcp: reusing established connection to %s for rule %s", + rspamd_upstream_name(upstream), rule->name); return conn; } else if (conn->connecting) { /* Connection in progress - cannot use yet */ + msg_debug_task("fuzzy_tcp: connection to %s is still connecting, using UDP fallback", + rspamd_upstream_name(upstream)); return NULL; } else if (conn->failed) { @@ -696,19 +713,22 @@ fuzzy_tcp_get_or_create_connection(struct fuzzy_rule *rule, ev_tstamp now = rspamd_get_calendar_ticks(); ev_tstamp time_since_failure = now - conn->last_failure; - if (time_since_failure < 10.0) { + if (time_since_failure < rule->tcp_retry_delay) { /* Recent failure - don't retry TCP yet, fallback to UDP */ - msg_debug_task("fuzzy_tcp: connection failed %.1fs ago for %s, using UDP fallback", + msg_debug_task("fuzzy_tcp: connection failed %.1fs ago for %s (retry_delay=%.1fs), using UDP fallback", time_since_failure, - rspamd_upstream_name(upstream)); + rspamd_upstream_name(upstream), + rule->tcp_retry_delay); return NULL; } else { /* Old failure - remove and try reconnecting */ - msg_info_task("fuzzy_tcp: connection failed %.1fs ago for %s, retrying TCP", + msg_info_task("fuzzy_tcp: connection failed %.1fs ago for %s (retry_delay=%.1fs), retrying TCP", time_since_failure, - rspamd_upstream_name(upstream)); + rspamd_upstream_name(upstream), + rule->tcp_retry_delay); g_ptr_array_remove(rule->tcp_connections, conn); + FUZZY_TCP_RELEASE(conn); /* Release reference held by array */ conn = NULL; } } @@ -725,9 +745,10 @@ fuzzy_tcp_get_or_create_connection(struct fuzzy_rule *rule, /** * Cleanup pending requests for a failed connection * Removes all pending commands associated with this connection + * This is called only for connection-level failures (socket errors, not session timeouts) */ static void -fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn) +fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn, const char *reason) { GHashTableIter iter; gpointer key, value; @@ -741,6 +762,11 @@ fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn) return; } + msg_info("fuzzy_tcp: cleaning up connection to %s for rule %s, reason: %s", + rspamd_upstream_name(conn->server), + conn->rule->name, + reason ? reason : "unknown"); + /* Collect commands to remove and sessions to check */ to_remove = g_ptr_array_new(); sessions_to_check = g_hash_table_new(g_direct_hash, g_direct_equal); @@ -776,8 +802,8 @@ fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn) } if (to_remove->len > 0 && task) { - msg_info_task("fuzzy_tcp: cleaned up %d pending commands for failed connection to %s", - (int) to_remove->len, rspamd_upstream_name(conn->server)); + msg_warn_task("fuzzy_tcp: cleaned up %d pending commands due to connection failure: %s", + (int) to_remove->len, reason ? reason : "unknown"); } /* Check session completion for all affected sessions */ @@ -884,15 +910,19 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud) conn->last_activity = rspamd_get_calendar_ticks(); if (what == EV_TIMEOUT) { - msg_warn("fuzzy_tcp: connection timeout for rule %s to %s", + ev_tstamp elapsed = rspamd_get_calendar_ticks() - conn->connect_start; + msg_warn("fuzzy_tcp: connection timeout for rule %s to %s after %.2fs (connecting=%d, connected=%d)", conn->rule->name, - rspamd_upstream_name(conn->server)); - fuzzy_tcp_connection_cleanup(conn); - rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); + rspamd_upstream_name(conn->server), + elapsed, + conn->connecting, + conn->connected); conn->failed = TRUE; conn->last_failure = rspamd_get_calendar_ticks(); conn->connecting = FALSE; - rspamd_upstream_fail(conn->server, TRUE, "timeout"); + fuzzy_tcp_connection_cleanup(conn, "connection timeout"); + rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); + rspamd_upstream_fail(conn->server, TRUE, "connection timeout"); FUZZY_TCP_RELEASE(conn); return; } @@ -906,26 +936,28 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud) conn->rule->name, rspamd_upstream_name(conn->server), strerror(errno)); - fuzzy_tcp_connection_cleanup(conn); - rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); conn->failed = TRUE; conn->last_failure = rspamd_get_calendar_ticks(); conn->connecting = FALSE; + fuzzy_tcp_connection_cleanup(conn, "getsockopt failed"); + rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); rspamd_upstream_fail(conn->server, TRUE, "getsockopt failed"); FUZZY_TCP_RELEASE(conn); return; } if (so_error != 0) { + char error_buf[128]; + rspamd_snprintf(error_buf, sizeof(error_buf), "connect error: %s", strerror(so_error)); msg_warn("fuzzy_tcp: connection failed for rule %s to %s: %s", conn->rule->name, rspamd_upstream_name(conn->server), strerror(so_error)); - fuzzy_tcp_connection_cleanup(conn); - rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); conn->failed = TRUE; conn->last_failure = rspamd_get_calendar_ticks(); conn->connecting = FALSE; + fuzzy_tcp_connection_cleanup(conn, error_buf); + rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); rspamd_upstream_fail(conn->server, TRUE, strerror(so_error)); FUZZY_TCP_RELEASE(conn); return; @@ -934,10 +966,14 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud) /* Connection established! */ conn->connected = TRUE; conn->connecting = FALSE; + ev_tstamp elapsed = rspamd_get_calendar_ticks() - conn->connect_start; - msg_info("fuzzy_tcp: connection established to %s for rule %s (fd=%d, ev.io.fd=%d)", + msg_info("fuzzy_tcp: connection established to %s for rule %s in %.3fs (fd=%d, encrypted=%d)", rspamd_inet_address_to_string_pretty(conn->addr), - conn->rule->name, conn->fd, (int) conn->ev.io.fd); + conn->rule->name, + elapsed, + conn->fd, + conn->encrypted); rspamd_upstream_ok(conn->server); @@ -1001,24 +1037,26 @@ fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn) /* Cannot write more, wait for next event */ return; } + char error_buf[128]; + rspamd_snprintf(error_buf, sizeof(error_buf), "write error: %s", strerror(errno)); msg_err("fuzzy_tcp: write error for rule %s to %s: %s", conn->rule->name, rspamd_upstream_name(conn->server), strerror(errno)); - fuzzy_tcp_connection_cleanup(conn); - rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); conn->failed = TRUE; conn->last_failure = rspamd_get_calendar_ticks(); + fuzzy_tcp_connection_cleanup(conn, error_buf); + rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); return; } else if (r == 0) { msg_info("fuzzy_tcp: connection closed by %s for rule %s", rspamd_upstream_name(conn->server), conn->rule->name); - fuzzy_tcp_connection_cleanup(conn); - rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); conn->failed = TRUE; conn->last_failure = rspamd_get_calendar_ticks(); + fuzzy_tcp_connection_cleanup(conn, "connection closed by peer"); + rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); return; } @@ -1293,10 +1331,10 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn) if (available_space == 0) { msg_err("fuzzy_tcp: read buffer full for rule %s, closing connection", conn->rule->name); - fuzzy_tcp_connection_cleanup(conn); - rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); conn->failed = TRUE; conn->last_failure = rspamd_get_calendar_ticks(); + fuzzy_tcp_connection_cleanup(conn, "read buffer overflow"); + rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); return; } @@ -1306,22 +1344,24 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn) if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { return; /* Try again later */ } + char error_buf[128]; + rspamd_snprintf(error_buf, sizeof(error_buf), "read error: %s", strerror(errno)); msg_err("fuzzy_tcp: read error for rule %s: %s", conn->rule->name, strerror(errno)); - fuzzy_tcp_connection_cleanup(conn); - rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); conn->failed = TRUE; conn->last_failure = rspamd_get_calendar_ticks(); + fuzzy_tcp_connection_cleanup(conn, error_buf); + rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); return; } else if (r == 0) { msg_info("fuzzy_tcp: connection closed by %s for rule %s", rspamd_upstream_name(conn->server), conn->rule->name); - fuzzy_tcp_connection_cleanup(conn); - rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); conn->failed = TRUE; conn->last_failure = rspamd_get_calendar_ticks(); + fuzzy_tcp_connection_cleanup(conn, "connection closed by peer"); + rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); return; } @@ -1362,13 +1402,15 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn) frame_len = conn->cur_frame_state & 0x3FFF; if (frame_len > sizeof(struct rspamd_fuzzy_encrypted_reply)) { + char error_buf[128]; + rspamd_snprintf(error_buf, sizeof(error_buf), "invalid frame length: %d", (int) frame_len); msg_err("fuzzy_tcp: invalid frame length %d from %s, closing", (int) frame_len, rspamd_upstream_name(conn->server)); - fuzzy_tcp_connection_cleanup(conn); - rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); conn->failed = TRUE; conn->last_failure = rspamd_get_calendar_ticks(); + fuzzy_tcp_connection_cleanup(conn, error_buf); + rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); return; } @@ -1582,6 +1624,13 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj, else { rule->tcp_timeout = 5.0; /* Default: 5 seconds */ } + + if ((tcp_obj = ucl_object_lookup(value, "retry_delay")) != NULL) { + rule->tcp_retry_delay = ucl_object_todouble(tcp_obj); + } + else { + rule->tcp_retry_delay = 10.0; /* Default: 10 seconds */ + } } } else { @@ -1591,6 +1640,7 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj, rule->tcp_threshold = 1.0; rule->tcp_window = 1.0; rule->tcp_timeout = 5.0; + rule->tcp_retry_delay = 10.0; } @@ -1883,8 +1933,8 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj, rule->rate_tracker.requests_count = 0; rule->rate_tracker.window_start = 0; - /* Initialize TCP connection pool - array of connections */ - rule->tcp_connections = g_ptr_array_new(); + /* Initialize TCP connection pool - array of connections with proper free function */ + rule->tcp_connections = g_ptr_array_new_with_free_func(fuzzy_tcp_connection_unref); /* Initialize global pending requests pool - keyed by tag */ rule->pending_requests = g_hash_table_new_full(g_direct_hash, g_direct_equal, @@ -2660,10 +2710,8 @@ fuzzy_io_fin(void *ud) if (session->fd == -1) { /* TCP session - cleanup pending requests and stop timer */ fuzzy_tcp_session_cleanup(session); - /* Stop pure timer (no IO) */ - if (ev_is_active(&session->timer_ev.tm)) { - ev_timer_stop(session->event_loop, &session->timer_ev.tm); - } + /* Stop pure timer (no IO) - safe to call even if not active */ + ev_timer_stop(session->event_loop, &session->timer_ev.tm); } if (session->commands) { @@ -4219,7 +4267,7 @@ fuzzy_tcp_timer_libev_cb(EV_P_ struct ev_timer *w, int revents) ev->cb(-1, EV_TIMER, ev->ud); } -/* TCP timeout callback - no retransmits needed, connection is established */ +/* TCP timeout callback - session-level timeout, does NOT mark connection as failed */ static void fuzzy_tcp_timer_callback(int fd, short what, void *arg) { @@ -4227,6 +4275,10 @@ fuzzy_tcp_timer_callback(int fd, short what, void *arg) struct rspamd_task *task = session->task; struct fuzzy_cmd_io *io; unsigned int i, nreplied = 0; + ev_tstamp now = rspamd_get_calendar_ticks(); + + /* Check pending timeouts for all requests in this rule (periodic check) */ + fuzzy_tcp_check_pending_timeouts(session->rule, now); /* Check if all commands have been replied */ for (i = 0; i < session->commands->len; i++) { @@ -4243,8 +4295,8 @@ fuzzy_tcp_timer_callback(int fd, short what, void *arg) return; } - /* Timeout - just fail the request, don't retry via UDP */ - msg_warn_task("fuzzy_tcp: timeout waiting for replies from %s (%d/%d replied), giving up", + /* Session timeout - mark unreplied commands as failed for this session only */ + msg_warn_task("fuzzy_tcp: session timeout waiting for replies from %s (%d/%d replied)", rspamd_upstream_name(session->server), nreplied, (int) session->commands->len); @@ -4256,29 +4308,19 @@ fuzzy_tcp_timer_callback(int fd, short what, void *arg) } } - rspamd_upstream_fail(session->server, TRUE, "timeout"); + /* Report upstream issue but don't mark as completely failed */ + rspamd_upstream_fail(session->server, FALSE, "session timeout"); - /* Mark TCP connection as failed so future requests use UDP for ~10 seconds */ - for (i = 0; i < session->rule->tcp_connections->len; i++) { - struct fuzzy_tcp_connection *conn = g_ptr_array_index(session->rule->tcp_connections, i); - if (conn->server == session->server) { - conn->failed = TRUE; - conn->last_failure = rspamd_get_calendar_ticks(); - conn->connected = FALSE; - msg_info_task("fuzzy_tcp: marked connection to %s as failed, switching to UDP for 10s", - rspamd_upstream_name(session->server)); - break; - } - } + /* NOTE: We do NOT mark connection->failed = TRUE here! + * Session timeout is not a connection failure - other sessions may still succeed. + * Connection failures (socket errors) are handled in IO handlers. + */ - /* Clean up TCP session - stop timer and remove event */ - /* Remove any pending TCP requests for this session */ + /* Clean up pending requests for THIS session only */ fuzzy_tcp_session_cleanup(session); - /* Stop pure timer (no IO) */ - if (ev_is_active(&session->timer_ev.tm)) { - ev_timer_stop(session->event_loop, &session->timer_ev.tm); - } + /* Stop pure timer (no IO) - safe to call even if not active */ + ev_timer_stop(session->event_loop, &session->timer_ev.tm); /* Decrement async counter for TCP session */ if (session->item) {