From: Vsevolod Stakhov Date: Tue, 7 Oct 2025 11:20:10 +0000 (+0100) Subject: [Feature] Fuzzy check: implement TCP error handling and command sending X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=a88375e81b55ca226b33125f2cc12c87dd1b70b3;p=thirdparty%2Frspamd.git [Feature] Fuzzy check: implement TCP error handling and command sending Add comprehensive error handling for TCP connections: - Cleanup pending requests when connections fail - Handle timeout, write, read, and protocol errors - Track connection per pending command for cleanup Implement TCP command sending: - Add TCP framing to encrypted commands - Queue commands for asynchronous sending - Register in pending pool for reply matching - Integrate with main check flow with UDP fallback --- diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 8e136c6727..7b2d3b6d8d 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -197,10 +197,11 @@ struct fuzzy_learn_session { * Used to match replies with requests */ struct fuzzy_tcp_pending_command { - struct fuzzy_cmd_io *io; /* Command I/O */ - struct rspamd_task *task; /* Task associated with command */ - struct fuzzy_client_session *session; /* Session for reply delivery */ - ev_tstamp send_time; /* When command was sent */ + struct fuzzy_cmd_io *io; /* Command I/O */ + struct rspamd_task *task; /* Task associated with command */ + struct fuzzy_client_session *session; /* Session for reply delivery */ + struct fuzzy_tcp_connection *connection; /* Connection this command was sent on */ + ev_tstamp send_time; /* When command was sent */ }; /** @@ -517,6 +518,10 @@ fuzzy_should_try_tcp(struct fuzzy_rule *rule, ev_tstamp now) static void fuzzy_tcp_io_handler(int fd, short what, gpointer ud); static void fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn); static void fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn); +static gboolean fuzzy_tcp_send_command(struct fuzzy_tcp_connection *conn, + GPtrArray *commands, + struct fuzzy_client_session *session); +static void fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn); /* Forward declarations for helper functions */ static gboolean fuzzy_rule_has_encryption(struct fuzzy_rule *rule); @@ -700,6 +705,55 @@ fuzzy_tcp_get_or_create_connection(struct fuzzy_rule *rule, return conn; } +/** + * Cleanup pending requests for a failed connection + * Removes all pending commands associated with this connection + */ +static void +fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn) +{ + GHashTableIter iter; + gpointer key, value; + struct fuzzy_tcp_pending_command *pending; + GPtrArray *to_remove; + unsigned int i; + struct rspamd_task *task = NULL; + + if (!conn || !conn->rule || !conn->rule->pending_requests) { + return; + } + + /* Collect tags to remove (can't modify hash table during iteration) */ + to_remove = g_ptr_array_new(); + + g_hash_table_iter_init(&iter, conn->rule->pending_requests); + while (g_hash_table_iter_next(&iter, &key, &value)) { + pending = (struct fuzzy_tcp_pending_command *) value; + + if (pending->connection == conn) { + g_ptr_array_add(to_remove, key); + + /* Get task for logging */ + if (!task && pending->task) { + task = pending->task; + } + } + } + + /* Remove pending commands */ + for (i = 0; i < to_remove->len; i++) { + g_hash_table_remove(conn->rule->pending_requests, + g_ptr_array_index(to_remove, i)); + } + + if (to_remove->len > 0 && task) { + msg_info_task("fuzzy_tcp: cleaned up %d pending commands for failed connection to %s", + (int) to_remove->len, rspamd_upstream_name(conn->server)); + } + + g_ptr_array_free(to_remove, TRUE); +} + /** * Main TCP I/O event handler * Handles connection establishment, reads, writes, and timeouts @@ -719,6 +773,7 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud) msg_warn("fuzzy_tcp: connection timeout for rule %s to %s", conn->rule->name, rspamd_upstream_name(conn->server)); + fuzzy_tcp_connection_cleanup(conn); conn->failed = TRUE; conn->connecting = FALSE; rspamd_upstream_fail(conn->server, TRUE, "timeout"); @@ -735,6 +790,7 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud) conn->rule->name, rspamd_upstream_name(conn->server), strerror(errno)); + fuzzy_tcp_connection_cleanup(conn); conn->failed = TRUE; conn->connecting = FALSE; rspamd_upstream_fail(conn->server, TRUE, "getsockopt failed"); @@ -747,6 +803,7 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud) conn->rule->name, rspamd_upstream_name(conn->server), strerror(so_error)); + fuzzy_tcp_connection_cleanup(conn); conn->failed = TRUE; conn->connecting = FALSE; rspamd_upstream_fail(conn->server, TRUE, strerror(so_error)); @@ -818,6 +875,7 @@ fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn) conn->rule->name, rspamd_upstream_name(conn->server), strerror(errno)); + fuzzy_tcp_connection_cleanup(conn); conn->failed = TRUE; return; } @@ -825,6 +883,7 @@ fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn) msg_info("fuzzy_tcp: connection closed by %s for rule %s", rspamd_upstream_name(conn->server), conn->rule->name); + fuzzy_tcp_connection_cleanup(conn); conn->failed = TRUE; return; } @@ -847,6 +906,68 @@ fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn) /* Might want to disable EV_WRITE here if needed */ } +/** + * Send commands via TCP connection + * Adds TCP framing and queues for sending, registers in pending pool + */ +static gboolean +fuzzy_tcp_send_command(struct fuzzy_tcp_connection *conn, + GPtrArray *commands, + struct fuzzy_client_session *session) +{ + struct fuzzy_cmd_io *io; + struct fuzzy_tcp_write_buf *buf; + struct fuzzy_tcp_pending_command *pending; + unsigned int i; + struct rspamd_task *task = session->task; + + for (i = 0; i < commands->len; i++) { + io = g_ptr_array_index(commands, i); + + /* Skip if already sent or replied */ + if (io->flags & (FUZZY_CMD_FLAG_SENT | FUZZY_CMD_FLAG_REPLIED)) { + continue; + } + + /* Prepare TCP framed buffer */ + buf = g_malloc0(sizeof(*buf)); + buf->data = g_malloc(io->io.iov_len); + memcpy(buf->data, io->io.iov_base, io->io.iov_len); + + /* Set frame size in network byte order (big endian) */ + buf->size_hdr = GUINT16_TO_BE((uint16_t) io->io.iov_len); + buf->total_len = sizeof(buf->size_hdr) + io->io.iov_len; + buf->bytes_written = 0; + + /* Add to write queue */ + g_queue_push_tail(conn->write_queue, buf); + + /* Register in pending requests pool */ + pending = g_malloc0(sizeof(*pending)); + pending->io = io; + pending->task = task; + pending->session = session; + pending->connection = conn; + pending->send_time = rspamd_get_calendar_ticks(); + + g_hash_table_insert(conn->rule->pending_requests, + GINT_TO_POINTER(io->tag), pending); + + /* Mark as sent */ + io->flags |= FUZZY_CMD_FLAG_SENT; + + msg_debug_fuzzy_check("fuzzy_tcp: queued command with tag %u to %s", + io->tag, rspamd_upstream_name(conn->server)); + } + + /* Ensure write events are enabled */ + if (!g_queue_is_empty(conn->write_queue)) { + rspamd_ev_watcher_reschedule(conn->event_loop, &conn->ev, EV_READ | EV_WRITE); + } + + return TRUE; +} + /** * Process a single TCP reply frame * Decrypts reply, matches with pending command by tag, delivers result @@ -911,6 +1032,9 @@ fuzzy_tcp_process_reply(struct fuzzy_tcp_connection *conn, return; } + /* Get task for debug logging */ + struct rspamd_task *task = pending->task; + /* Process the reply */ if (rep->v1.prob > 0.5) { if (pending->io->cmd.cmd == FUZZY_CHECK) { @@ -919,8 +1043,8 @@ fuzzy_tcp_process_reply(struct fuzzy_tcp_connection *conn, } } - msg_debug("fuzzy_tcp: processed reply with tag %u from %s (prob=%.2f)", - tag, rspamd_upstream_name(conn->server), (double) rep->v1.prob); + msg_debug_fuzzy_check("fuzzy_tcp: processed reply with tag %u from %s (prob=%.2f)", + tag, rspamd_upstream_name(conn->server), (double) rep->v1.prob); /* Remove from pending requests */ g_hash_table_remove(rule->pending_requests, GINT_TO_POINTER(tag)); @@ -941,6 +1065,7 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn) if (available_space == 0) { msg_err("fuzzy_tcp: read buffer full for rule %s, closing connection", conn->rule->name); + fuzzy_tcp_connection_cleanup(conn); conn->failed = TRUE; return; } @@ -953,6 +1078,7 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn) } msg_err("fuzzy_tcp: read error for rule %s: %s", conn->rule->name, strerror(errno)); + fuzzy_tcp_connection_cleanup(conn); conn->failed = TRUE; return; } @@ -960,6 +1086,7 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn) msg_info("fuzzy_tcp: connection closed by %s for rule %s", rspamd_upstream_name(conn->server), conn->rule->name); + fuzzy_tcp_connection_cleanup(conn); conn->failed = TRUE; return; } @@ -1003,6 +1130,7 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn) msg_err("fuzzy_tcp: invalid frame length %d from %s, closing", (int) frame_len, rspamd_upstream_name(conn->server)); + fuzzy_tcp_connection_cleanup(conn); conn->failed = TRUE; return; } @@ -4398,15 +4526,39 @@ register_fuzzy_client_call(struct rspamd_task *task, tcp_conn = fuzzy_tcp_get_or_create_connection(rule, selected, task, FALSE); } - /* For now, always use UDP (TCP write/read handlers not ready yet) */ - /* TODO: Use tcp_conn when TCP I/O handlers are implemented */ + /* Use TCP if available and connected */ if (tcp_conn && tcp_conn->connected) { - /* TCP connection available - will use it in future */ - msg_debug_task("TCP connection available for %s, but not using yet (handlers not ready)", - rspamd_upstream_name(selected)); + /* Create session for TCP */ + session = rspamd_mempool_alloc0(task->task_pool, + sizeof(struct fuzzy_client_session)); + session->state = 0; + session->commands = commands; + session->task = task; + session->server = selected; + session->rule = rule; + session->results = g_ptr_array_sized_new(32); + + /* Send commands via TCP */ + if (fuzzy_tcp_send_command(tcp_conn, commands, session)) { + msg_debug_fuzzy_check("fuzzy_tcp: sent %d commands to %s via TCP", + (int) commands->len, rspamd_upstream_name(selected)); + + rspamd_session_add_event(task->s, fuzzy_io_fin, session, M); + session->item = rspamd_symcache_get_cur_item(task); + + if (session->item) { + rspamd_symcache_item_async_inc(task, session->item, M); + } + + return; /* TCP send successful */ + } + else { + msg_warn_task("fuzzy_tcp: failed to send commands, falling back to UDP"); + /* Fall through to UDP */ + } } - /* Use UDP for now */ + /* Use UDP as fallback or when TCP not available */ if (selected) { addr = rspamd_upstream_addr_cur(selected); rspamd_inet_address_set_port(addr, rspamd_upstream_port(selected));