git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Fuzzy check: add reply processing and lifecycle management
author Vsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Oct 2025 12:00:58 +0000 (13:00 +0100)
committer Vsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Oct 2025 12:00:58 +0000 (13:00 +0100)
Complete TCP reply handling:
- Process all error codes (403, 503, 415, 401) like UDP
- Handle FUZZY_STAT commands with proper storage
- Mark commands as replied and check session completion

Add memory safety and lifecycle management:
- Cleanup pending requests when task finishes before reply
- Timeout checking for pending requests (io_timeout)
- Proper session cleanup for TCP (no fd/ev_watcher)
- Initialize TCP session fields (fd=-1, event_loop)

Prevents use-after-free when:
- Task completes before TCP reply arrives
- Reply takes too long (timeout)
- Connection fails with pending requests

src/plugins/fuzzy_check.c

index 7b2d3b6d8d5e4aa27e9f9f54a6fabc419aef0971..0e733504e83309d1243223dceeebf2e4a3346be8 100644 (file)
@@ -522,6 +522,7 @@ static gboolean fuzzy_tcp_send_command(struct fuzzy_tcp_connection *conn,
                                                                           GPtrArray *commands,
                                                                           struct fuzzy_client_session *session);
 static void fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn);
+static gboolean fuzzy_check_session_is_completed(struct fuzzy_client_session *session);
 
 /* Forward declarations for helper functions */
 static gboolean fuzzy_rule_has_encryption(struct fuzzy_rule *rule);
@@ -754,6 +755,54 @@ fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn)
        g_ptr_array_free(to_remove, TRUE);
 }
 
+/**
+ * Drop pending TCP requests of `rule` that have waited longer than the
+ * rule's I/O timeout.
+ *
+ * Every entry of rule->pending_requests whose age (now - send_time) is
+ * strictly greater than rule->io_timeout is removed from the table. Keys are
+ * first collected into a temporary array and removed only after the iteration
+ * has finished, so the table is never modified while it is being walked.
+ *
+ * In this change the function is invoked from fuzzy_tcp_io_handler() for
+ * connected connections, with conn->last_activity passed as `now`.
+ *
+ * NOTE(review): expired entries are only removed here; this function does not
+ * set FUZZY_CMD_FLAG_REPLIED on pending->io and does not call
+ * fuzzy_check_session_is_completed(). Whether removal releases the pending
+ * structure depends on the destroy notifiers of pending_requests, which are
+ * not visible here - confirm that the owning session is still finalised.
+ *
+ * @param rule fuzzy rule owning the pending_requests table; NULL, or a rule
+ *             without a table, makes the call a no-op
+ * @param now  timestamp that each request's send_time is compared against
+ */
+static void
+fuzzy_tcp_check_pending_timeouts(struct fuzzy_rule *rule, ev_tstamp now)
+{
+       GHashTableIter iter;
+       gpointer key, value;
+       struct fuzzy_tcp_pending_command *pending;
+       GPtrArray *to_remove; /* keys of expired entries, borrowed from the table */
+       unsigned int i;
+       ev_tstamp timeout;
+
+       if (!rule || !rule->pending_requests) {
+               return; /* nothing to scan */
+       }
+
+       timeout = rule->io_timeout;
+       to_remove = g_ptr_array_new();
+
+       g_hash_table_iter_init(&iter, rule->pending_requests);
+       while (g_hash_table_iter_next(&iter, &key, &value)) {
+               pending = (struct fuzzy_tcp_pending_command *) value;
+
+               /* Check if request timed out: its age must be strictly greater
+                * than io_timeout, an age equal to the timeout is still kept */
+               if ((now - pending->send_time) > timeout) {
+                       g_ptr_array_add(to_remove, key); /* removal is deferred until the walk ends */
+
+                       if (pending->task) { /* log only when a task is attached */
+                               struct rspamd_task *task = pending->task; /* presumably picked up implicitly by msg_info_task - TODO confirm */
+                               msg_info_task("fuzzy_tcp: request timeout after %.2fs for tag %u to %s",
+                                                         now - pending->send_time,
+                                                         pending->io->tag,
+                                                         rspamd_upstream_name(pending->connection->server)); /* NOTE(review): io and connection are dereferenced without a NULL check */
+                       }
+               }
+       }
+
+       /* Remove timed out commands now that iteration over the table is done */
+       for (i = 0; i < to_remove->len; i++) {
+               g_hash_table_remove(rule->pending_requests,
+                                                       g_ptr_array_index(to_remove, i));
+       }
+
+       g_ptr_array_free(to_remove, TRUE); /* releases the array storage only; it has no element free function */
+}
+
 /**
  * Main TCP I/O event handler
  * Handles connection establishment, reads, writes, and timeouts
@@ -836,6 +885,11 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
                fuzzy_tcp_read_handler(conn);
        }
 
+       /* Check for timed out pending requests */
+       if (conn->connected) {
+               fuzzy_tcp_check_pending_timeouts(conn->rule, conn->last_activity);
+       }
+
        FUZZY_TCP_RELEASE(conn);
 }
 
@@ -1035,12 +1089,70 @@ fuzzy_tcp_process_reply(struct fuzzy_tcp_connection *conn,
        /* Get task for debug logging */
        struct rspamd_task *task = pending->task;
 
-       /* Process the reply */
+       /* Process the reply - similar to UDP code in fuzzy_check_try_read */
        if (rep->v1.prob > 0.5) {
                if (pending->io->cmd.cmd == FUZZY_CHECK) {
                        fuzzy_insert_result(pending->session, rep, &pending->io->cmd,
                                                                pending->io, rep->v1.flag);
                }
+               else if (pending->io->cmd.cmd == FUZZY_STAT) {
+                       /*
+                        * We store fuzzy stat in the following way:
+                        * 1) We store fuzzy hashes as a hash of rspamd_fuzzy_stat_entry
+                        * 2) We store the resulting hash table inside pool variable `fuzzy_stat`
+                        */
+                       struct rspamd_fuzzy_stat_entry *pval;
+                       GHashTable *stats_hash;
+
+                       stats_hash = (GHashTable *) rspamd_mempool_get_variable(task->task_pool,
+                                                                                                                                       RSPAMD_MEMPOOL_FUZZY_STAT);
+
+                       if (stats_hash == NULL) {
+                               stats_hash = g_hash_table_new(rspamd_str_hash, rspamd_str_equal);
+                               rspamd_mempool_set_variable(task->task_pool, RSPAMD_MEMPOOL_FUZZY_STAT,
+                                                                                       stats_hash,
+                                                                                       (rspamd_mempool_destruct_t) g_hash_table_destroy);
+                       }
+
+                       pval = g_hash_table_lookup(stats_hash, rule->name);
+
+                       if (pval == NULL) {
+                               pval = rspamd_mempool_alloc(task->task_pool, sizeof(*pval));
+                               pval->name = rspamd_mempool_strdup(task->task_pool, rule->name);
+                               /* Safe, as pval->name is owned by the pool */
+                               g_hash_table_insert(stats_hash, (char *) pval->name, pval);
+                       }
+
+                       pval->fuzzy_cnt = (((uint64_t) rep->v1.value) << 32) + rep->v1.flag;
+               }
+       }
+       else if (rep->v1.value == 403) {
+               /* In fact, it should be 429, but we preserve compatibility */
+               rspamd_task_insert_result(task, RSPAMD_FUZZY_SYMBOL_RATELIMITED, 1.0,
+                                                                 rule->name);
+       }
+       else if (rep->v1.value == 503) {
+               rspamd_task_insert_result(task, RSPAMD_FUZZY_SYMBOL_FORBIDDEN, 1.0,
+                                                                 rule->name);
+       }
+       else if (rep->v1.value == 415) {
+               rspamd_task_insert_result(task, RSPAMD_FUZZY_SYMBOL_ENCRYPTION_REQUIRED, 1.0,
+                                                                 rule->name);
+       }
+       else if (rep->v1.value == 401) {
+               if (pending->io->cmd.cmd != FUZZY_CHECK) {
+                       msg_info_task("fuzzy check error for %d: skipped by server",
+                                                 rep->v1.flag);
+               }
+       }
+       else if (rep->v1.value != 0) {
+               msg_info_task("fuzzy check error for %d: unknown error (%d)",
+                                         rep->v1.flag, rep->v1.value);
+       }
+
+       /* Mark as replied */
+       if (!(pending->io->flags & FUZZY_CMD_FLAG_REPLIED)) {
+               pending->io->flags |= FUZZY_CMD_FLAG_REPLIED;
        }
 
        msg_debug_fuzzy_check("fuzzy_tcp: processed reply with tag %u from %s (prob=%.2f)",
@@ -1048,6 +1160,9 @@ fuzzy_tcp_process_reply(struct fuzzy_tcp_connection *conn,
 
        /* Remove from pending requests */
        g_hash_table_remove(rule->pending_requests, GINT_TO_POINTER(tag));
+
+       /* Check if session is completed */
+       fuzzy_check_session_is_completed(pending->session);
 }
 
 /**
@@ -2348,12 +2463,62 @@ int fuzzy_check_module_reconfig(struct rspamd_config *cfg)
        return fuzzy_check_module_config(cfg, false);
 }
 
+/**
+ * Remove every pending TCP request that belongs to `session`.
+ *
+ * Walks session->rule->pending_requests, collects the keys of all entries
+ * whose pending->session pointer equals `session`, and removes them from the
+ * table once the walk is finished. A debug message with the number of removed
+ * entries is emitted when at least one entry was removed and the session has
+ * a task attached.
+ *
+ * In this change the function is called from fuzzy_io_fin() for sessions
+ * whose fd is -1, which is the value assigned to sessions sent over TCP.
+ *
+ * NOTE(review): whether removing an entry also releases the pending structure
+ * depends on the destroy notifiers of pending_requests, which are not visible
+ * here - confirm before relying on this for memory safety.
+ *
+ * @param session client session being finalised; the call is a no-op when the
+ *                session, its rule, or the rule's pending_requests is NULL
+ */
+static void
+fuzzy_tcp_session_cleanup(struct fuzzy_client_session *session)
+{
+       GHashTableIter iter;
+       gpointer key, value;
+       struct fuzzy_tcp_pending_command *pending;
+       GPtrArray *to_remove; /* keys of entries owned by this session, borrowed from the table */
+       unsigned int i;
+
+       if (!session || !session->rule || !session->rule->pending_requests) {
+               return; /* nothing to clean up */
+       }
+
+       /* Collect tags to remove (can't modify hash table during iteration) */
+       to_remove = g_ptr_array_new();
+
+       g_hash_table_iter_init(&iter, session->rule->pending_requests);
+       while (g_hash_table_iter_next(&iter, &key, &value)) {
+               pending = (struct fuzzy_tcp_pending_command *) value;
+
+               if (pending->session == session) { /* matched by pointer identity */
+                       g_ptr_array_add(to_remove, key);
+               }
+       }
+
+       /* Remove pending commands collected above */
+       for (i = 0; i < to_remove->len; i++) {
+               g_hash_table_remove(session->rule->pending_requests,
+                                                       g_ptr_array_index(to_remove, i));
+       }
+
+       if (to_remove->len > 0 && session->task) { /* stay silent when nothing was removed or no task is attached */
+               struct rspamd_task *task = session->task; /* presumably picked up implicitly by msg_debug_fuzzy_check - TODO confirm */
+               msg_debug_fuzzy_check("fuzzy_tcp: cleaned up %d pending commands for finished session",
+                                                         (int) to_remove->len);
+       }
+
+       g_ptr_array_free(to_remove, TRUE); /* releases the array storage only; it has no element free function */
+}
+
 /* Finalize IO */
 static void
 fuzzy_io_fin(void *ud)
 {
        struct fuzzy_client_session *session = ud;
 
+       /* Remove any pending TCP requests for this session */
+       if (session->fd == -1) {
+               /* TCP session - cleanup pending requests */
+               fuzzy_tcp_session_cleanup(session);
+       }
+
        if (session->commands) {
                g_ptr_array_free(session->commands, TRUE);
        }
@@ -2362,8 +2527,12 @@ fuzzy_io_fin(void *ud)
                g_ptr_array_free(session->results, TRUE);
        }
 
-       rspamd_ev_watcher_stop(session->event_loop, &session->ev);
-       close(session->fd);
+       /* Only cleanup fd and ev_watcher for UDP sessions */
+       if (session->fd != -1) {
+               rspamd_ev_watcher_stop(session->event_loop, &session->ev);
+               close(session->fd);
+       }
+       /* TCP sessions use shared connection, no cleanup needed here */
 }
 
 static rspamd_words_t *
@@ -4537,6 +4706,8 @@ register_fuzzy_client_call(struct rspamd_task *task,
                        session->server = selected;
                        session->rule = rule;
                        session->results = g_ptr_array_sized_new(32);
+                       session->fd = -1; /* TCP uses shared connection, no dedicated fd */
+                       session->event_loop = task->event_loop;
 
                        /* Send commands via TCP */
                        if (fuzzy_tcp_send_command(tcp_conn, commands, session)) {