GPtrArray *commands,
struct fuzzy_client_session *session);
static void fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn);
+static gboolean fuzzy_check_session_is_completed(struct fuzzy_client_session *session);
/* Forward declarations for helper functions */
static gboolean fuzzy_rule_has_encryption(struct fuzzy_rule *rule);
g_ptr_array_free(to_remove, TRUE);
}
+/**
+ * Check and cleanup timed out pending requests
+ * Called periodically to remove requests that waited too long for reply
+ */
+static void
+fuzzy_tcp_check_pending_timeouts(struct fuzzy_rule *rule, ev_tstamp now)
+{
+ GHashTableIter iter;
+ gpointer key, value;
+ struct fuzzy_tcp_pending_command *pending;
+ GPtrArray *to_remove;
+ unsigned int i;
+ ev_tstamp timeout;
+
+ if (!rule || !rule->pending_requests) {
+ return;
+ }
+
+ timeout = rule->io_timeout;
+ to_remove = g_ptr_array_new();
+
+ g_hash_table_iter_init(&iter, rule->pending_requests);
+ while (g_hash_table_iter_next(&iter, &key, &value)) {
+ pending = (struct fuzzy_tcp_pending_command *) value;
+
+ /* Check if request timed out */
+ if ((now - pending->send_time) > timeout) {
+ g_ptr_array_add(to_remove, key);
+
+ if (pending->task) {
+ struct rspamd_task *task = pending->task;
+ msg_info_task("fuzzy_tcp: request timeout after %.2fs for tag %u to %s",
+ now - pending->send_time,
+ pending->io->tag,
+ rspamd_upstream_name(pending->connection->server));
+ }
+ }
+ }
+
+ /* Remove timed out commands */
+ for (i = 0; i < to_remove->len; i++) {
+ g_hash_table_remove(rule->pending_requests,
+ g_ptr_array_index(to_remove, i));
+ }
+
+ g_ptr_array_free(to_remove, TRUE);
+}
+
/**
* Main TCP I/O event handler
* Handles connection establishment, reads, writes, and timeouts
fuzzy_tcp_read_handler(conn);
}
+ /* Check for timed out pending requests */
+ if (conn->connected) {
+ fuzzy_tcp_check_pending_timeouts(conn->rule, conn->last_activity);
+ }
+
FUZZY_TCP_RELEASE(conn);
}
/* Get task for debug logging */
struct rspamd_task *task = pending->task;
- /* Process the reply */
+ /* Process the reply - similar to UDP code in fuzzy_check_try_read */
if (rep->v1.prob > 0.5) {
if (pending->io->cmd.cmd == FUZZY_CHECK) {
fuzzy_insert_result(pending->session, rep, &pending->io->cmd,
pending->io, rep->v1.flag);
}
+ else if (pending->io->cmd.cmd == FUZZY_STAT) {
+ /*
+ * We store fuzzy stat in the following way:
+ * 1) We store fuzzy hashes as a hash of rspamd_fuzzy_stat_entry
+ * 2) We store the resulting hash table inside pool variable `fuzzy_stat`
+ */
+ struct rspamd_fuzzy_stat_entry *pval;
+ GHashTable *stats_hash;
+
+ stats_hash = (GHashTable *) rspamd_mempool_get_variable(task->task_pool,
+ RSPAMD_MEMPOOL_FUZZY_STAT);
+
+ if (stats_hash == NULL) {
+ stats_hash = g_hash_table_new(rspamd_str_hash, rspamd_str_equal);
+ rspamd_mempool_set_variable(task->task_pool, RSPAMD_MEMPOOL_FUZZY_STAT,
+ stats_hash,
+ (rspamd_mempool_destruct_t) g_hash_table_destroy);
+ }
+
+ pval = g_hash_table_lookup(stats_hash, rule->name);
+
+ if (pval == NULL) {
+ pval = rspamd_mempool_alloc(task->task_pool, sizeof(*pval));
+ pval->name = rspamd_mempool_strdup(task->task_pool, rule->name);
+ /* Safe, as pval->name is owned by the pool */
+ g_hash_table_insert(stats_hash, (char *) pval->name, pval);
+ }
+
+ pval->fuzzy_cnt = (((uint64_t) rep->v1.value) << 32) + rep->v1.flag;
+ }
+ }
+ else if (rep->v1.value == 403) {
+ /* In fact, it should be 429, but we preserve compatibility */
+ rspamd_task_insert_result(task, RSPAMD_FUZZY_SYMBOL_RATELIMITED, 1.0,
+ rule->name);
+ }
+ else if (rep->v1.value == 503) {
+ rspamd_task_insert_result(task, RSPAMD_FUZZY_SYMBOL_FORBIDDEN, 1.0,
+ rule->name);
+ }
+ else if (rep->v1.value == 415) {
+ rspamd_task_insert_result(task, RSPAMD_FUZZY_SYMBOL_ENCRYPTION_REQUIRED, 1.0,
+ rule->name);
+ }
+ else if (rep->v1.value == 401) {
+ if (pending->io->cmd.cmd != FUZZY_CHECK) {
+ msg_info_task("fuzzy check error for %d: skipped by server",
+ rep->v1.flag);
+ }
+ }
+ else if (rep->v1.value != 0) {
+ msg_info_task("fuzzy check error for %d: unknown error (%d)",
+ rep->v1.flag, rep->v1.value);
+ }
+
+ /* Mark as replied */
+ if (!(pending->io->flags & FUZZY_CMD_FLAG_REPLIED)) {
+ pending->io->flags |= FUZZY_CMD_FLAG_REPLIED;
}
msg_debug_fuzzy_check("fuzzy_tcp: processed reply with tag %u from %s (prob=%.2f)",
/* Remove from pending requests */
g_hash_table_remove(rule->pending_requests, GINT_TO_POINTER(tag));
+
+ /* Check if session is completed */
+ fuzzy_check_session_is_completed(pending->session);
}
/**
return fuzzy_check_module_config(cfg, false);
}
+/**
+ * Cleanup pending TCP requests for a session
+ * Called when session finishes (task completes or times out)
+ */
+static void
+fuzzy_tcp_session_cleanup(struct fuzzy_client_session *session)
+{
+ GHashTableIter iter;
+ gpointer key, value;
+ struct fuzzy_tcp_pending_command *pending;
+ GPtrArray *to_remove;
+ unsigned int i;
+
+ if (!session || !session->rule || !session->rule->pending_requests) {
+ return;
+ }
+
+ /* Collect tags to remove (can't modify hash table during iteration) */
+ to_remove = g_ptr_array_new();
+
+ g_hash_table_iter_init(&iter, session->rule->pending_requests);
+ while (g_hash_table_iter_next(&iter, &key, &value)) {
+ pending = (struct fuzzy_tcp_pending_command *) value;
+
+ if (pending->session == session) {
+ g_ptr_array_add(to_remove, key);
+ }
+ }
+
+ /* Remove pending commands */
+ for (i = 0; i < to_remove->len; i++) {
+ g_hash_table_remove(session->rule->pending_requests,
+ g_ptr_array_index(to_remove, i));
+ }
+
+ if (to_remove->len > 0 && session->task) {
+ struct rspamd_task *task = session->task;
+ msg_debug_fuzzy_check("fuzzy_tcp: cleaned up %d pending commands for finished session",
+ (int) to_remove->len);
+ }
+
+ g_ptr_array_free(to_remove, TRUE);
+}
+
/* Finalize IO */
static void
fuzzy_io_fin(void *ud)
{
struct fuzzy_client_session *session = ud;
+ /* Remove any pending TCP requests for this session */
+ if (session->fd == -1) {
+ /* TCP session - cleanup pending requests */
+ fuzzy_tcp_session_cleanup(session);
+ }
+
if (session->commands) {
g_ptr_array_free(session->commands, TRUE);
}
g_ptr_array_free(session->results, TRUE);
}
- rspamd_ev_watcher_stop(session->event_loop, &session->ev);
- close(session->fd);
+ /* Only cleanup fd and ev_watcher for UDP sessions */
+ if (session->fd != -1) {
+ rspamd_ev_watcher_stop(session->event_loop, &session->ev);
+ close(session->fd);
+ }
+ /* TCP sessions use shared connection, no cleanup needed here */
}
static rspamd_words_t *
session->server = selected;
session->rule = rule;
session->results = g_ptr_array_sized_new(32);
+ session->fd = -1; /* TCP uses shared connection, no dedicated fd */
+ session->event_loop = task->event_loop;
/* Send commands via TCP */
if (fuzzy_tcp_send_command(tcp_conn, commands, session)) {