]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Fuzzy check: implement TCP error handling and command sending
authorVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Oct 2025 11:20:10 +0000 (12:20 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Oct 2025 11:20:10 +0000 (12:20 +0100)
Add comprehensive error handling for TCP connections:
- Cleanup pending requests when connections fail
- Handle timeout, write, read, and protocol errors
- Track connection per pending command for cleanup

Implement TCP command sending:
- Add TCP framing to encrypted commands
- Queue commands for asynchronous sending
- Register in pending pool for reply matching
- Integrate with main check flow with UDP fallback

src/plugins/fuzzy_check.c

index 8e136c672783293f1461710c593de060759fcbec..7b2d3b6d8d5e4aa27e9f9f54a6fabc419aef0971 100644 (file)
@@ -197,10 +197,11 @@ struct fuzzy_learn_session {
  * Used to match replies with requests
  */
 struct fuzzy_tcp_pending_command {
-       struct fuzzy_cmd_io *io;              /* Command I/O */
-       struct rspamd_task *task;             /* Task associated with command */
-       struct fuzzy_client_session *session; /* Session for reply delivery */
-       ev_tstamp send_time;                  /* When command was sent */
+       struct fuzzy_cmd_io *io;                 /* Command I/O */
+       struct rspamd_task *task;                /* Task associated with command */
+       struct fuzzy_client_session *session;    /* Session for reply delivery */
+       struct fuzzy_tcp_connection *connection; /* Connection this command was sent on */
+       ev_tstamp send_time;                     /* When command was sent */
 };
 
 /**
@@ -517,6 +518,10 @@ fuzzy_should_try_tcp(struct fuzzy_rule *rule, ev_tstamp now)
 static void fuzzy_tcp_io_handler(int fd, short what, gpointer ud);
 static void fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn);
 static void fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn);
+static gboolean fuzzy_tcp_send_command(struct fuzzy_tcp_connection *conn,
+                                                                          GPtrArray *commands,
+                                                                          struct fuzzy_client_session *session);
+static void fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn);
 
 /* Forward declarations for helper functions */
 static gboolean fuzzy_rule_has_encryption(struct fuzzy_rule *rule);
@@ -700,6 +705,55 @@ fuzzy_tcp_get_or_create_connection(struct fuzzy_rule *rule,
        return conn;
 }
 
+/**
+ * Cleanup pending requests for a failed connection
+ * Removes all pending commands associated with this connection
+ */
+static void
+fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn)
+{
+       GHashTableIter iter;
+       gpointer key, value;
+       struct fuzzy_tcp_pending_command *pending;
+       GPtrArray *to_remove;
+       unsigned int i;
+       struct rspamd_task *task = NULL;
+
+       if (!conn || !conn->rule || !conn->rule->pending_requests) {
+               return;
+       }
+
+       /* Collect tags to remove (can't modify hash table during iteration) */
+       to_remove = g_ptr_array_new();
+
+       g_hash_table_iter_init(&iter, conn->rule->pending_requests);
+       while (g_hash_table_iter_next(&iter, &key, &value)) {
+               pending = (struct fuzzy_tcp_pending_command *) value;
+
+               if (pending->connection == conn) {
+                       g_ptr_array_add(to_remove, key);
+
+                       /* Get task for logging */
+                       if (!task && pending->task) {
+                               task = pending->task;
+                       }
+               }
+       }
+
+       /* Remove pending commands */
+       for (i = 0; i < to_remove->len; i++) {
+               g_hash_table_remove(conn->rule->pending_requests,
+                                                       g_ptr_array_index(to_remove, i));
+       }
+
+       if (to_remove->len > 0 && task) {
+               msg_info_task("fuzzy_tcp: cleaned up %d pending commands for failed connection to %s",
+                                         (int) to_remove->len, rspamd_upstream_name(conn->server));
+       }
+
+       g_ptr_array_free(to_remove, TRUE);
+}
+
 /**
  * Main TCP I/O event handler
  * Handles connection establishment, reads, writes, and timeouts
@@ -719,6 +773,7 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
                msg_warn("fuzzy_tcp: connection timeout for rule %s to %s",
                                 conn->rule->name,
                                 rspamd_upstream_name(conn->server));
+               fuzzy_tcp_connection_cleanup(conn);
                conn->failed = TRUE;
                conn->connecting = FALSE;
                rspamd_upstream_fail(conn->server, TRUE, "timeout");
@@ -735,6 +790,7 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
                                                 conn->rule->name,
                                                 rspamd_upstream_name(conn->server),
                                                 strerror(errno));
+                               fuzzy_tcp_connection_cleanup(conn);
                                conn->failed = TRUE;
                                conn->connecting = FALSE;
                                rspamd_upstream_fail(conn->server, TRUE, "getsockopt failed");
@@ -747,6 +803,7 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
                                                 conn->rule->name,
                                                 rspamd_upstream_name(conn->server),
                                                 strerror(so_error));
+                               fuzzy_tcp_connection_cleanup(conn);
                                conn->failed = TRUE;
                                conn->connecting = FALSE;
                                rspamd_upstream_fail(conn->server, TRUE, strerror(so_error));
@@ -818,6 +875,7 @@ fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn)
                                        conn->rule->name,
                                        rspamd_upstream_name(conn->server),
                                        strerror(errno));
+                       fuzzy_tcp_connection_cleanup(conn);
                        conn->failed = TRUE;
                        return;
                }
@@ -825,6 +883,7 @@ fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn)
                        msg_info("fuzzy_tcp: connection closed by %s for rule %s",
                                         rspamd_upstream_name(conn->server),
                                         conn->rule->name);
+                       fuzzy_tcp_connection_cleanup(conn);
                        conn->failed = TRUE;
                        return;
                }
@@ -847,6 +906,68 @@ fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn)
        /* Might want to disable EV_WRITE here if needed */
 }
 
+/**
+ * Send commands via TCP connection
+ * Adds TCP framing and queues for sending, registers in pending pool
+ */
+static gboolean
+fuzzy_tcp_send_command(struct fuzzy_tcp_connection *conn,
+                                          GPtrArray *commands,
+                                          struct fuzzy_client_session *session)
+{
+       struct fuzzy_cmd_io *io;
+       struct fuzzy_tcp_write_buf *buf;
+       struct fuzzy_tcp_pending_command *pending;
+       unsigned int i;
+       struct rspamd_task *task = session->task;
+
+       for (i = 0; i < commands->len; i++) {
+               io = g_ptr_array_index(commands, i);
+
+               /* Skip if already sent or replied */
+               if (io->flags & (FUZZY_CMD_FLAG_SENT | FUZZY_CMD_FLAG_REPLIED)) {
+                       continue;
+               }
+
+               /* Prepare TCP framed buffer */
+               buf = g_malloc0(sizeof(*buf));
+               buf->data = g_malloc(io->io.iov_len);
+               memcpy(buf->data, io->io.iov_base, io->io.iov_len);
+
+               /* Set frame size in network byte order (big endian) */
+               buf->size_hdr = GUINT16_TO_BE((uint16_t) io->io.iov_len);
+               buf->total_len = sizeof(buf->size_hdr) + io->io.iov_len;
+               buf->bytes_written = 0;
+
+               /* Add to write queue */
+               g_queue_push_tail(conn->write_queue, buf);
+
+               /* Register in pending requests pool */
+               pending = g_malloc0(sizeof(*pending));
+               pending->io = io;
+               pending->task = task;
+               pending->session = session;
+               pending->connection = conn;
+               pending->send_time = rspamd_get_calendar_ticks();
+
+               g_hash_table_insert(conn->rule->pending_requests,
+                                                       GINT_TO_POINTER(io->tag), pending);
+
+               /* Mark as sent */
+               io->flags |= FUZZY_CMD_FLAG_SENT;
+
+               msg_debug_fuzzy_check("fuzzy_tcp: queued command with tag %u to %s",
+                                                         io->tag, rspamd_upstream_name(conn->server));
+       }
+
+       /* Ensure write events are enabled */
+       if (!g_queue_is_empty(conn->write_queue)) {
+               rspamd_ev_watcher_reschedule(conn->event_loop, &conn->ev, EV_READ | EV_WRITE);
+       }
+
+       return TRUE;
+}
+
 /**
  * Process a single TCP reply frame
  * Decrypts reply, matches with pending command by tag, delivers result
@@ -911,6 +1032,9 @@ fuzzy_tcp_process_reply(struct fuzzy_tcp_connection *conn,
                return;
        }
 
+       /* Get task for debug logging */
+       struct rspamd_task *task = pending->task;
+
        /* Process the reply */
        if (rep->v1.prob > 0.5) {
                if (pending->io->cmd.cmd == FUZZY_CHECK) {
@@ -919,8 +1043,8 @@ fuzzy_tcp_process_reply(struct fuzzy_tcp_connection *conn,
                }
        }
 
-       msg_debug("fuzzy_tcp: processed reply with tag %u from %s (prob=%.2f)",
-                         tag, rspamd_upstream_name(conn->server), (double) rep->v1.prob);
+       msg_debug_fuzzy_check("fuzzy_tcp: processed reply with tag %u from %s (prob=%.2f)",
+                                                 tag, rspamd_upstream_name(conn->server), (double) rep->v1.prob);
 
        /* Remove from pending requests */
        g_hash_table_remove(rule->pending_requests, GINT_TO_POINTER(tag));
@@ -941,6 +1065,7 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn)
        if (available_space == 0) {
                msg_err("fuzzy_tcp: read buffer full for rule %s, closing connection",
                                conn->rule->name);
+               fuzzy_tcp_connection_cleanup(conn);
                conn->failed = TRUE;
                return;
        }
@@ -953,6 +1078,7 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn)
                }
                msg_err("fuzzy_tcp: read error for rule %s: %s",
                                conn->rule->name, strerror(errno));
+               fuzzy_tcp_connection_cleanup(conn);
                conn->failed = TRUE;
                return;
        }
@@ -960,6 +1086,7 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn)
                msg_info("fuzzy_tcp: connection closed by %s for rule %s",
                                 rspamd_upstream_name(conn->server),
                                 conn->rule->name);
+               fuzzy_tcp_connection_cleanup(conn);
                conn->failed = TRUE;
                return;
        }
@@ -1003,6 +1130,7 @@ fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn)
                        msg_err("fuzzy_tcp: invalid frame length %d from %s, closing",
                                        (int) frame_len,
                                        rspamd_upstream_name(conn->server));
+                       fuzzy_tcp_connection_cleanup(conn);
                        conn->failed = TRUE;
                        return;
                }
@@ -4398,15 +4526,39 @@ register_fuzzy_client_call(struct rspamd_task *task,
                        tcp_conn = fuzzy_tcp_get_or_create_connection(rule, selected, task, FALSE);
                }
 
-               /* For now, always use UDP (TCP write/read handlers not ready yet) */
-               /* TODO: Use tcp_conn when TCP I/O handlers are implemented */
+               /* Use TCP if available and connected */
                if (tcp_conn && tcp_conn->connected) {
-                       /* TCP connection available - will use it in future */
-                       msg_debug_task("TCP connection available for %s, but not using yet (handlers not ready)",
-                                                  rspamd_upstream_name(selected));
+                       /* Create session for TCP */
+                       session = rspamd_mempool_alloc0(task->task_pool,
+                                                                                       sizeof(struct fuzzy_client_session));
+                       session->state = 0;
+                       session->commands = commands;
+                       session->task = task;
+                       session->server = selected;
+                       session->rule = rule;
+                       session->results = g_ptr_array_sized_new(32);
+
+                       /* Send commands via TCP */
+                       if (fuzzy_tcp_send_command(tcp_conn, commands, session)) {
+                               msg_debug_fuzzy_check("fuzzy_tcp: sent %d commands to %s via TCP",
+                                                                         (int) commands->len, rspamd_upstream_name(selected));
+
+                               rspamd_session_add_event(task->s, fuzzy_io_fin, session, M);
+                               session->item = rspamd_symcache_get_cur_item(task);
+
+                               if (session->item) {
+                                       rspamd_symcache_item_async_inc(task, session->item, M);
+                               }
+
+                               return; /* TCP send successful */
+                       }
+                       else {
+                               msg_warn_task("fuzzy_tcp: failed to send commands, falling back to UDP");
+                               /* Fall through to UDP */
+                       }
                }
 
-               /* Use UDP for now */
+               /* Use UDP as fallback or when TCP not available */
                if (selected) {
                        addr = rspamd_upstream_addr_cur(selected);
                        rspamd_inet_address_set_port(addr, rspamd_upstream_port(selected));