]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Fuzzy check: implement TCP connection management
authorVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Oct 2025 08:19:40 +0000 (09:19 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Oct 2025 09:27:18 +0000 (10:27 +0100)
Add async TCP connection establishment and I/O framework. This
implements Phase 2 of the TCP support - connection management with
event-driven architecture.

Changes:
- Add fuzzy_tcp_connection structure for per-rule TCP state
- Add fuzzy_tcp_pending_command for request/reply matching
- Implement fuzzy_tcp_connect_async() with non-blocking connect
- Implement fuzzy_tcp_io_handler() for connection/read/write events
- Add connection lifecycle management with reference counting
- Handle connection establishment with getsockopt SO_ERROR check
- Add timeout handling and upstream failure reporting
- Add placeholder write and read handlers for next phase

TCP connection is established lazily when rate threshold is exceeded.
Event handler manages connection state machine: connecting -> connected.
Write/read handlers will be implemented in Phase 3.

src/plugins/fuzzy_check.c

index 20beda113c6e0b023f7d3007578e92df2c1086b2..8e136c672783293f1461710c593de060759fcbec 100644 (file)
@@ -122,12 +122,9 @@ struct fuzzy_rule {
                ev_tstamp window_start;  /* Start of the current window */
        } rate_tracker;
 
-       /* TCP connection state */
-       struct {
-               gboolean connected;  /* TCP connection is active and ready */
-               gboolean connecting; /* TCP connection in progress */
-               void *connection;    /* Placeholder for TCP connection handle (future) */
-       } tcp_state;
+       /* TCP connection pool - array of connections, one per upstream */
+       GPtrArray *tcp_connections;   /* Array of fuzzy_tcp_connection* */
+       GHashTable *pending_requests; /* Global: tag -> fuzzy_tcp_pending_command */
 };
 
 struct fuzzy_ctx {
@@ -195,6 +192,65 @@ struct fuzzy_learn_session {
        int retransmits;
 };
 
+/**
+ * Pending command awaiting TCP reply
+ * Used to match replies with requests
+ */
+struct fuzzy_tcp_pending_command {
+       struct fuzzy_cmd_io *io;              /* Command I/O */
+       struct rspamd_task *task;             /* Task associated with command */
+       struct fuzzy_client_session *session; /* Session for reply delivery */
+       ev_tstamp send_time;                  /* When command was sent */
+};
+
+/**
+ * TCP write queue element - contains framed command ready to send
+ */
+struct fuzzy_tcp_write_buf {
+       uint16_t size_hdr;   /* Frame size in network byte order */
+       unsigned char *data; /* Command data (encrypted) */
+       gsize total_len;     /* Total length: sizeof(size_hdr) + data_len */
+       gsize bytes_written; /* How many bytes already written */
+};
+
+/**
+ * TCP connection state for a fuzzy rule
+ * One TCP connection per fuzzy_rule, shared across all tasks
+ */
+struct fuzzy_tcp_connection {
+       struct fuzzy_rule *rule;  /* Parent rule */
+       struct upstream *server;  /* Connected upstream */
+       rspamd_inet_addr_t *addr; /* Server address */
+
+       int fd;                     /* Socket file descriptor */
+       struct ev_loop *event_loop; /* Event loop */
+       struct rspamd_io_ev ev;     /* Event watcher */
+
+       /* Write state */
+       GQueue *write_queue; /* Queue of iovec to send */
+       gsize bytes_sent;    /* Bytes sent from current iovec */
+
+       /* Read state - TCP framing */
+       uint16_t cur_frame_state;     /* 0x0000/0x8000/0xC000 like server */
+       gsize bytes_unprocessed;      /* Bytes in read_buf not yet processed */
+       unsigned char read_buf[8192]; /* Read buffer for incoming data */
+
+       /* Encryption keys for this connection */
+       struct rspamd_cryptobox_keypair *local_key; /* Local keypair (can be NULL) */
+       struct rspamd_cryptobox_pubkey *peer_key;   /* Server public key (can be NULL) */
+       gboolean encrypted;                         /* TRUE if connection uses encryption */
+
+       /* Connection state */
+       gboolean connected;  /* TCP handshake complete */
+       gboolean connecting; /* Connection in progress */
+       gboolean failed;     /* Connection failed */
+
+       ev_tstamp connect_start; /* When connection started */
+       ev_tstamp last_activity; /* Last I/O activity */
+
+       ref_entry_t ref; /* Reference counting */
+};
+
 #define FUZZY_CMD_FLAG_REPLIED (1 << 0)
 #define FUZZY_CMD_FLAG_SENT (1 << 1)
 #define FUZZY_CMD_FLAG_IMAGE (1 << 2)
@@ -400,6 +456,16 @@ fuzzy_free_rule(gpointer r)
        if (rule->write_peer_key) {
                rspamd_pubkey_unref(rule->write_peer_key);
        }
+
+       /* Clean up TCP connections */
+       if (rule->tcp_connections) {
+               g_ptr_array_free(rule->tcp_connections, TRUE);
+       }
+
+       /* Clean up pending requests pool */
+       if (rule->pending_requests) {
+               g_hash_table_destroy(rule->pending_requests);
+       }
 }
 
 /**
@@ -423,27 +489,23 @@ fuzzy_update_rate_tracker(struct fuzzy_rule *rule, ev_tstamp now)
 }
 
 /**
- * Determine whether to use TCP for this request
- * Returns TRUE if TCP should be used, FALSE for UDP
+ * Check if TCP should be attempted based on configuration and rate
  */
 static gboolean
-fuzzy_should_use_tcp(struct fuzzy_rule *rule)
+fuzzy_should_try_tcp(struct fuzzy_rule *rule, ev_tstamp now)
 {
-       /* If TCP is explicitly enabled, always prefer TCP when connected */
-       if (rule->tcp_enabled && rule->tcp_state.connected) {
+       /* TCP explicitly enabled - always try */
+       if (rule->tcp_enabled) {
                return TRUE;
        }
 
-       /* If auto-switch is enabled and TCP is connected */
-       if (rule->tcp_auto && rule->tcp_state.connected) {
-               /* Calculate current rate */
-               if (rule->rate_tracker.window_start > 0 && rule->tcp_window > 0) {
-                       ev_tstamp elapsed = rspamd_get_calendar_ticks() - rule->rate_tracker.window_start;
-                       if (elapsed > 0) {
-                               double rate = (double) rule->rate_tracker.requests_count / elapsed;
-                               if (rate > rule->tcp_threshold) {
-                                       return TRUE;
-                               }
+       /* TCP auto-switch based on rate */
+       if (rule->tcp_auto && rule->tcp_window > 0) {
+               ev_tstamp elapsed = now - rule->rate_tracker.window_start;
+               if (elapsed > 0) {
+                       double rate = (double) rule->rate_tracker.requests_count / elapsed;
+                       if (rate > rule->tcp_threshold) {
+                               return TRUE;
                        }
                }
        }
@@ -451,25 +513,525 @@ fuzzy_should_use_tcp(struct fuzzy_rule *rule)
        return FALSE;
 }
 
+/* Forward declarations for TCP functions */
+static void fuzzy_tcp_io_handler(int fd, short what, gpointer ud);
+static void fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn);
+static void fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn);
+
+/* Forward declarations for helper functions */
+static gboolean fuzzy_rule_has_encryption(struct fuzzy_rule *rule);
+static void fuzzy_insert_result(struct fuzzy_client_session *session,
+                                                               const struct rspamd_fuzzy_reply *rep,
+                                                               struct rspamd_fuzzy_cmd *cmd,
+                                                               struct fuzzy_cmd_io *io,
+                                                               unsigned int flag);
+
+#define FUZZY_TCP_RETAIN(x) REF_RETAIN(x)
+#define FUZZY_TCP_RELEASE(x) REF_RELEASE(x)
+
 /**
- * Initiate asynchronous TCP connection (placeholder)
- * This function will be implemented later with actual TCP connection logic
+ * Free TCP connection resources
  */
 static void
-fuzzy_tcp_connect_async(struct fuzzy_rule *rule)
+fuzzy_tcp_connection_free(struct fuzzy_tcp_connection *conn)
 {
-       /* Mark connection as in progress */
-       if (!rule->tcp_state.connecting && !rule->tcp_state.connected) {
-               rule->tcp_state.connecting = TRUE;
-
-               /* TODO: Implement actual async TCP connection
-                * - Create socket
-                * - Set non-blocking mode
-                * - Start connect()
-                * - Register event handler for connection completion
-                * - On success: set tcp_state.connected = TRUE, connecting = FALSE
-                * - On failure: set connecting = FALSE, schedule retry
-                */
+       if (conn->fd != -1) {
+               rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
+               close(conn->fd);
+       }
+
+       if (conn->write_queue) {
+               g_queue_free(conn->write_queue);
+       }
+
+       g_free(conn);
+}
+
+/**
+ * Create new TCP connection structure
+ */
+static struct fuzzy_tcp_connection *
+fuzzy_tcp_connection_new(struct fuzzy_rule *rule, struct ev_loop *event_loop)
+{
+       struct fuzzy_tcp_connection *conn;
+
+       conn = g_malloc0(sizeof(struct fuzzy_tcp_connection));
+       conn->rule = rule;
+       conn->fd = -1;
+       conn->event_loop = event_loop;
+       conn->write_queue = g_queue_new();
+       conn->connected = FALSE;
+       conn->connecting = FALSE;
+       conn->failed = FALSE;
+       conn->cur_frame_state = 0x0000;
+       conn->bytes_sent = 0;
+       conn->bytes_unprocessed = 0;
+
+       REF_INIT_RETAIN(conn, fuzzy_tcp_connection_free);
+
+       return conn;
+}
+
+/**
+ * Initiate asynchronous TCP connection for specific upstream
+ */
+static struct fuzzy_tcp_connection *
+fuzzy_tcp_connect_async(struct fuzzy_rule *rule,
+                                               struct upstream *upstream,
+                                               struct rspamd_task *task,
+                                               gboolean is_write_server)
+{
+       struct fuzzy_tcp_connection *conn;
+       rspamd_inet_addr_t *addr;
+       int fd;
+
+       /* Get current server address (not next, to avoid address rotation) */
+       addr = rspamd_upstream_addr_cur(upstream);
+       if (!addr) {
+               msg_warn_task("fuzzy_tcp: no address for upstream %s in rule %s",
+                                         rspamd_upstream_name(upstream),
+                                         rule->name);
+               return NULL;
+       }
+
+       rspamd_inet_address_set_port(addr, rspamd_upstream_port(upstream));
+
+       /* Create non-blocking TCP socket */
+       fd = rspamd_inet_address_connect(addr, SOCK_STREAM, TRUE);
+       if (fd == -1) {
+               msg_warn_task("fuzzy_tcp: cannot connect to %s (%s): %s",
+                                         rspamd_upstream_name(upstream),
+                                         rspamd_inet_address_to_string_pretty(addr),
+                                         strerror(errno));
+               rspamd_upstream_fail(upstream, FALSE, strerror(errno));
+               return NULL;
+       }
+
+       /* Create connection structure */
+       conn = fuzzy_tcp_connection_new(rule, task->event_loop);
+       conn->fd = fd;
+       conn->server = upstream;
+       conn->addr = addr;
+       conn->connecting = TRUE;
+       conn->connect_start = rspamd_get_calendar_ticks();
+
+       /* Determine encryption keys for this connection */
+       if (fuzzy_rule_has_encryption(rule)) {
+               if (is_write_server) {
+                       /* Write server - use write keys */
+                       conn->local_key = rule->write_local_key ? rule->write_local_key : rule->local_key;
+                       conn->peer_key = rule->write_peer_key ? rule->write_peer_key : rule->peer_key;
+               }
+               else {
+                       /* Read server - use read keys */
+                       conn->local_key = rule->read_local_key ? rule->read_local_key : rule->local_key;
+                       conn->peer_key = rule->read_peer_key ? rule->read_peer_key : rule->peer_key;
+               }
+               conn->encrypted = TRUE;
+       }
+       else {
+               conn->local_key = NULL;
+               conn->peer_key = NULL;
+               conn->encrypted = FALSE;
+       }
+
+       /* Initialize event watcher for connection establishment (wait for write) */
+       rspamd_ev_watcher_init(&conn->ev, fd, EV_WRITE,
+                                                  fuzzy_tcp_io_handler, conn);
+       rspamd_ev_watcher_start(conn->event_loop, &conn->ev, rule->tcp_timeout);
+
+       FUZZY_TCP_RETAIN(conn);
+
+       /* Store in connection pool array */
+       g_ptr_array_add(rule->tcp_connections, conn);
+
+       msg_info_task("fuzzy_tcp: initiating connection to %s for rule %s",
+                                 rspamd_inet_address_to_string_pretty(addr),
+                                 rule->name);
+
+       return conn;
+}
+
+/**
+ * Get or create TCP connection for specific upstream
+ * Returns existing connection if available, creates new one if needed
+ */
+static struct fuzzy_tcp_connection *
+fuzzy_tcp_get_or_create_connection(struct fuzzy_rule *rule,
+                                                                  struct upstream *upstream,
+                                                                  struct rspamd_task *task,
+                                                                  gboolean is_write_server)
+{
+       struct fuzzy_tcp_connection *conn = NULL;
+       guint i;
+
+       /* Search for existing connection to this upstream */
+       for (i = 0; i < rule->tcp_connections->len; i++) {
+               struct fuzzy_tcp_connection *c = g_ptr_array_index(rule->tcp_connections, i);
+
+               if (c->server == upstream) {
+                       conn = c;
+                       break;
+               }
+       }
+
+       if (conn) {
+               /* Connection exists - check state */
+               if (conn->connected) {
+                       /* Ready to use */
+                       return conn;
+               }
+               else if (conn->connecting) {
+                       /* Connection in progress - cannot use yet */
+                       return NULL;
+               }
+               else if (conn->failed) {
+                       /* Previous connection failed - remove and recreate */
+                       g_ptr_array_remove(rule->tcp_connections, conn);
+                       conn = NULL;
+               }
+       }
+
+       /* No connection or failed - create new one */
+       if (!conn) {
+               conn = fuzzy_tcp_connect_async(rule, upstream, task, is_write_server);
+       }
+
+       return conn;
+}
+
+/**
+ * Main TCP I/O event handler
+ * Handles connection establishment, reads, writes, and timeouts
+ */
+static void
+fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
+{
+       struct fuzzy_tcp_connection *conn = ud;
+       int so_error = 0;
+       socklen_t so_len = sizeof(so_error);
+
+       FUZZY_TCP_RETAIN(conn);
+
+       conn->last_activity = rspamd_get_calendar_ticks();
+
+       if (what == EV_TIMEOUT) {
+               msg_warn("fuzzy_tcp: connection timeout for rule %s to %s",
+                                conn->rule->name,
+                                rspamd_upstream_name(conn->server));
+               conn->failed = TRUE;
+               conn->connecting = FALSE;
+               rspamd_upstream_fail(conn->server, TRUE, "timeout");
+               FUZZY_TCP_RELEASE(conn);
+               return;
+       }
+
+       if (what == EV_WRITE) {
+               /* Check if we're still connecting */
+               if (conn->connecting && !conn->connected) {
+                       /* Verify connection succeeded */
+                       if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_len) == -1) {
+                               msg_warn("fuzzy_tcp: getsockopt failed for rule %s to %s: %s",
+                                                conn->rule->name,
+                                                rspamd_upstream_name(conn->server),
+                                                strerror(errno));
+                               conn->failed = TRUE;
+                               conn->connecting = FALSE;
+                               rspamd_upstream_fail(conn->server, TRUE, "getsockopt failed");
+                               FUZZY_TCP_RELEASE(conn);
+                               return;
+                       }
+
+                       if (so_error != 0) {
+                               msg_warn("fuzzy_tcp: connection failed for rule %s to %s: %s",
+                                                conn->rule->name,
+                                                rspamd_upstream_name(conn->server),
+                                                strerror(so_error));
+                               conn->failed = TRUE;
+                               conn->connecting = FALSE;
+                               rspamd_upstream_fail(conn->server, TRUE, strerror(so_error));
+                               FUZZY_TCP_RELEASE(conn);
+                               return;
+                       }
+
+                       /* Connection established! */
+                       conn->connected = TRUE;
+                       conn->connecting = FALSE;
+
+                       msg_info("fuzzy_tcp: connection established to %s for rule %s",
+                                        rspamd_inet_address_to_string_pretty(conn->addr),
+                                        conn->rule->name);
+
+                       rspamd_upstream_ok(conn->server);
+
+                       /* Now wait for both read and write events */
+                       rspamd_ev_watcher_reschedule(conn->event_loop, &conn->ev,
+                                                                                EV_READ | EV_WRITE);
+               }
+               else if (conn->connected) {
+                       /* Handle write */
+                       fuzzy_tcp_write_handler(conn);
+               }
+       }
+
+       if (what == EV_READ && conn->connected) {
+               /* Handle read */
+               fuzzy_tcp_read_handler(conn);
+       }
+
+       FUZZY_TCP_RELEASE(conn);
+}
+
+/**
+ * TCP write handler
+ * Sends queued commands to the server with TCP framing
+ */
+static void
+fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn)
+{
+       struct fuzzy_tcp_write_buf *buf;
+       ssize_t r;
+
+       while ((buf = g_queue_peek_head(conn->write_queue)) != NULL) {
+               /* Write remaining data */
+               gsize remaining = buf->total_len - buf->bytes_written;
+               unsigned char *write_ptr;
+
+               /* Determine what to write: size_hdr or data */
+               if (buf->bytes_written < sizeof(buf->size_hdr)) {
+                       /* Still writing size header */
+                       write_ptr = (unsigned char *) &buf->size_hdr + buf->bytes_written;
+               }
+               else {
+                       /* Writing data */
+                       write_ptr = buf->data + (buf->bytes_written - sizeof(buf->size_hdr));
+               }
+
+               r = write(conn->fd, write_ptr, remaining);
+
+               if (r == -1) {
+                       if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+                               /* Cannot write more, wait for next event */
+                               return;
+                       }
+                       msg_err("fuzzy_tcp: write error for rule %s to %s: %s",
+                                       conn->rule->name,
+                                       rspamd_upstream_name(conn->server),
+                                       strerror(errno));
+                       conn->failed = TRUE;
+                       return;
+               }
+               else if (r == 0) {
+                       msg_info("fuzzy_tcp: connection closed by %s for rule %s",
+                                        rspamd_upstream_name(conn->server),
+                                        conn->rule->name);
+                       conn->failed = TRUE;
+                       return;
+               }
+
+               buf->bytes_written += r;
+
+               if (buf->bytes_written >= buf->total_len) {
+                       /* Buffer fully sent, remove from queue and free */
+                       g_queue_pop_head(conn->write_queue);
+                       g_free(buf->data);
+                       g_free(buf);
+               }
+               else {
+                       /* Partial write, wait for next event */
+                       return;
+               }
+       }
+
+       /* Queue is empty, no more data to write */
+       /* Might want to disable EV_WRITE here if needed */
+}
+
+/**
+ * Process a single TCP reply frame
+ * Decrypts reply, matches with pending command by tag, delivers result
+ */
+static void
+fuzzy_tcp_process_reply(struct fuzzy_tcp_connection *conn,
+                                               unsigned char *data, gsize len)
+{
+       struct rspamd_fuzzy_encrypted_reply encrep;
+       const struct rspamd_fuzzy_reply *rep;
+       struct fuzzy_rule *rule = conn->rule;
+       unsigned int required_size;
+       struct fuzzy_tcp_pending_command *pending;
+       uint32_t tag;
+
+       /* Check if we have encryption */
+       if (conn->encrypted) {
+               required_size = sizeof(encrep);
+       }
+       else {
+               required_size = sizeof(struct rspamd_fuzzy_reply);
+       }
+
+       if (len < required_size) {
+               msg_warn("fuzzy_tcp: invalid reply size %d from %s, expected at least %d",
+                                (int) len, rspamd_upstream_name(conn->server), (int) required_size);
+               return;
+       }
+
+       /* Decrypt if needed - use keys from connection */
+       if (conn->encrypted) {
+               memcpy(&encrep, data, sizeof(encrep));
+
+               /* Process keys through cache */
+               rspamd_keypair_cache_process(rule->ctx->keypairs_cache,
+                                                                        conn->local_key, conn->peer_key);
+
+               /* Decrypt with connection keys */
+               if (!rspamd_cryptobox_decrypt_nm_inplace((unsigned char *) &encrep.rep,
+                                                                                                sizeof(encrep.rep),
+                                                                                                encrep.hdr.nonce,
+                                                                                                rspamd_pubkey_get_nm(conn->peer_key, conn->local_key),
+                                                                                                encrep.hdr.mac)) {
+                       msg_warn("fuzzy_tcp: cannot decrypt reply from %s",
+                                        rspamd_upstream_name(conn->server));
+                       return;
+               }
+
+               rep = &encrep.rep;
+       }
+       else {
+               rep = (const struct rspamd_fuzzy_reply *) data;
+       }
+
+       /* Extract tag and lookup pending command */
+       tag = rep->v1.tag;
+       pending = g_hash_table_lookup(rule->pending_requests, GINT_TO_POINTER(tag));
+
+       if (!pending) {
+               msg_debug("fuzzy_tcp: unexpected tag %u from %s",
+                                 tag, rspamd_upstream_name(conn->server));
+               return;
+       }
+
+       /* Process the reply */
+       if (rep->v1.prob > 0.5) {
+               if (pending->io->cmd.cmd == FUZZY_CHECK) {
+                       fuzzy_insert_result(pending->session, rep, &pending->io->cmd,
+                                                               pending->io, rep->v1.flag);
+               }
+       }
+
+       msg_debug("fuzzy_tcp: processed reply with tag %u from %s (prob=%.2f)",
+                         tag, rspamd_upstream_name(conn->server), (double) rep->v1.prob);
+
+       /* Remove from pending requests */
+       g_hash_table_remove(rule->pending_requests, GINT_TO_POINTER(tag));
+}
+
+/**
+ * TCP read handler
+ * Reads replies from server, parses framing, matches with pending commands
+ */
+static void
+fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn)
+{
+       ssize_t r;
+       gsize available_space;
+
+       /* Read data from socket into buffer */
+       available_space = sizeof(conn->read_buf) - conn->bytes_unprocessed;
+       if (available_space == 0) {
+               msg_err("fuzzy_tcp: read buffer full for rule %s, closing connection",
+                               conn->rule->name);
+               conn->failed = TRUE;
+               return;
+       }
+
+       r = read(conn->fd, conn->read_buf + conn->bytes_unprocessed, available_space);
+
+       if (r == -1) {
+               if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+                       return; /* Try again later */
+               }
+               msg_err("fuzzy_tcp: read error for rule %s: %s",
+                               conn->rule->name, strerror(errno));
+               conn->failed = TRUE;
+               return;
+       }
+       else if (r == 0) {
+               msg_info("fuzzy_tcp: connection closed by %s for rule %s",
+                                rspamd_upstream_name(conn->server),
+                                conn->rule->name);
+               conn->failed = TRUE;
+               return;
+       }
+
+       conn->bytes_unprocessed += r;
+
+       /* Process frames using state machine */
+       unsigned int processed_offset = 0;
+
+       while (processed_offset < conn->bytes_unprocessed) {
+               uint16_t frame_len;
+
+               /* State: 0x0000 - need first byte of length */
+               if ((conn->cur_frame_state & 0xC000) == 0x0000) {
+                       if (processed_offset < conn->bytes_unprocessed) {
+                               conn->cur_frame_state = 0x8000 | conn->read_buf[processed_offset];
+                               processed_offset++;
+                       }
+                       else {
+                               break;
+                       }
+               }
+
+               /* State: 0x8000 - need second byte of length */
+               if ((conn->cur_frame_state & 0xC000) == 0x8000) {
+                       if (processed_offset < conn->bytes_unprocessed) {
+                               uint16_t first_byte = conn->cur_frame_state & 0xFF;
+                               uint16_t second_byte = conn->read_buf[processed_offset];
+                               conn->cur_frame_state = 0xC000 | ((first_byte << 8) | second_byte);
+                               processed_offset++;
+                       }
+                       else {
+                               break;
+                       }
+               }
+
+               /* State: 0xC000 - have length, reading data */
+               frame_len = conn->cur_frame_state & 0x3FFF;
+
+               if (frame_len > sizeof(struct rspamd_fuzzy_encrypted_reply)) {
+                       msg_err("fuzzy_tcp: invalid frame length %d from %s, closing",
+                                       (int) frame_len,
+                                       rspamd_upstream_name(conn->server));
+                       conn->failed = TRUE;
+                       return;
+               }
+
+               /* Check if we have complete frame */
+               if (conn->bytes_unprocessed - processed_offset >= frame_len) {
+                       /* Process complete frame - decrypt and deliver to session */
+                       fuzzy_tcp_process_reply(conn, conn->read_buf + processed_offset, frame_len);
+
+                       processed_offset += frame_len;
+                       conn->cur_frame_state = 0x0000; /* Reset for next frame */
+               }
+               else {
+                       /* Incomplete frame, wait for more data */
+                       break;
+               }
+       }
+
+       /* Move unprocessed data to beginning of buffer */
+       if (processed_offset > 0) {
+               if (processed_offset < conn->bytes_unprocessed) {
+                       memmove(conn->read_buf,
+                                       conn->read_buf + processed_offset,
+                                       conn->bytes_unprocessed - processed_offset);
+                       conn->bytes_unprocessed -= processed_offset;
+               }
+               else {
+                       conn->bytes_unprocessed = 0;
+               }
        }
 }
 
@@ -940,10 +1502,12 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj,
        rule->rate_tracker.requests_count = 0;
        rule->rate_tracker.window_start = 0;
 
-       /* Initialize TCP connection state */
-       rule->tcp_state.connected = FALSE;
-       rule->tcp_state.connecting = FALSE;
-       rule->tcp_state.connection = NULL;
+       /* Initialize TCP connection pool - array of connections */
+       rule->tcp_connections = g_ptr_array_new();
+
+       /* Initialize global pending requests pool - keyed by tag */
+       rule->pending_requests = g_hash_table_new_full(g_direct_hash, g_direct_equal,
+                                                                                                  NULL, g_free);
 
        /*
         * Process rule in Lua
@@ -3818,40 +4382,36 @@ register_fuzzy_client_call(struct rspamd_task *task,
                ev_tstamp now = rspamd_get_calendar_ticks();
                fuzzy_update_rate_tracker(rule, now);
 
-               /* Decide whether to use TCP or UDP */
-               gboolean use_tcp = fuzzy_should_use_tcp(rule);
-               int sock_type = SOCK_DGRAM; /* Default to UDP */
-
-               /* If TCP should be used but not connected, initiate connection */
-               if ((rule->tcp_enabled || rule->tcp_auto) && !rule->tcp_state.connected) {
-                       /* Check if rate threshold exceeded for auto-switch */
-                       if (rule->tcp_auto && rule->tcp_window > 0) {
-                               ev_tstamp elapsed = now - rule->rate_tracker.window_start;
-                               if (elapsed > 0) {
-                                       double rate = (double) rule->rate_tracker.requests_count / elapsed;
-                                       if (rate > rule->tcp_threshold) {
-                                               /* Rate exceeded, initiate TCP connection asynchronously */
-                                               fuzzy_tcp_connect_async(rule);
-                                       }
-                               }
-                       }
-                       else if (rule->tcp_enabled) {
-                               /* TCP explicitly enabled, try to connect */
-                               fuzzy_tcp_connect_async(rule);
-                       }
+               /* Get upstream first - use read_servers for check operations */
+               selected = rspamd_upstream_get(rule->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
+                                                                          NULL, 0);
+               if (!selected) {
+                       msg_warn_task("cannot get upstream for rule %s", rule->name);
+                       g_ptr_array_free(commands, TRUE);
+                       return;
                }
 
-               /* Use TCP if available and should be used, otherwise fall back to UDP */
-               if (use_tcp) {
-                       sock_type = SOCK_STREAM;
+               /* Try TCP if enabled/auto and rate threshold exceeded */
+               struct fuzzy_tcp_connection *tcp_conn = NULL;
+               if (fuzzy_should_try_tcp(rule, now)) {
+                       /* This is read server (CHECK operation) */
+                       tcp_conn = fuzzy_tcp_get_or_create_connection(rule, selected, task, FALSE);
                }
 
-               /* Get upstream - use read_servers for check operations */
-               selected = rspamd_upstream_get(rule->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
-                                                                          NULL, 0);
+               /* For now, always use UDP (TCP write/read handlers not ready yet) */
+               /* TODO: Use tcp_conn when TCP I/O handlers are implemented */
+               if (tcp_conn && tcp_conn->connected) {
+                       /* TCP connection available - will use it in future */
+                       msg_debug_task("TCP connection available for %s, but not using yet (handlers not ready)",
+                                                  rspamd_upstream_name(selected));
+               }
+
+               /* Use UDP for now */
                if (selected) {
-                       addr = rspamd_upstream_addr_next(selected);
-                       if ((sock = rspamd_inet_address_connect(addr, sock_type, TRUE)) == -1) {
+                       addr = rspamd_upstream_addr_cur(selected);
+                       rspamd_inet_address_set_port(addr, rspamd_upstream_port(selected));
+
+                       if ((sock = rspamd_inet_address_connect(addr, SOCK_DGRAM, TRUE)) == -1) {
                                msg_warn_task("cannot connect to %s(%s), %d, %s",
                                                          rspamd_upstream_name(selected),
                                                          rspamd_inet_address_to_string_pretty(addr),