* Used to match replies with requests
*/
struct fuzzy_tcp_pending_command {
- struct fuzzy_cmd_io *io; /* Command I/O */
- struct rspamd_task *task; /* Task associated with command */
- struct fuzzy_client_session *session; /* Session for reply delivery */
- ev_tstamp send_time; /* When command was sent */
+ struct fuzzy_cmd_io *io; /* Command I/O */
+ struct rspamd_task *task; /* Task associated with command */
+ struct fuzzy_client_session *session; /* Session for reply delivery */
+ struct fuzzy_tcp_connection *connection; /* Connection this command was sent on */
+ ev_tstamp send_time; /* When command was sent */
};
/**
static void fuzzy_tcp_io_handler(int fd, short what, gpointer ud);
static void fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn);
static void fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn);
+static gboolean fuzzy_tcp_send_command(struct fuzzy_tcp_connection *conn,
+ GPtrArray *commands,
+ struct fuzzy_client_session *session);
+static void fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn);
/* Forward declarations for helper functions */
static gboolean fuzzy_rule_has_encryption(struct fuzzy_rule *rule);
return conn;
}
+/**
+ * Cleanup pending requests for a failed connection
+ * Removes all pending commands associated with this connection
+ */
+static void
+fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn)
+{
+ GHashTableIter iter;
+ gpointer key, value;
+ struct fuzzy_tcp_pending_command *pending;
+ GPtrArray *to_remove;
+ unsigned int i;
+ struct rspamd_task *task = NULL;
+
+ if (!conn || !conn->rule || !conn->rule->pending_requests) {
+ return;
+ }
+
+ /* Collect tags to remove (can't modify hash table during iteration) */
+ to_remove = g_ptr_array_new();
+
+ g_hash_table_iter_init(&iter, conn->rule->pending_requests);
+ while (g_hash_table_iter_next(&iter, &key, &value)) {
+ pending = (struct fuzzy_tcp_pending_command *) value;
+
+ if (pending->connection == conn) {
+ g_ptr_array_add(to_remove, key);
+
+ /* Get task for logging */
+ if (!task && pending->task) {
+ task = pending->task;
+ }
+ }
+ }
+
+ /* Remove pending commands */
+ for (i = 0; i < to_remove->len; i++) {
+ g_hash_table_remove(conn->rule->pending_requests,
+ g_ptr_array_index(to_remove, i));
+ }
+
+ if (to_remove->len > 0 && task) {
+ msg_info_task("fuzzy_tcp: cleaned up %d pending commands for failed connection to %s",
+ (int) to_remove->len, rspamd_upstream_name(conn->server));
+ }
+
+ g_ptr_array_free(to_remove, TRUE);
+}
+
/**
* Main TCP I/O event handler
* Handles connection establishment, reads, writes, and timeouts
msg_warn("fuzzy_tcp: connection timeout for rule %s to %s",
conn->rule->name,
rspamd_upstream_name(conn->server));
+ fuzzy_tcp_connection_cleanup(conn);
conn->failed = TRUE;
conn->connecting = FALSE;
rspamd_upstream_fail(conn->server, TRUE, "timeout");
conn->rule->name,
rspamd_upstream_name(conn->server),
strerror(errno));
+ fuzzy_tcp_connection_cleanup(conn);
conn->failed = TRUE;
conn->connecting = FALSE;
rspamd_upstream_fail(conn->server, TRUE, "getsockopt failed");
conn->rule->name,
rspamd_upstream_name(conn->server),
strerror(so_error));
+ fuzzy_tcp_connection_cleanup(conn);
conn->failed = TRUE;
conn->connecting = FALSE;
rspamd_upstream_fail(conn->server, TRUE, strerror(so_error));
conn->rule->name,
rspamd_upstream_name(conn->server),
strerror(errno));
+ fuzzy_tcp_connection_cleanup(conn);
conn->failed = TRUE;
return;
}
msg_info("fuzzy_tcp: connection closed by %s for rule %s",
rspamd_upstream_name(conn->server),
conn->rule->name);
+ fuzzy_tcp_connection_cleanup(conn);
conn->failed = TRUE;
return;
}
/* Might want to disable EV_WRITE here if needed */
}
+/**
+ * Send commands via TCP connection
+ * Adds TCP framing and queues for sending, registers in pending pool
+ */
+static gboolean
+fuzzy_tcp_send_command(struct fuzzy_tcp_connection *conn,
+ GPtrArray *commands,
+ struct fuzzy_client_session *session)
+{
+ struct fuzzy_cmd_io *io;
+ struct fuzzy_tcp_write_buf *buf;
+ struct fuzzy_tcp_pending_command *pending;
+ unsigned int i;
+ struct rspamd_task *task = session->task;
+
+ for (i = 0; i < commands->len; i++) {
+ io = g_ptr_array_index(commands, i);
+
+ /* Skip if already sent or replied */
+ if (io->flags & (FUZZY_CMD_FLAG_SENT | FUZZY_CMD_FLAG_REPLIED)) {
+ continue;
+ }
+
+ /* Prepare TCP framed buffer */
+ buf = g_malloc0(sizeof(*buf));
+ buf->data = g_malloc(io->io.iov_len);
+ memcpy(buf->data, io->io.iov_base, io->io.iov_len);
+
+ /* Set frame size in network byte order (big endian) */
+ buf->size_hdr = GUINT16_TO_BE((uint16_t) io->io.iov_len);
+ buf->total_len = sizeof(buf->size_hdr) + io->io.iov_len;
+ buf->bytes_written = 0;
+
+ /* Add to write queue */
+ g_queue_push_tail(conn->write_queue, buf);
+
+ /* Register in pending requests pool */
+ pending = g_malloc0(sizeof(*pending));
+ pending->io = io;
+ pending->task = task;
+ pending->session = session;
+ pending->connection = conn;
+ pending->send_time = rspamd_get_calendar_ticks();
+
+ g_hash_table_insert(conn->rule->pending_requests,
+ GINT_TO_POINTER(io->tag), pending);
+
+ /* Mark as sent */
+ io->flags |= FUZZY_CMD_FLAG_SENT;
+
+ msg_debug_fuzzy_check("fuzzy_tcp: queued command with tag %u to %s",
+ io->tag, rspamd_upstream_name(conn->server));
+ }
+
+ /* Ensure write events are enabled */
+ if (!g_queue_is_empty(conn->write_queue)) {
+ rspamd_ev_watcher_reschedule(conn->event_loop, &conn->ev, EV_READ | EV_WRITE);
+ }
+
+ return TRUE;
+}
+
/**
* Process a single TCP reply frame
* Decrypts reply, matches with pending command by tag, delivers result
return;
}
+ /* Get task for debug logging */
+ struct rspamd_task *task = pending->task;
+
/* Process the reply */
if (rep->v1.prob > 0.5) {
if (pending->io->cmd.cmd == FUZZY_CHECK) {
}
}
- msg_debug("fuzzy_tcp: processed reply with tag %u from %s (prob=%.2f)",
- tag, rspamd_upstream_name(conn->server), (double) rep->v1.prob);
+ msg_debug_fuzzy_check("fuzzy_tcp: processed reply with tag %u from %s (prob=%.2f)",
+ tag, rspamd_upstream_name(conn->server), (double) rep->v1.prob);
/* Remove from pending requests */
g_hash_table_remove(rule->pending_requests, GINT_TO_POINTER(tag));
if (available_space == 0) {
msg_err("fuzzy_tcp: read buffer full for rule %s, closing connection",
conn->rule->name);
+ fuzzy_tcp_connection_cleanup(conn);
conn->failed = TRUE;
return;
}
}
msg_err("fuzzy_tcp: read error for rule %s: %s",
conn->rule->name, strerror(errno));
+ fuzzy_tcp_connection_cleanup(conn);
conn->failed = TRUE;
return;
}
msg_info("fuzzy_tcp: connection closed by %s for rule %s",
rspamd_upstream_name(conn->server),
conn->rule->name);
+ fuzzy_tcp_connection_cleanup(conn);
conn->failed = TRUE;
return;
}
msg_err("fuzzy_tcp: invalid frame length %d from %s, closing",
(int) frame_len,
rspamd_upstream_name(conn->server));
+ fuzzy_tcp_connection_cleanup(conn);
conn->failed = TRUE;
return;
}
tcp_conn = fuzzy_tcp_get_or_create_connection(rule, selected, task, FALSE);
}
- /* For now, always use UDP (TCP write/read handlers not ready yet) */
- /* TODO: Use tcp_conn when TCP I/O handlers are implemented */
+ /* Use TCP if available and connected */
if (tcp_conn && tcp_conn->connected) {
- /* TCP connection available - will use it in future */
- msg_debug_task("TCP connection available for %s, but not using yet (handlers not ready)",
- rspamd_upstream_name(selected));
+ /* Create session for TCP */
+ session = rspamd_mempool_alloc0(task->task_pool,
+ sizeof(struct fuzzy_client_session));
+ session->state = 0;
+ session->commands = commands;
+ session->task = task;
+ session->server = selected;
+ session->rule = rule;
+ session->results = g_ptr_array_sized_new(32);
+
+ /* Send commands via TCP */
+ if (fuzzy_tcp_send_command(tcp_conn, commands, session)) {
+ msg_debug_fuzzy_check("fuzzy_tcp: sent %d commands to %s via TCP",
+ (int) commands->len, rspamd_upstream_name(selected));
+
+ rspamd_session_add_event(task->s, fuzzy_io_fin, session, M);
+ session->item = rspamd_symcache_get_cur_item(task);
+
+ if (session->item) {
+ rspamd_symcache_item_async_inc(task, session->item, M);
+ }
+
+ return; /* TCP send successful */
+ }
+ else {
+ msg_warn_task("fuzzy_tcp: failed to send commands, falling back to UDP");
+ /* Fall through to UDP */
+ }
}
- /* Use UDP for now */
+ /* Use UDP as fallback or when TCP not available */
if (selected) {
addr = rspamd_upstream_addr_cur(selected);
rspamd_inet_address_set_port(addr, rspamd_upstream_port(selected));