From: Vsevolod Stakhov Date: Tue, 7 Oct 2025 08:19:40 +0000 (+0100) Subject: [Feature] Fuzzy check: implement TCP connection management X-Git-Tag: 3.14.0~84^2~18 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=393ef424104cef4af36c969e80d27e22e3c547ef;p=thirdparty%2Frspamd.git [Feature] Fuzzy check: implement TCP connection management Add async TCP connection establishment and I/O framework. This implements Phase 2 of the TCP support - connection management with event-driven architecture. Changes: - Add fuzzy_tcp_connection structure for per-rule TCP state - Add fuzzy_tcp_pending_command for request/reply matching - Implement fuzzy_tcp_connect_async() with non-blocking connect - Implement fuzzy_tcp_io_handler() for connection/read/write events - Add connection lifecycle management with reference counting - Handle connection establishment with getsockopt SO_ERROR check - Add timeout handling and upstream failure reporting - Add placeholder write and read handlers for next phase TCP connection is established lazily when rate threshold is exceeded. Event handler manages connection state machine: connecting -> connected. Write/read handlers will be implemented in Phase 3. --- diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 20beda113c..8e136c6727 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -122,12 +122,9 @@ struct fuzzy_rule { ev_tstamp window_start; /* Start of the current window */ } rate_tracker; - /* TCP connection state */ - struct { - gboolean connected; /* TCP connection is active and ready */ - gboolean connecting; /* TCP connection in progress */ - void *connection; /* Placeholder for TCP connection handle (future) */ - } tcp_state; + /* TCP connection pool - array of connections, one per upstream */ + GPtrArray *tcp_connections; /* Array of fuzzy_tcp_connection* */ + GHashTable *pending_requests; /* Global: tag -> fuzzy_tcp_pending_command */ }; struct fuzzy_ctx { @@ -195,6 +192,65 @@ struct fuzzy_learn_session { int retransmits; }; +/** + * Pending command awaiting TCP reply + * Used to match replies with requests + */ +struct fuzzy_tcp_pending_command { + struct fuzzy_cmd_io *io; /* Command I/O */ + struct rspamd_task *task; /* Task associated with command */ + struct fuzzy_client_session *session; /* Session for reply delivery */ + ev_tstamp send_time; /* When command was sent */ +}; + +/** + * TCP write queue element - contains framed command ready to send + */ +struct fuzzy_tcp_write_buf { + uint16_t size_hdr; /* Frame size in network byte order */ + unsigned char *data; /* Command data (encrypted) */ + gsize total_len; /* Total length: sizeof(size_hdr) + data_len */ + gsize bytes_written; /* How many bytes already written */ +}; + +/** + * TCP connection state for a fuzzy rule + * One TCP connection per fuzzy_rule, shared across all tasks + */ +struct fuzzy_tcp_connection { + struct fuzzy_rule *rule; /* Parent rule */ + struct upstream *server; /* Connected upstream */ + rspamd_inet_addr_t *addr; /* Server address */ + + int fd; /* Socket file descriptor */ + struct ev_loop *event_loop; /* Event loop */ + struct rspamd_io_ev ev; /* Event watcher */ + + /* Write state */ + GQueue *write_queue; /* Queue of iovec to send */ + gsize bytes_sent; /* Bytes sent from current iovec */ + + /* Read state - TCP framing */ + uint16_t cur_frame_state; /* 0x0000/0x8000/0xC000 like server */ + gsize bytes_unprocessed; /* Bytes in read_buf not yet processed */ + unsigned char read_buf[8192]; /* Read buffer for incoming data */ + + /* Encryption keys for this connection */ + struct rspamd_cryptobox_keypair *local_key; /* Local keypair (can be NULL) */ + struct rspamd_cryptobox_pubkey *peer_key; /* Server public key (can be NULL) */ + gboolean encrypted; /* TRUE if connection uses encryption */ + + /* Connection state */ + gboolean connected; /* TCP handshake complete */ + gboolean connecting; /* Connection in progress */ + gboolean failed; /* Connection failed */ + + ev_tstamp connect_start; /* When connection started */ + ev_tstamp last_activity; /* Last I/O activity */ + + ref_entry_t ref; /* Reference counting */ +}; + #define FUZZY_CMD_FLAG_REPLIED (1 << 0) #define FUZZY_CMD_FLAG_SENT (1 << 1) #define FUZZY_CMD_FLAG_IMAGE (1 << 2) @@ -400,6 +456,16 @@ fuzzy_free_rule(gpointer r) if (rule->write_peer_key) { rspamd_pubkey_unref(rule->write_peer_key); } + + /* Clean up TCP connections */ + if (rule->tcp_connections) { + g_ptr_array_free(rule->tcp_connections, TRUE); + } + + /* Clean up pending requests pool */ + if (rule->pending_requests) { + g_hash_table_destroy(rule->pending_requests); + } } /** @@ -423,27 +489,23 @@ fuzzy_update_rate_tracker(struct fuzzy_rule *rule, ev_tstamp now) } /** - * Determine whether to use TCP for this request - * Returns TRUE if TCP should be used, FALSE for UDP + * Check if TCP should be attempted based on configuration and rate */ static gboolean -fuzzy_should_use_tcp(struct fuzzy_rule *rule) +fuzzy_should_try_tcp(struct fuzzy_rule *rule, ev_tstamp now) { - /* If TCP is explicitly enabled, always prefer TCP when connected */ - if (rule->tcp_enabled && rule->tcp_state.connected) { + /* TCP explicitly enabled - always try */ + if (rule->tcp_enabled) { return TRUE; } - /* If auto-switch is enabled and TCP is connected */ - if (rule->tcp_auto && rule->tcp_state.connected) { - /* Calculate current rate */ - if (rule->rate_tracker.window_start > 0 && rule->tcp_window > 0) { - ev_tstamp elapsed = rspamd_get_calendar_ticks() - rule->rate_tracker.window_start; - if (elapsed > 0) { - double rate = (double) rule->rate_tracker.requests_count / elapsed; - if (rate > rule->tcp_threshold) { - return TRUE; - } + /* TCP auto-switch based on rate */ + if (rule->tcp_auto && rule->tcp_window > 0) { + ev_tstamp elapsed = now - rule->rate_tracker.window_start; + if (elapsed > 0) { + double rate = (double) rule->rate_tracker.requests_count / elapsed; + if (rate > rule->tcp_threshold) { + return TRUE; } } } @@ -451,25 +513,525 @@ fuzzy_should_use_tcp(struct fuzzy_rule *rule) return FALSE; } +/* Forward declarations for TCP functions */ +static void fuzzy_tcp_io_handler(int fd, short what, gpointer ud); +static void fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn); +static void fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn); + +/* Forward declarations for helper functions */ +static gboolean fuzzy_rule_has_encryption(struct fuzzy_rule *rule); +static void fuzzy_insert_result(struct fuzzy_client_session *session, + const struct rspamd_fuzzy_reply *rep, + struct rspamd_fuzzy_cmd *cmd, + struct fuzzy_cmd_io *io, + unsigned int flag); + +#define FUZZY_TCP_RETAIN(x) REF_RETAIN(x) +#define FUZZY_TCP_RELEASE(x) REF_RELEASE(x) + /** - * Initiate asynchronous TCP connection (placeholder) - * This function will be implemented later with actual TCP connection logic + * Free TCP connection resources */ static void -fuzzy_tcp_connect_async(struct fuzzy_rule *rule) +fuzzy_tcp_connection_free(struct fuzzy_tcp_connection *conn) { - /* Mark connection as in progress */ - if (!rule->tcp_state.connecting && !rule->tcp_state.connected) { - rule->tcp_state.connecting = TRUE; - - /* TODO: Implement actual async TCP connection - * - Create socket - * - Set non-blocking mode - * - Start connect() - * - Register event handler for connection completion - * - On success: set tcp_state.connected = TRUE, connecting = FALSE - * - On failure: set connecting = FALSE, schedule retry - */ + if (conn->fd != -1) { + rspamd_ev_watcher_stop(conn->event_loop, &conn->ev); + close(conn->fd); + } + + if (conn->write_queue) { + g_queue_free(conn->write_queue); + } + + g_free(conn); +} + +/** + * Create new TCP connection structure + */ +static struct fuzzy_tcp_connection * +fuzzy_tcp_connection_new(struct fuzzy_rule *rule, struct ev_loop *event_loop) +{ + struct fuzzy_tcp_connection *conn; + + conn = g_malloc0(sizeof(struct fuzzy_tcp_connection)); + conn->rule = rule; + conn->fd = -1; + conn->event_loop = event_loop; + conn->write_queue = g_queue_new(); + conn->connected = FALSE; + conn->connecting = FALSE; + conn->failed = FALSE; + conn->cur_frame_state = 0x0000; + conn->bytes_sent = 0; + conn->bytes_unprocessed = 0; + + REF_INIT_RETAIN(conn, fuzzy_tcp_connection_free); + + return conn; +} + +/** + * Initiate asynchronous TCP connection for specific upstream + */ +static struct fuzzy_tcp_connection * +fuzzy_tcp_connect_async(struct fuzzy_rule *rule, + struct upstream *upstream, + struct rspamd_task *task, + gboolean is_write_server) +{ + struct fuzzy_tcp_connection *conn; + rspamd_inet_addr_t *addr; + int fd; + + /* Get current server address (not next, to avoid address rotation) */ + addr = rspamd_upstream_addr_cur(upstream); + if (!addr) { + msg_warn_task("fuzzy_tcp: no address for upstream %s in rule %s", + rspamd_upstream_name(upstream), + rule->name); + return NULL; + } + + rspamd_inet_address_set_port(addr, rspamd_upstream_port(upstream)); + + /* Create non-blocking TCP socket */ + fd = rspamd_inet_address_connect(addr, SOCK_STREAM, TRUE); + if (fd == -1) { + msg_warn_task("fuzzy_tcp: cannot connect to %s (%s): %s", + rspamd_upstream_name(upstream), + rspamd_inet_address_to_string_pretty(addr), + strerror(errno)); + rspamd_upstream_fail(upstream, FALSE, strerror(errno)); + return NULL; + } + + /* Create connection structure */ + conn = fuzzy_tcp_connection_new(rule, task->event_loop); + conn->fd = fd; + conn->server = upstream; + conn->addr = addr; + conn->connecting = TRUE; + conn->connect_start = rspamd_get_calendar_ticks(); + + /* Determine encryption keys for this connection */ + if (fuzzy_rule_has_encryption(rule)) { + if (is_write_server) { + /* Write server - use write keys */ + conn->local_key = rule->write_local_key ? rule->write_local_key : rule->local_key; + conn->peer_key = rule->write_peer_key ? rule->write_peer_key : rule->peer_key; + } + else { + /* Read server - use read keys */ + conn->local_key = rule->read_local_key ? rule->read_local_key : rule->local_key; + conn->peer_key = rule->read_peer_key ? rule->read_peer_key : rule->peer_key; + } + conn->encrypted = TRUE; + } + else { + conn->local_key = NULL; + conn->peer_key = NULL; + conn->encrypted = FALSE; + } + + /* Initialize event watcher for connection establishment (wait for write) */ + rspamd_ev_watcher_init(&conn->ev, fd, EV_WRITE, + fuzzy_tcp_io_handler, conn); + rspamd_ev_watcher_start(conn->event_loop, &conn->ev, rule->tcp_timeout); + + FUZZY_TCP_RETAIN(conn); + + /* Store in connection pool array */ + g_ptr_array_add(rule->tcp_connections, conn); + + msg_info_task("fuzzy_tcp: initiating connection to %s for rule %s", + rspamd_inet_address_to_string_pretty(addr), + rule->name); + + return conn; +} + +/** + * Get or create TCP connection for specific upstream + * Returns existing connection if available, creates new one if needed + */ +static struct fuzzy_tcp_connection * +fuzzy_tcp_get_or_create_connection(struct fuzzy_rule *rule, + struct upstream *upstream, + struct rspamd_task *task, + gboolean is_write_server) +{ + struct fuzzy_tcp_connection *conn = NULL; + guint i; + + /* Search for existing connection to this upstream */ + for (i = 0; i < rule->tcp_connections->len; i++) { + struct fuzzy_tcp_connection *c = g_ptr_array_index(rule->tcp_connections, i); + + if (c->server == upstream) { + conn = c; + break; + } + } + + if (conn) { + /* Connection exists - check state */ + if (conn->connected) { + /* Ready to use */ + return conn; + } + else if (conn->connecting) { + /* Connection in progress - cannot use yet */ + return NULL; + } + else if (conn->failed) { + /* Previous connection failed - remove and recreate */ + g_ptr_array_remove(rule->tcp_connections, conn); + conn = NULL; + } + } + + /* No connection or failed - create new one */ + if (!conn) { + conn = fuzzy_tcp_connect_async(rule, upstream, task, is_write_server); + } + + return conn; +} + +/** + * Main TCP I/O event handler + * Handles connection establishment, reads, writes, and timeouts + */ +static void +fuzzy_tcp_io_handler(int fd, short what, gpointer ud) +{ + struct fuzzy_tcp_connection *conn = ud; + int so_error = 0; + socklen_t so_len = sizeof(so_error); + + FUZZY_TCP_RETAIN(conn); + + conn->last_activity = rspamd_get_calendar_ticks(); + + if (what == EV_TIMEOUT) { + msg_warn("fuzzy_tcp: connection timeout for rule %s to %s", + conn->rule->name, + rspamd_upstream_name(conn->server)); + conn->failed = TRUE; + conn->connecting = FALSE; + rspamd_upstream_fail(conn->server, TRUE, "timeout"); + FUZZY_TCP_RELEASE(conn); + return; + } + + if (what == EV_WRITE) { + /* Check if we're still connecting */ + if (conn->connecting && !conn->connected) { + /* Verify connection succeeded */ + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_len) == -1) { + msg_warn("fuzzy_tcp: getsockopt failed for rule %s to %s: %s", + conn->rule->name, + rspamd_upstream_name(conn->server), + strerror(errno)); + conn->failed = TRUE; + conn->connecting = FALSE; + rspamd_upstream_fail(conn->server, TRUE, "getsockopt failed"); + FUZZY_TCP_RELEASE(conn); + return; + } + + if (so_error != 0) { + msg_warn("fuzzy_tcp: connection failed for rule %s to %s: %s", + conn->rule->name, + rspamd_upstream_name(conn->server), + strerror(so_error)); + conn->failed = TRUE; + conn->connecting = FALSE; + rspamd_upstream_fail(conn->server, TRUE, strerror(so_error)); + FUZZY_TCP_RELEASE(conn); + return; + } + + /* Connection established! */ + conn->connected = TRUE; + conn->connecting = FALSE; + + msg_info("fuzzy_tcp: connection established to %s for rule %s", + rspamd_inet_address_to_string_pretty(conn->addr), + conn->rule->name); + + rspamd_upstream_ok(conn->server); + + /* Now wait for both read and write events */ + rspamd_ev_watcher_reschedule(conn->event_loop, &conn->ev, + EV_READ | EV_WRITE); + } + else if (conn->connected) { + /* Handle write */ + fuzzy_tcp_write_handler(conn); + } + } + + if (what == EV_READ && conn->connected) { + /* Handle read */ + fuzzy_tcp_read_handler(conn); + } + + FUZZY_TCP_RELEASE(conn); +} + +/** + * TCP write handler + * Sends queued commands to the server with TCP framing + */ +static void +fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn) +{ + struct fuzzy_tcp_write_buf *buf; + ssize_t r; + + while ((buf = g_queue_peek_head(conn->write_queue)) != NULL) { + /* Write remaining data */ + gsize remaining = buf->total_len - buf->bytes_written; + unsigned char *write_ptr; + + /* Determine what to write: size_hdr or data */ + if (buf->bytes_written < sizeof(buf->size_hdr)) { + /* Still writing size header */ + write_ptr = (unsigned char *) &buf->size_hdr + buf->bytes_written; + } + else { + /* Writing data */ + write_ptr = buf->data + (buf->bytes_written - sizeof(buf->size_hdr)); + } + + r = write(conn->fd, write_ptr, remaining); + + if (r == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + /* Cannot write more, wait for next event */ + return; + } + msg_err("fuzzy_tcp: write error for rule %s to %s: %s", + conn->rule->name, + rspamd_upstream_name(conn->server), + strerror(errno)); + conn->failed = TRUE; + return; + } + else if (r == 0) { + msg_info("fuzzy_tcp: connection closed by %s for rule %s", + rspamd_upstream_name(conn->server), + conn->rule->name); + conn->failed = TRUE; + return; + } + + buf->bytes_written += r; + + if (buf->bytes_written >= buf->total_len) { + /* Buffer fully sent, remove from queue and free */ + g_queue_pop_head(conn->write_queue); + g_free(buf->data); + g_free(buf); + } + else { + /* Partial write, wait for next event */ + return; + } + } + + /* Queue is empty, no more data to write */ + /* Might want to disable EV_WRITE here if needed */ +} + +/** + * Process a single TCP reply frame + * Decrypts reply, matches with pending command by tag, delivers result + */ +static void +fuzzy_tcp_process_reply(struct fuzzy_tcp_connection *conn, + unsigned char *data, gsize len) +{ + struct rspamd_fuzzy_encrypted_reply encrep; + const struct rspamd_fuzzy_reply *rep; + struct fuzzy_rule *rule = conn->rule; + unsigned int required_size; + struct fuzzy_tcp_pending_command *pending; + uint32_t tag; + + /* Check if we have encryption */ + if (conn->encrypted) { + required_size = sizeof(encrep); + } + else { + required_size = sizeof(struct rspamd_fuzzy_reply); + } + + if (len < required_size) { + msg_warn("fuzzy_tcp: invalid reply size %d from %s, expected at least %d", + (int) len, rspamd_upstream_name(conn->server), (int) required_size); + return; + } + + /* Decrypt if needed - use keys from connection */ + if (conn->encrypted) { + memcpy(&encrep, data, sizeof(encrep)); + + /* Process keys through cache */ + rspamd_keypair_cache_process(rule->ctx->keypairs_cache, + conn->local_key, conn->peer_key); + + /* Decrypt with connection keys */ + if (!rspamd_cryptobox_decrypt_nm_inplace((unsigned char *) &encrep.rep, + sizeof(encrep.rep), + encrep.hdr.nonce, + rspamd_pubkey_get_nm(conn->peer_key, conn->local_key), + encrep.hdr.mac)) { + msg_warn("fuzzy_tcp: cannot decrypt reply from %s", + rspamd_upstream_name(conn->server)); + return; + } + + rep = &encrep.rep; + } + else { + rep = (const struct rspamd_fuzzy_reply *) data; + } + + /* Extract tag and lookup pending command */ + tag = rep->v1.tag; + pending = g_hash_table_lookup(rule->pending_requests, GINT_TO_POINTER(tag)); + + if (!pending) { + msg_debug("fuzzy_tcp: unexpected tag %u from %s", + tag, rspamd_upstream_name(conn->server)); + return; + } + + /* Process the reply */ + if (rep->v1.prob > 0.5) { + if (pending->io->cmd.cmd == FUZZY_CHECK) { + fuzzy_insert_result(pending->session, rep, &pending->io->cmd, + pending->io, rep->v1.flag); + } + } + + msg_debug("fuzzy_tcp: processed reply with tag %u from %s (prob=%.2f)", + tag, rspamd_upstream_name(conn->server), (double) rep->v1.prob); + + /* Remove from pending requests */ + g_hash_table_remove(rule->pending_requests, GINT_TO_POINTER(tag)); +} + +/** + * TCP read handler + * Reads replies from server, parses framing, matches with pending commands + */ +static void +fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn) +{ + ssize_t r; + gsize available_space; + + /* Read data from socket into buffer */ + available_space = sizeof(conn->read_buf) - conn->bytes_unprocessed; + if (available_space == 0) { + msg_err("fuzzy_tcp: read buffer full for rule %s, closing connection", + conn->rule->name); + conn->failed = TRUE; + return; + } + + r = read(conn->fd, conn->read_buf + conn->bytes_unprocessed, available_space); + + if (r == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + return; /* Try again later */ + } + msg_err("fuzzy_tcp: read error for rule %s: %s", + conn->rule->name, strerror(errno)); + conn->failed = TRUE; + return; + } + else if (r == 0) { + msg_info("fuzzy_tcp: connection closed by %s for rule %s", + rspamd_upstream_name(conn->server), + conn->rule->name); + conn->failed = TRUE; + return; + } + + conn->bytes_unprocessed += r; + + /* Process frames using state machine */ + unsigned int processed_offset = 0; + + while (processed_offset < conn->bytes_unprocessed) { + uint16_t frame_len; + + /* State: 0x0000 - need first byte of length */ + if ((conn->cur_frame_state & 0xC000) == 0x0000) { + if (processed_offset < conn->bytes_unprocessed) { + conn->cur_frame_state = 0x8000 | conn->read_buf[processed_offset]; + processed_offset++; + } + else { + break; + } + } + + /* State: 0x8000 - need second byte of length */ + if ((conn->cur_frame_state & 0xC000) == 0x8000) { + if (processed_offset < conn->bytes_unprocessed) { + uint16_t first_byte = conn->cur_frame_state & 0xFF; + uint16_t second_byte = conn->read_buf[processed_offset]; + conn->cur_frame_state = 0xC000 | ((first_byte << 8) | second_byte); + processed_offset++; + } + else { + break; + } + } + + /* State: 0xC000 - have length, reading data */ + frame_len = conn->cur_frame_state & 0x3FFF; + + if (frame_len > sizeof(struct rspamd_fuzzy_encrypted_reply)) { + msg_err("fuzzy_tcp: invalid frame length %d from %s, closing", + (int) frame_len, + rspamd_upstream_name(conn->server)); + conn->failed = TRUE; + return; + } + + /* Check if we have complete frame */ + if (conn->bytes_unprocessed - processed_offset >= frame_len) { + /* Process complete frame - decrypt and deliver to session */ + fuzzy_tcp_process_reply(conn, conn->read_buf + processed_offset, frame_len); + + processed_offset += frame_len; + conn->cur_frame_state = 0x0000; /* Reset for next frame */ + } + else { + /* Incomplete frame, wait for more data */ + break; + } + } + + /* Move unprocessed data to beginning of buffer */ + if (processed_offset > 0) { + if (processed_offset < conn->bytes_unprocessed) { + memmove(conn->read_buf, + conn->read_buf + processed_offset, + conn->bytes_unprocessed - processed_offset); + conn->bytes_unprocessed -= processed_offset; + } + else { + conn->bytes_unprocessed = 0; + } } } @@ -940,10 +1502,12 @@ fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj, rule->rate_tracker.requests_count = 0; rule->rate_tracker.window_start = 0; - /* Initialize TCP connection state */ - rule->tcp_state.connected = FALSE; - rule->tcp_state.connecting = FALSE; - rule->tcp_state.connection = NULL; + /* Initialize TCP connection pool - array of connections */ + rule->tcp_connections = g_ptr_array_new(); + + /* Initialize global pending requests pool - keyed by tag */ + rule->pending_requests = g_hash_table_new_full(g_direct_hash, g_direct_equal, + NULL, g_free); /* * Process rule in Lua @@ -3818,40 +4382,36 @@ register_fuzzy_client_call(struct rspamd_task *task, ev_tstamp now = rspamd_get_calendar_ticks(); fuzzy_update_rate_tracker(rule, now); - /* Decide whether to use TCP or UDP */ - gboolean use_tcp = fuzzy_should_use_tcp(rule); - int sock_type = SOCK_DGRAM; /* Default to UDP */ - - /* If TCP should be used but not connected, initiate connection */ - if ((rule->tcp_enabled || rule->tcp_auto) && !rule->tcp_state.connected) { - /* Check if rate threshold exceeded for auto-switch */ - if (rule->tcp_auto && rule->tcp_window > 0) { - ev_tstamp elapsed = now - rule->rate_tracker.window_start; - if (elapsed > 0) { - double rate = (double) rule->rate_tracker.requests_count / elapsed; - if (rate > rule->tcp_threshold) { - /* Rate exceeded, initiate TCP connection asynchronously */ - fuzzy_tcp_connect_async(rule); - } - } - } - else if (rule->tcp_enabled) { - /* TCP explicitly enabled, try to connect */ - fuzzy_tcp_connect_async(rule); - } + /* Get upstream first - use read_servers for check operations */ + selected = rspamd_upstream_get(rule->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, 0); + if (!selected) { + msg_warn_task("cannot get upstream for rule %s", rule->name); + g_ptr_array_free(commands, TRUE); + return; } - /* Use TCP if available and should be used, otherwise fall back to UDP */ - if (use_tcp) { - sock_type = SOCK_STREAM; + /* Try TCP if enabled/auto and rate threshold exceeded */ + struct fuzzy_tcp_connection *tcp_conn = NULL; + if (fuzzy_should_try_tcp(rule, now)) { + /* This is read server (CHECK operation) */ + tcp_conn = fuzzy_tcp_get_or_create_connection(rule, selected, task, FALSE); } - /* Get upstream - use read_servers for check operations */ - selected = rspamd_upstream_get(rule->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN, - NULL, 0); + /* For now, always use UDP (TCP write/read handlers not ready yet) */ + /* TODO: Use tcp_conn when TCP I/O handlers are implemented */ + if (tcp_conn && tcp_conn->connected) { + /* TCP connection available - will use it in future */ + msg_debug_task("TCP connection available for %s, but not using yet (handlers not ready)", + rspamd_upstream_name(selected)); + } + + /* Use UDP for now */ if (selected) { - addr = rspamd_upstream_addr_next(selected); - if ((sock = rspamd_inet_address_connect(addr, sock_type, TRUE)) == -1) { + addr = rspamd_upstream_addr_cur(selected); + rspamd_inet_address_set_port(addr, rspamd_upstream_port(selected)); + + if ((sock = rspamd_inet_address_connect(addr, SOCK_DGRAM, TRUE)) == -1) { msg_warn_task("cannot connect to %s(%s), %d, %s", rspamd_upstream_name(selected), rspamd_inet_address_to_string_pretty(addr),