From: Vsevolod Stakhov Date: Tue, 7 Oct 2025 12:00:58 +0000 (+0100) Subject: [Feature] Fuzzy check: add reply processing and lifecycle management X-Git-Tag: 3.14.0~84^2~16 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9db45115a6d317d4369a437f07cf08eb3f099604;p=thirdparty%2Frspamd.git [Feature] Fuzzy check: add reply processing and lifecycle management Complete TCP reply handling: - Process all error codes (403, 503, 415, 401) like UDP - Handle FUZZY_STAT commands with proper storage - Mark commands as replied and check session completion Add memory safety and lifecycle management: - Cleanup pending requests when task finishes before reply - Timeout checking for pending requests (io_timeout) - Proper session cleanup for TCP (no fd/ev_watcher) - Initialize TCP session fields (fd=-1, event_loop) Prevents use-after-free when: - Task completes before TCP reply arrives - Reply takes too long (timeout) - Connection fails with pending requests --- diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 7b2d3b6d8d..0e733504e8 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -522,6 +522,7 @@ static gboolean fuzzy_tcp_send_command(struct fuzzy_tcp_connection *conn, GPtrArray *commands, struct fuzzy_client_session *session); static void fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn); +static gboolean fuzzy_check_session_is_completed(struct fuzzy_client_session *session); /* Forward declarations for helper functions */ static gboolean fuzzy_rule_has_encryption(struct fuzzy_rule *rule); @@ -754,6 +755,54 @@ fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn) g_ptr_array_free(to_remove, TRUE); } +/** + * Check and cleanup timed out pending requests + * Called periodically to remove requests that waited too long for reply + */ +static void +fuzzy_tcp_check_pending_timeouts(struct fuzzy_rule *rule, ev_tstamp now) +{ + GHashTableIter iter; + gpointer key, value; + struct fuzzy_tcp_pending_command *pending; + GPtrArray *to_remove; + unsigned int i; + ev_tstamp timeout; + + if (!rule || !rule->pending_requests) { + return; + } + + timeout = rule->io_timeout; + to_remove = g_ptr_array_new(); + + g_hash_table_iter_init(&iter, rule->pending_requests); + while (g_hash_table_iter_next(&iter, &key, &value)) { + pending = (struct fuzzy_tcp_pending_command *) value; + + /* Check if request timed out */ + if ((now - pending->send_time) > timeout) { + g_ptr_array_add(to_remove, key); + + if (pending->task) { + struct rspamd_task *task = pending->task; + msg_info_task("fuzzy_tcp: request timeout after %.2fs for tag %u to %s", + now - pending->send_time, + pending->io->tag, + rspamd_upstream_name(pending->connection->server)); + } + } + } + + /* Remove timed out commands */ + for (i = 0; i < to_remove->len; i++) { + g_hash_table_remove(rule->pending_requests, + g_ptr_array_index(to_remove, i)); + } + + g_ptr_array_free(to_remove, TRUE); +} + /** * Main TCP I/O event handler * Handles connection establishment, reads, writes, and timeouts @@ -836,6 +885,11 @@ fuzzy_tcp_io_handler(int fd, short what, gpointer ud) fuzzy_tcp_read_handler(conn); } + /* Check for timed out pending requests */ + if (conn->connected) { + fuzzy_tcp_check_pending_timeouts(conn->rule, conn->last_activity); + } + FUZZY_TCP_RELEASE(conn); } @@ -1035,12 +1089,70 @@ fuzzy_tcp_process_reply(struct fuzzy_tcp_connection *conn, /* Get task for debug logging */ struct rspamd_task *task = pending->task; - /* Process the reply */ + /* Process the reply - similar to UDP code in fuzzy_check_try_read */ if (rep->v1.prob > 0.5) { if (pending->io->cmd.cmd == FUZZY_CHECK) { fuzzy_insert_result(pending->session, rep, &pending->io->cmd, pending->io, rep->v1.flag); } + else if (pending->io->cmd.cmd == FUZZY_STAT) { + /* + * We store fuzzy stat in the following way: + * 1) We store fuzzy hashes as a hash of rspamd_fuzzy_stat_entry + * 2) We store the resulting hash table inside pool variable `fuzzy_stat` + */ + struct rspamd_fuzzy_stat_entry *pval; + GHashTable *stats_hash; + + stats_hash = (GHashTable *) rspamd_mempool_get_variable(task->task_pool, + RSPAMD_MEMPOOL_FUZZY_STAT); + + if (stats_hash == NULL) { + stats_hash = g_hash_table_new(rspamd_str_hash, rspamd_str_equal); + rspamd_mempool_set_variable(task->task_pool, RSPAMD_MEMPOOL_FUZZY_STAT, + stats_hash, + (rspamd_mempool_destruct_t) g_hash_table_destroy); + } + + pval = g_hash_table_lookup(stats_hash, rule->name); + + if (pval == NULL) { + pval = rspamd_mempool_alloc(task->task_pool, sizeof(*pval)); + pval->name = rspamd_mempool_strdup(task->task_pool, rule->name); + /* Safe, as pval->name is owned by the pool */ + g_hash_table_insert(stats_hash, (char *) pval->name, pval); + } + + pval->fuzzy_cnt = (((uint64_t) rep->v1.value) << 32) + rep->v1.flag; + } + } + else if (rep->v1.value == 403) { + /* In fact, it should be 429, but we preserve compatibility */ + rspamd_task_insert_result(task, RSPAMD_FUZZY_SYMBOL_RATELIMITED, 1.0, + rule->name); + } + else if (rep->v1.value == 503) { + rspamd_task_insert_result(task, RSPAMD_FUZZY_SYMBOL_FORBIDDEN, 1.0, + rule->name); + } + else if (rep->v1.value == 415) { + rspamd_task_insert_result(task, RSPAMD_FUZZY_SYMBOL_ENCRYPTION_REQUIRED, 1.0, + rule->name); + } + else if (rep->v1.value == 401) { + if (pending->io->cmd.cmd != FUZZY_CHECK) { + msg_info_task("fuzzy check error for %d: skipped by server", + rep->v1.flag); + } + } + else if (rep->v1.value != 0) { + msg_info_task("fuzzy check error for %d: unknown error (%d)", + rep->v1.flag, rep->v1.value); + } + + /* Mark as replied */ + if (!(pending->io->flags & FUZZY_CMD_FLAG_REPLIED)) { + pending->io->flags |= FUZZY_CMD_FLAG_REPLIED; } msg_debug_fuzzy_check("fuzzy_tcp: processed reply with tag %u from %s (prob=%.2f)", @@ -1048,6 +1160,9 @@ fuzzy_tcp_process_reply(struct fuzzy_tcp_connection *conn, /* Remove from pending requests */ g_hash_table_remove(rule->pending_requests, GINT_TO_POINTER(tag)); + + /* Check if session is completed */ + fuzzy_check_session_is_completed(pending->session); } /** @@ -2348,12 +2463,62 @@ int fuzzy_check_module_reconfig(struct rspamd_config *cfg) return fuzzy_check_module_config(cfg, false); } +/** + * Cleanup pending TCP requests for a session + * Called when session finishes (task completes or times out) + */ +static void +fuzzy_tcp_session_cleanup(struct fuzzy_client_session *session) +{ + GHashTableIter iter; + gpointer key, value; + struct fuzzy_tcp_pending_command *pending; + GPtrArray *to_remove; + unsigned int i; + + if (!session || !session->rule || !session->rule->pending_requests) { + return; + } + + /* Collect tags to remove (can't modify hash table during iteration) */ + to_remove = g_ptr_array_new(); + + g_hash_table_iter_init(&iter, session->rule->pending_requests); + while (g_hash_table_iter_next(&iter, &key, &value)) { + pending = (struct fuzzy_tcp_pending_command *) value; + + if (pending->session == session) { + g_ptr_array_add(to_remove, key); + } + } + + /* Remove pending commands */ + for (i = 0; i < to_remove->len; i++) { + g_hash_table_remove(session->rule->pending_requests, + g_ptr_array_index(to_remove, i)); + } + + if (to_remove->len > 0 && session->task) { + struct rspamd_task *task = session->task; + msg_debug_fuzzy_check("fuzzy_tcp: cleaned up %d pending commands for finished session", + (int) to_remove->len); + } + + g_ptr_array_free(to_remove, TRUE); +} + /* Finalize IO */ static void fuzzy_io_fin(void *ud) { struct fuzzy_client_session *session = ud; + /* Remove any pending TCP requests for this session */ + if (session->fd == -1) { + /* TCP session - cleanup pending requests */ + fuzzy_tcp_session_cleanup(session); + } + if (session->commands) { g_ptr_array_free(session->commands, TRUE); } @@ -2362,8 +2527,12 @@ fuzzy_io_fin(void *ud) g_ptr_array_free(session->results, TRUE); } - rspamd_ev_watcher_stop(session->event_loop, &session->ev); - close(session->fd); + /* Only cleanup fd and ev_watcher for UDP sessions */ + if (session->fd != -1) { + rspamd_ev_watcher_stop(session->event_loop, &session->ev); + close(session->fd); + } + /* TCP sessions use shared connection, no cleanup needed here */ } static rspamd_words_t * @@ -4537,6 +4706,8 @@ register_fuzzy_client_call(struct rspamd_task *task, session->server = selected; session->rule = rule; session->results = g_ptr_array_sized_new(32); + session->fd = -1; /* TCP uses shared connection, no dedicated fd */ + session->event_loop = task->event_loop; /* Send commands via TCP */ if (fuzzy_tcp_send_command(tcp_conn, commands, session)) {