git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] Fuzzy TCP: fix server replies and client event handling
author Vsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Oct 2025 18:20:03 +0000 (19:20 +0100)
committer Vsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Oct 2025 18:20:03 +0000 (19:20 +0100)
The server was accepting TCP connections but never sending replies back,
causing all TCP requests to time out. The issue had multiple causes:

Server side:
- TCP replies were routed through the UDP code path, which doesn't queue
  replies for TCP sessions
- Async backend operations used a stack-allocated session, causing
  segfaults when the callback executed after the stack frame was destroyed

Client side:
- The event handler used equality checks (==) instead of bitwise tests (&)
  for libev event flags, preventing read events from being processed
- Timer initialization used the rspamd IO wrapper for a pure timer,
  causing fd=-1 assertion failures in ev_io_start
- Pending requests were not cleaned up on timeout, causing use-after-free
  when late replies arrived after task completion

Fix by implementing a TCP reply queue on the server, using heap allocation
for async operations with proper reference counting, fixing event
handling to use bitwise operators, and implementing a pure libev timer
for TCP timeout handling.

src/fuzzy_storage.c
src/plugins/fuzzy_check.c

index 8d97ebca8d13f0551cd137f3c5b677c9d2897e74..6077593d0d39541edf48a68082dfb187551d55e8 100644 (file)
@@ -320,6 +320,9 @@ struct fuzzy_session {
        struct fuzzy_key *key;
        struct rspamd_fuzzy_cmd_extension *extensions;
        unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
+
+       /* If this is a TCP session, this pointer will be set */
+       struct fuzzy_tcp_session *tcp_session;
 };
 
 struct fuzzy_peer_request {
@@ -597,11 +600,10 @@ rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx,
                }
        }
 
-       /*
-       if (rspamd_inet_address_is_local (addr, TRUE)) {
+       /* Skip ratelimit for local addresses */
+       if (rspamd_inet_address_is_local(addr)) {
                return TRUE;
        }
-       */
 
        masked = rspamd_inet_address_copy(addr, NULL);
 
@@ -1011,6 +1013,72 @@ rspamd_fuzzy_reply_io(EV_P_ ev_io *w, int revents)
        REF_RELEASE(session);
 }
 
+static void
+rspamd_fuzzy_tcp_enqueue_reply(struct fuzzy_session *session)
+{
+       struct fuzzy_tcp_session *tcp_session = session->tcp_session;
+       struct fuzzy_tcp_reply_queue_elt *reply_elt;
+       gsize len;
+       gconstpointer data;
+
+       if (tcp_session == NULL) {
+               msg_err("internal error: tcp_session is NULL in rspamd_fuzzy_tcp_enqueue_reply");
+               return;
+       }
+
+       /* Determine reply data and length */
+       if (session->cmd_type == CMD_ENCRYPTED_NORMAL ||
+               session->cmd_type == CMD_ENCRYPTED_SHINGLE) {
+               /* Encrypted reply */
+               data = &session->reply;
+
+               if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+                       len = sizeof(session->reply);
+               }
+               else {
+                       len = sizeof(session->reply.hdr) + sizeof(session->reply.rep.v1);
+               }
+       }
+       else {
+               data = &session->reply.rep;
+
+               if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+                       len = sizeof(session->reply.rep);
+               }
+               else {
+                       len = sizeof(session->reply.rep.v1);
+               }
+       }
+
+       /* Create reply queue element */
+       reply_elt = g_malloc0(sizeof(*reply_elt));
+       reply_elt->rep.size_hdr = htons((uint16_t) len);
+       memcpy(&reply_elt->rep.payload, data, len);
+       reply_elt->written = 0;
+
+       /* Add to queue */
+       DL_APPEND(tcp_session->replies_queue, reply_elt);
+
+       msg_debug_fuzzy_storage("enqueued TCP reply to %s, %z bytes",
+                                                       rspamd_inet_address_to_string(session->addr),
+                                                       len);
+
+       /* Enable write event if not already enabled */
+       if (ev_is_active(&tcp_session->common.io)) {
+               int events = tcp_session->common.io.events;
+               if (!(events & EV_WRITE)) {
+                       ev_io_stop(tcp_session->common.ctx->event_loop, &tcp_session->common.io);
+                       ev_io_set(&tcp_session->common.io, tcp_session->common.fd, EV_READ | EV_WRITE);
+                       ev_io_start(tcp_session->common.ctx->event_loop, &tcp_session->common.io);
+               }
+       }
+       else {
+               /* Watcher is not active, start it with both read and write */
+               ev_io_set(&tcp_session->common.io, tcp_session->common.fd, EV_READ | EV_WRITE);
+               ev_io_start(tcp_session->common.ctx->event_loop, &tcp_session->common.io);
+       }
+}
+
 static void
 rspamd_fuzzy_write_reply(struct fuzzy_session *session)
 {
@@ -1018,6 +1086,12 @@ rspamd_fuzzy_write_reply(struct fuzzy_session *session)
        gsize len;
        gconstpointer data;
 
+       /* Check if this is a TCP session */
+       if (session->tcp_session != NULL) {
+               rspamd_fuzzy_tcp_enqueue_reply(session);
+               return;
+       }
+
        if (session->cmd_type == CMD_ENCRYPTED_NORMAL ||
                session->cmd_type == CMD_ENCRYPTED_SHINGLE) {
                /* Encrypted reply */
@@ -2607,39 +2681,59 @@ rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents)
 
                        /* Check if we have complete frame */
                        if (session->bytes_unprocessed - processed_offset >= frame_len) {
-                               /* Process this frame using legacy session temporarily */
-                               struct fuzzy_session legacy_session;
-
-                               memset(&legacy_session, 0, sizeof(legacy_session));
-                               legacy_session.worker = session->common.worker;
-                               legacy_session.addr = session->common.addr;
-                               legacy_session.ctx = session->common.ctx;
-                               legacy_session.fd = session->common.fd;
-                               legacy_session.timestamp = session->common.timestamp;
-                               legacy_session.key = session->common.key;
-                               legacy_session.ip_stat = session->common.ip_stat;
-                               memcpy(legacy_session.nm, session->common.nm, sizeof(legacy_session.nm));
+                               /* Create heap-allocated session for async processing */
+                               struct fuzzy_session *cmd_session = g_malloc0(sizeof(*cmd_session));
+                               REF_INIT_RETAIN(cmd_session, fuzzy_session_destroy);
+
+                               /* Copy data from TCP session to command session */
+                               cmd_session->worker = session->common.worker;
+                               cmd_session->addr = rspamd_inet_address_copy(session->common.addr, NULL);
+                               cmd_session->ctx = session->common.ctx;
+                               cmd_session->fd = session->common.fd;
+                               cmd_session->timestamp = session->common.timestamp;
+                               cmd_session->key = session->common.key;
+                               cmd_session->ip_stat = session->common.ip_stat;
+                               memcpy(cmd_session->nm, session->common.nm, sizeof(cmd_session->nm));
+
+                               /* Retain references to shared objects */
+                               if (cmd_session->key) {
+                                       REF_RETAIN(cmd_session->key);
+                               }
+                               if (cmd_session->ip_stat) {
+                                       REF_RETAIN(cmd_session->ip_stat);
+                               }
+                               session->common.worker->nconns++;
+
+                               /* Set TCP session pointer so replies go to TCP queue */
+                               cmd_session->tcp_session = session;
+                               REF_RETAIN(session); /* TCP session must live until command is processed */
 
                                if (rspamd_fuzzy_cmd_from_wire(session->input_buf + processed_offset,
-                                                                                          frame_len, &legacy_session)) {
-                                       /* Copy parsed data back */
-                                       session->common.epoch = legacy_session.epoch;
-                                       session->common.cmd_type = legacy_session.cmd_type;
-                                       memcpy(&session->common.cmd, &legacy_session.cmd, sizeof(session->common.cmd));
-                                       session->common.key = legacy_session.key;
-                                       session->common.extensions = legacy_session.extensions;
-                                       memcpy(session->common.nm, legacy_session.nm, sizeof(session->common.nm));
-
-                                       /* Process command - this will need to be adapted for TCP */
-                                       rspamd_fuzzy_process_command(&legacy_session);
+                                                                                          frame_len, cmd_session)) {
+                                       /* Copy parsed data back to TCP session for tracking */
+                                       session->common.epoch = cmd_session->epoch;
+                                       session->common.cmd_type = cmd_session->cmd_type;
+                                       memcpy(&session->common.cmd, &cmd_session->cmd, sizeof(session->common.cmd));
+
+                                       /* Note: key and extensions ownership transferred to cmd_session */
+                                       session->common.key = cmd_session->key;
+                                       session->common.extensions = cmd_session->extensions;
+                                       memcpy(session->common.nm, cmd_session->nm, sizeof(session->common.nm));
+
+                                       /* Process command - replies will go to TCP queue via tcp_session pointer */
+                                       rspamd_fuzzy_process_command(cmd_session);
                                }
                                else {
                                        session->common.ctx->stat.invalid_requests++;
                                        msg_debug_fuzzy_storage("invalid TCP fuzzy command of size %d received from %s",
                                                                                        (int) frame_len,
                                                                                        rspamd_inet_address_to_string(session->common.addr));
+                                       REF_RELEASE(session); /* Release TCP session reference */
                                }
 
+                               /* Release our reference - session will be freed when all callbacks complete */
+                               REF_RELEASE(cmd_session);
+
                                processed_offset += frame_len;
                                session->cur_frame_state = 0x0000; /* Reset for next frame */
                        }
index 506f323443a739ff42fd6315d7c8969161090ca2..c3d7defe0bdddb8425075e688239e14e895c5d72 100644 (file)
@@ -169,6 +169,7 @@ struct fuzzy_client_session {
        struct fuzzy_rule *rule;
        struct ev_loop *event_loop;
        struct rspamd_io_ev ev;
+       struct rspamd_io_ev timer_ev; /* Separate timer for TCP requests */
        int state;
        int fd;
        int retransmits;
@@ -248,6 +249,7 @@ struct fuzzy_tcp_connection {
 
        ev_tstamp connect_start; /* When connection started */
        ev_tstamp last_activity; /* Last I/O activity */
+       ev_tstamp last_failure;  /* When connection last failed (for retry logic) */
 
        ref_entry_t ref; /* Reference counting */
 };
@@ -692,9 +694,25 @@ fuzzy_tcp_get_or_create_connection(struct fuzzy_rule *rule,
                        return NULL;
                }
                else if (conn->failed) {
-                       /* Previous connection failed - remove and recreate */
-                       g_ptr_array_remove(rule->tcp_connections, conn);
-                       conn = NULL;
+                       /* Previous connection failed - check if enough time passed to retry */
+                       ev_tstamp now = rspamd_get_calendar_ticks();
+                       ev_tstamp time_since_failure = now - conn->last_failure;
+
+                       if (time_since_failure < 10.0) {
+                               /* Recent failure - don't retry TCP yet, fallback to UDP */
+                               msg_debug_task("fuzzy_tcp: connection failed %.1fs ago for %s, using UDP fallback",
+                                                          time_since_failure,
+                                                          rspamd_upstream_name(upstream));
+                               return NULL;
+                       }
+                       else {
+                               /* Old failure - remove and try reconnecting */
+                               msg_info_task("fuzzy_tcp: connection failed %.1fs ago for %s, retrying TCP",
+                                                         time_since_failure,
+                                                         rspamd_upstream_name(upstream));
+                               g_ptr_array_remove(rule->tcp_connections, conn);
+                               conn = NULL;
+                       }
                }
        }
 
@@ -767,11 +785,18 @@ fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn)
        /* Check session completion for all affected sessions */
        GHashTableIter session_iter;
        struct fuzzy_client_session *session;
+       int sessions_checked = 0;
        g_hash_table_iter_init(&session_iter, sessions_to_check);
        while (g_hash_table_iter_next(&session_iter, (gpointer *) &session, NULL)) {
+               sessions_checked++;
                fuzzy_check_session_is_completed(session);
        }
 
+       if (sessions_checked > 0 && task) {
+               msg_info_task("fuzzy_tcp: checked %d sessions for completion after connection cleanup",
+                                         sessions_checked);
+       }
+
        g_ptr_array_free(to_remove, TRUE);
        g_hash_table_unref(sessions_to_check);
 }
@@ -865,14 +890,16 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
                                 conn->rule->name,
                                 rspamd_upstream_name(conn->server));
                fuzzy_tcp_connection_cleanup(conn);
+               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                conn->failed = TRUE;
+               conn->last_failure = rspamd_get_calendar_ticks();
                conn->connecting = FALSE;
                rspamd_upstream_fail(conn->server, TRUE, "timeout");
                FUZZY_TCP_RELEASE(conn);
                return;
        }
 
-       if (what == EV_WRITE) {
+       if (what & EV_WRITE) {
                /* Check if we're still connecting */
                if (conn->connecting && !conn->connected) {
                        /* Verify connection succeeded */
@@ -882,7 +909,9 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
                                                 rspamd_upstream_name(conn->server),
                                                 strerror(errno));
                                fuzzy_tcp_connection_cleanup(conn);
+                               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                                conn->failed = TRUE;
+                               conn->last_failure = rspamd_get_calendar_ticks();
                                conn->connecting = FALSE;
                                rspamd_upstream_fail(conn->server, TRUE, "getsockopt failed");
                                FUZZY_TCP_RELEASE(conn);
@@ -895,7 +924,9 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
                                                 rspamd_upstream_name(conn->server),
                                                 strerror(so_error));
                                fuzzy_tcp_connection_cleanup(conn);
+                               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                                conn->failed = TRUE;
+                               conn->last_failure = rspamd_get_calendar_ticks();
                                conn->connecting = FALSE;
                                rspamd_upstream_fail(conn->server, TRUE, strerror(so_error));
                                FUZZY_TCP_RELEASE(conn);
@@ -906,15 +937,18 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
                        conn->connected = TRUE;
                        conn->connecting = FALSE;
 
-                       msg_info("fuzzy_tcp: connection established to %s for rule %s",
+                       msg_info("fuzzy_tcp: connection established to %s for rule %s (fd=%d, ev.io.fd=%d)",
                                         rspamd_inet_address_to_string_pretty(conn->addr),
-                                        conn->rule->name);
+                                        conn->rule->name, conn->fd, (int) conn->ev.io.fd);
 
                        rspamd_upstream_ok(conn->server);
 
                        /* Now wait for both read and write events */
                        rspamd_ev_watcher_reschedule(conn->event_loop, &conn->ev,
                                                                                 EV_READ | EV_WRITE);
+
+                       msg_debug("fuzzy_tcp: after reschedule - fd=%d, ev.io.fd=%d",
+                                         conn->fd, (int) conn->ev.io.fd);
                }
                else if (conn->connected) {
                        /* Handle write */
@@ -922,7 +956,7 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
                }
        }
 
-       if (what == EV_READ && conn->connected) {
+       if (what & EV_READ && conn->connected) {
                /* Handle read */
                fuzzy_tcp_read_handler(conn);
        }
@@ -974,7 +1008,9 @@ fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn)
                                        rspamd_upstream_name(conn->server),
                                        strerror(errno));
                        fuzzy_tcp_connection_cleanup(conn);
+                       rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                        conn->failed = TRUE;
+                       conn->last_failure = rspamd_get_calendar_ticks();
                        return;
                }
                else if (r == 0) {
@@ -982,7 +1018,9 @@ fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn)
                                         rspamd_upstream_name(conn->server),
                                         conn->rule->name);
                        fuzzy_tcp_connection_cleanup(conn);
+                       rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                        conn->failed = TRUE;
+                       conn->last_failure = rspamd_get_calendar_ticks();
                        return;
                }
 
@@ -1019,6 +1057,16 @@ fuzzy_tcp_send_command(struct fuzzy_tcp_connection *conn,
        unsigned int i;
        struct rspamd_task *task = session->task;
 
+       msg_debug_task("fuzzy_tcp_send_command: fd=%d, ev.io.fd=%d, connected=%d, failed=%d",
+                                  conn->fd, (int) conn->ev.io.fd, conn->connected, conn->failed);
+
+       /* Don't send if connection is failed or not connected */
+       if (!conn->connected || conn->failed) {
+               msg_warn_task("fuzzy_tcp: cannot send commands - connection not ready (connected=%d, failed=%d)",
+                                         conn->connected, conn->failed);
+               return FALSE;
+       }
+
        for (i = 0; i < commands->len; i++) {
                io = g_ptr_array_index(commands, i);
 
@@ -1060,7 +1108,27 @@ fuzzy_tcp_send_command(struct fuzzy_tcp_connection *conn,
 
        /* Ensure write events are enabled */
        if (!g_queue_is_empty(conn->write_queue)) {
-               rspamd_ev_watcher_reschedule(conn->event_loop, &conn->ev, EV_READ | EV_WRITE);
+               msg_debug_task("fuzzy_tcp: checking watcher before reschedule - fd=%d, ev.io.fd=%d",
+                                          conn->fd, (int) conn->ev.io.fd);
+
+               /* Verify fd is valid before rescheduling */
+               if (conn->fd >= 0 && conn->ev.io.fd == conn->fd) {
+                       msg_debug_task("fuzzy_tcp: reschedule watcher for fd=%d", conn->fd);
+                       rspamd_ev_watcher_reschedule(conn->event_loop, &conn->ev, EV_READ | EV_WRITE);
+               }
+               else if (conn->fd >= 0 && conn->ev.io.fd != conn->fd) {
+                       /* Fd mismatch - reinitialize watcher */
+                       msg_warn_task("fuzzy_tcp: fd mismatch in watcher (ev.fd=%d, conn.fd=%d), reinitializing",
+                                                 (int) conn->ev.io.fd, conn->fd);
+                       rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
+                       rspamd_ev_watcher_init(&conn->ev, conn->fd, EV_READ | EV_WRITE,
+                                                                  fuzzy_tcp_io_handler, conn);
+                       rspamd_ev_watcher_start(conn->event_loop, &conn->ev, conn->rule->tcp_timeout);
+               }
+               else {
+                       msg_warn_task("fuzzy_tcp: invalid fd in connection (fd=%d, ev.io.fd=%d), cannot reschedule",
+                                                 conn->fd, (int) conn->ev.io.fd);
+               }
        }
 
        return TRUE;
@@ -1202,11 +1270,14 @@ fuzzy_tcp_process_reply(struct fuzzy_tcp_connection *conn,
        msg_debug_fuzzy_check("fuzzy_tcp: processed reply with tag %ud from %s (prob=%.2f)",
                                                  tag, rspamd_upstream_name(conn->server), (double) rep->v1.prob);
 
+       /* Save session before removing pending (which may free it) */
+       struct fuzzy_client_session *session_to_check = pending->session;
+
        /* Remove from pending requests */
        g_hash_table_remove(rule->pending_requests, GINT_TO_POINTER(tag));
 
        /* Check if session is completed */
-       fuzzy_check_session_is_completed(pending->session);
+       fuzzy_check_session_is_completed(session_to_check);
 }
 
 /**
@@ -1225,7 +1296,9 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn)
                msg_err("fuzzy_tcp: read buffer full for rule %s, closing connection",
                                conn->rule->name);
                fuzzy_tcp_connection_cleanup(conn);
+               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                conn->failed = TRUE;
+               conn->last_failure = rspamd_get_calendar_ticks();
                return;
        }
 
@@ -1238,7 +1311,9 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn)
                msg_err("fuzzy_tcp: read error for rule %s: %s",
                                conn->rule->name, strerror(errno));
                fuzzy_tcp_connection_cleanup(conn);
+               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                conn->failed = TRUE;
+               conn->last_failure = rspamd_get_calendar_ticks();
                return;
        }
        else if (r == 0) {
@@ -1246,7 +1321,9 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn)
                                 rspamd_upstream_name(conn->server),
                                 conn->rule->name);
                fuzzy_tcp_connection_cleanup(conn);
+               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                conn->failed = TRUE;
+               conn->last_failure = rspamd_get_calendar_ticks();
                return;
        }
 
@@ -1290,7 +1367,9 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn)
                                        (int) frame_len,
                                        rspamd_upstream_name(conn->server));
                        fuzzy_tcp_connection_cleanup(conn);
+                       rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
                        conn->failed = TRUE;
+                       conn->last_failure = rspamd_get_calendar_ticks();
                        return;
                }
 
@@ -2644,8 +2723,12 @@ fuzzy_io_fin(void *ud)
 
        /* Remove any pending TCP requests for this session */
        if (session->fd == -1) {
-               /* TCP session - cleanup pending requests */
+               /* TCP session - cleanup pending requests and stop timer */
                fuzzy_tcp_session_cleanup(session);
+               /* Stop pure timer (no IO) */
+               if (ev_is_active(&session->timer_ev.tm)) {
+                       ev_timer_stop(session->event_loop, &session->timer_ev.tm);
+               }
        }
 
        if (session->commands) {
@@ -4101,6 +4184,7 @@ fuzzy_check_session_is_completed(struct fuzzy_client_session *session)
 {
        struct fuzzy_cmd_io *io;
        unsigned int nreplied = 0, i;
+       struct rspamd_task *task = session->task;
 
        rspamd_upstream_ok(session->server);
 
@@ -4116,6 +4200,7 @@ fuzzy_check_session_is_completed(struct fuzzy_client_session *session)
                fuzzy_insert_metric_results(session->task, session->rule, session->results);
 
                if (session->item) {
+                       msg_debug_fuzzy_check("fuzzy_check: decrementing async counter for completed session");
                        rspamd_symcache_item_async_dec_check(session->task, session->item, M);
                }
 
@@ -4123,6 +4208,10 @@ fuzzy_check_session_is_completed(struct fuzzy_client_session *session)
 
                return TRUE;
        }
+       else {
+               msg_debug_fuzzy_check("fuzzy_check: session not completed (%d/%d replied)",
+                                                         nreplied, (int) session->commands->len);
+       }
 
        return FALSE;
 }
@@ -4187,6 +4276,84 @@ fuzzy_check_timer_callback(int fd, short what, void *arg)
        }
 }
 
+/* libev wrapper for TCP timer - calls rspamd_io_ev style callback */
+static void
+fuzzy_tcp_timer_libev_cb(EV_P_ struct ev_timer *w, int revents)
+{
+       struct rspamd_io_ev *ev = (struct rspamd_io_ev *) w->data;
+       ev->cb(-1, EV_TIMER, ev->ud);
+}
+
+/* TCP timeout callback - no retransmits needed, connection is established */
+static void
+fuzzy_tcp_timer_callback(int fd, short what, void *arg)
+{
+       struct fuzzy_client_session *session = arg;
+       struct rspamd_task *task = session->task;
+       struct fuzzy_cmd_io *io;
+       unsigned int i, nreplied = 0;
+
+       /* Check if all commands have been replied */
+       for (i = 0; i < session->commands->len; i++) {
+               io = g_ptr_array_index(session->commands, i);
+               if (io->flags & FUZZY_CMD_FLAG_REPLIED) {
+                       nreplied++;
+               }
+       }
+
+       if (nreplied == session->commands->len) {
+               /* All replied, just complete */
+               msg_debug_fuzzy_check("fuzzy_tcp: all commands replied, completing session");
+               fuzzy_check_session_is_completed(session);
+               return;
+       }
+
+       /* Timeout - just fail the request, don't retry via UDP */
+       msg_warn_task("fuzzy_tcp: timeout waiting for replies from %s (%d/%d replied), giving up",
+                                 rspamd_upstream_name(session->server),
+                                 nreplied, (int) session->commands->len);
+
+       /* Mark all unreplied commands as failed */
+       for (i = 0; i < session->commands->len; i++) {
+               io = g_ptr_array_index(session->commands, i);
+               if (!(io->flags & FUZZY_CMD_FLAG_REPLIED)) {
+                       io->flags |= FUZZY_CMD_FLAG_REPLIED;
+               }
+       }
+
+       rspamd_upstream_fail(session->server, TRUE, "timeout");
+
+       /* Mark TCP connection as failed so future requests use UDP for ~10 seconds */
+       for (i = 0; i < session->rule->tcp_connections->len; i++) {
+               struct fuzzy_tcp_connection *conn = g_ptr_array_index(session->rule->tcp_connections, i);
+               if (conn->server == session->server) {
+                       conn->failed = TRUE;
+                       conn->last_failure = rspamd_get_calendar_ticks();
+                       conn->connected = FALSE;
+                       msg_info_task("fuzzy_tcp: marked connection to %s as failed, switching to UDP for 10s",
+                                                 rspamd_upstream_name(session->server));
+                       break;
+               }
+       }
+
+       /* Clean up TCP session - stop timer and remove event */
+       /* Remove any pending TCP requests for this session */
+       fuzzy_tcp_session_cleanup(session);
+
+       /* Stop pure timer (no IO) */
+       if (ev_is_active(&session->timer_ev.tm)) {
+               ev_timer_stop(session->event_loop, &session->timer_ev.tm);
+       }
+
+       /* Decrement async counter for TCP session */
+       if (session->item) {
+               rspamd_symcache_item_async_dec_check(session->task, session->item, M);
+       }
+
+       /* Remove TCP session event */
+       rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
+}
+
 /* Fuzzy check callback */
 static void
 fuzzy_check_io_callback(int fd, short what, void *arg)
@@ -4863,6 +5030,16 @@ register_fuzzy_client_call(struct rspamd_task *task,
                                        rspamd_symcache_item_async_inc(task, session->item, M);
                                }
 
+                               /* Start timer for TCP request timeout */
+                               /* Use pure timer (no IO), so use libev API directly */
+                               session->timer_ev.cb = fuzzy_tcp_timer_callback;
+                               session->timer_ev.ud = session;
+                               session->timer_ev.timeout = rule->io_timeout;
+                               session->timer_ev.tm.data = &session->timer_ev;
+                               ev_timer_init(&session->timer_ev.tm, fuzzy_tcp_timer_libev_cb,
+                                                         rule->io_timeout, 0.0);
+                               ev_timer_start(session->event_loop, &session->timer_ev.tm);
+
                                return; /* TCP send successful */
                        }
                        else {