struct fuzzy_key *key;
struct rspamd_fuzzy_cmd_extension *extensions;
unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
+
+ /* If this is a TCP session, this pointer will be set */
+ struct fuzzy_tcp_session *tcp_session;
};
struct fuzzy_peer_request {
}
}
- /*
- if (rspamd_inet_address_is_local (addr, TRUE)) {
+ /* Skip ratelimit for local addresses */
+ if (rspamd_inet_address_is_local(addr)) {
return TRUE;
}
- */
masked = rspamd_inet_address_copy(addr, NULL);
REF_RELEASE(session);
}
+static void
+rspamd_fuzzy_tcp_enqueue_reply(struct fuzzy_session *session)
+{
+ struct fuzzy_tcp_session *tcp_session = session->tcp_session;
+ struct fuzzy_tcp_reply_queue_elt *reply_elt;
+ gsize len;
+ gconstpointer data;
+
+ if (tcp_session == NULL) {
+ msg_err("internal error: tcp_session is NULL in rspamd_fuzzy_tcp_enqueue_reply");
+ return;
+ }
+
+ /* Determine reply data and length */
+ if (session->cmd_type == CMD_ENCRYPTED_NORMAL ||
+ session->cmd_type == CMD_ENCRYPTED_SHINGLE) {
+ /* Encrypted reply */
+ data = &session->reply;
+
+ if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+ len = sizeof(session->reply);
+ }
+ else {
+ len = sizeof(session->reply.hdr) + sizeof(session->reply.rep.v1);
+ }
+ }
+ else {
+ data = &session->reply.rep;
+
+ if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+ len = sizeof(session->reply.rep);
+ }
+ else {
+ len = sizeof(session->reply.rep.v1);
+ }
+ }
+
+ /* Create reply queue element */
+ reply_elt = g_malloc0(sizeof(*reply_elt));
+ reply_elt->rep.size_hdr = htons((uint16_t) len);
+ memcpy(&reply_elt->rep.payload, data, len);
+ reply_elt->written = 0;
+
+ /* Add to queue */
+ DL_APPEND(tcp_session->replies_queue, reply_elt);
+
+ msg_debug_fuzzy_storage("enqueued TCP reply to %s, %z bytes",
+ rspamd_inet_address_to_string(session->addr),
+ len);
+
+ /* Enable write event if not already enabled */
+ if (ev_is_active(&tcp_session->common.io)) {
+ int events = tcp_session->common.io.events;
+ if (!(events & EV_WRITE)) {
+ ev_io_stop(tcp_session->common.ctx->event_loop, &tcp_session->common.io);
+ ev_io_set(&tcp_session->common.io, tcp_session->common.fd, EV_READ | EV_WRITE);
+ ev_io_start(tcp_session->common.ctx->event_loop, &tcp_session->common.io);
+ }
+ }
+ else {
+ /* Watcher is not active, start it with both read and write */
+ ev_io_set(&tcp_session->common.io, tcp_session->common.fd, EV_READ | EV_WRITE);
+ ev_io_start(tcp_session->common.ctx->event_loop, &tcp_session->common.io);
+ }
+}
+
static void
rspamd_fuzzy_write_reply(struct fuzzy_session *session)
{
gsize len;
gconstpointer data;
+ /* Check if this is a TCP session */
+ if (session->tcp_session != NULL) {
+ rspamd_fuzzy_tcp_enqueue_reply(session);
+ return;
+ }
+
if (session->cmd_type == CMD_ENCRYPTED_NORMAL ||
session->cmd_type == CMD_ENCRYPTED_SHINGLE) {
/* Encrypted reply */
/* Check if we have complete frame */
if (session->bytes_unprocessed - processed_offset >= frame_len) {
- /* Process this frame using legacy session temporarily */
- struct fuzzy_session legacy_session;
-
- memset(&legacy_session, 0, sizeof(legacy_session));
- legacy_session.worker = session->common.worker;
- legacy_session.addr = session->common.addr;
- legacy_session.ctx = session->common.ctx;
- legacy_session.fd = session->common.fd;
- legacy_session.timestamp = session->common.timestamp;
- legacy_session.key = session->common.key;
- legacy_session.ip_stat = session->common.ip_stat;
- memcpy(legacy_session.nm, session->common.nm, sizeof(legacy_session.nm));
+ /* Create heap-allocated session for async processing */
+ struct fuzzy_session *cmd_session = g_malloc0(sizeof(*cmd_session));
+ REF_INIT_RETAIN(cmd_session, fuzzy_session_destroy);
+
+ /* Copy data from TCP session to command session */
+ cmd_session->worker = session->common.worker;
+ cmd_session->addr = rspamd_inet_address_copy(session->common.addr, NULL);
+ cmd_session->ctx = session->common.ctx;
+ cmd_session->fd = session->common.fd;
+ cmd_session->timestamp = session->common.timestamp;
+ cmd_session->key = session->common.key;
+ cmd_session->ip_stat = session->common.ip_stat;
+ memcpy(cmd_session->nm, session->common.nm, sizeof(cmd_session->nm));
+
+ /* Retain references to shared objects */
+ if (cmd_session->key) {
+ REF_RETAIN(cmd_session->key);
+ }
+ if (cmd_session->ip_stat) {
+ REF_RETAIN(cmd_session->ip_stat);
+ }
+ session->common.worker->nconns++;
+
+ /* Set TCP session pointer so replies go to TCP queue */
+ cmd_session->tcp_session = session;
+ REF_RETAIN(session); /* TCP session must live until command is processed */
if (rspamd_fuzzy_cmd_from_wire(session->input_buf + processed_offset,
- frame_len, &legacy_session)) {
- /* Copy parsed data back */
- session->common.epoch = legacy_session.epoch;
- session->common.cmd_type = legacy_session.cmd_type;
- memcpy(&session->common.cmd, &legacy_session.cmd, sizeof(session->common.cmd));
- session->common.key = legacy_session.key;
- session->common.extensions = legacy_session.extensions;
- memcpy(session->common.nm, legacy_session.nm, sizeof(session->common.nm));
-
- /* Process command - this will need to be adapted for TCP */
- rspamd_fuzzy_process_command(&legacy_session);
+ frame_len, cmd_session)) {
+ /* Copy parsed data back to TCP session for tracking */
+ session->common.epoch = cmd_session->epoch;
+ session->common.cmd_type = cmd_session->cmd_type;
+ memcpy(&session->common.cmd, &cmd_session->cmd, sizeof(session->common.cmd));
+
+ /* Note: key and extensions ownership transferred to cmd_session */
+ session->common.key = cmd_session->key;
+ session->common.extensions = cmd_session->extensions;
+ memcpy(session->common.nm, cmd_session->nm, sizeof(session->common.nm));
+
+ /* Process command - replies will go to TCP queue via tcp_session pointer */
+ rspamd_fuzzy_process_command(cmd_session);
}
else {
session->common.ctx->stat.invalid_requests++;
msg_debug_fuzzy_storage("invalid TCP fuzzy command of size %d received from %s",
(int) frame_len,
rspamd_inet_address_to_string(session->common.addr));
+ REF_RELEASE(session); /* Release TCP session reference */
}
+ /* Release our reference - session will be freed when all callbacks complete */
+ REF_RELEASE(cmd_session);
+
processed_offset += frame_len;
session->cur_frame_state = 0x0000; /* Reset for next frame */
}
struct fuzzy_rule *rule;
struct ev_loop *event_loop;
struct rspamd_io_ev ev;
+ struct rspamd_io_ev timer_ev; /* Separate timer for TCP requests */
int state;
int fd;
int retransmits;
ev_tstamp connect_start; /* When connection started */
ev_tstamp last_activity; /* Last I/O activity */
+ ev_tstamp last_failure; /* When connection last failed (for retry logic) */
ref_entry_t ref; /* Reference counting */
};
return NULL;
}
else if (conn->failed) {
- /* Previous connection failed - remove and recreate */
- g_ptr_array_remove(rule->tcp_connections, conn);
- conn = NULL;
+ /* Previous connection failed - check if enough time passed to retry */
+ ev_tstamp now = rspamd_get_calendar_ticks();
+ ev_tstamp time_since_failure = now - conn->last_failure;
+
+ if (time_since_failure < 10.0) {
+ /* Recent failure - don't retry TCP yet, fallback to UDP */
+ msg_debug_task("fuzzy_tcp: connection failed %.1fs ago for %s, using UDP fallback",
+ time_since_failure,
+ rspamd_upstream_name(upstream));
+ return NULL;
+ }
+ else {
+ /* Old failure - remove and try reconnecting */
+ msg_info_task("fuzzy_tcp: connection failed %.1fs ago for %s, retrying TCP",
+ time_since_failure,
+ rspamd_upstream_name(upstream));
+ g_ptr_array_remove(rule->tcp_connections, conn);
+ conn = NULL;
+ }
}
}
/* Check session completion for all affected sessions */
GHashTableIter session_iter;
struct fuzzy_client_session *session;
+ int sessions_checked = 0;
g_hash_table_iter_init(&session_iter, sessions_to_check);
while (g_hash_table_iter_next(&session_iter, (gpointer *) &session, NULL)) {
+ sessions_checked++;
fuzzy_check_session_is_completed(session);
}
+ if (sessions_checked > 0 && task) {
+ msg_info_task("fuzzy_tcp: checked %d sessions for completion after connection cleanup",
+ sessions_checked);
+ }
+
g_ptr_array_free(to_remove, TRUE);
g_hash_table_unref(sessions_to_check);
}
conn->rule->name,
rspamd_upstream_name(conn->server));
fuzzy_tcp_connection_cleanup(conn);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
+ conn->last_failure = rspamd_get_calendar_ticks();
conn->connecting = FALSE;
rspamd_upstream_fail(conn->server, TRUE, "timeout");
FUZZY_TCP_RELEASE(conn);
return;
}
- if (what == EV_WRITE) {
+ if (what & EV_WRITE) {
/* Check if we're still connecting */
if (conn->connecting && !conn->connected) {
/* Verify connection succeeded */
rspamd_upstream_name(conn->server),
strerror(errno));
fuzzy_tcp_connection_cleanup(conn);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
+ conn->last_failure = rspamd_get_calendar_ticks();
conn->connecting = FALSE;
rspamd_upstream_fail(conn->server, TRUE, "getsockopt failed");
FUZZY_TCP_RELEASE(conn);
rspamd_upstream_name(conn->server),
strerror(so_error));
fuzzy_tcp_connection_cleanup(conn);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
+ conn->last_failure = rspamd_get_calendar_ticks();
conn->connecting = FALSE;
rspamd_upstream_fail(conn->server, TRUE, strerror(so_error));
FUZZY_TCP_RELEASE(conn);
conn->connected = TRUE;
conn->connecting = FALSE;
- msg_info("fuzzy_tcp: connection established to %s for rule %s",
+ msg_info("fuzzy_tcp: connection established to %s for rule %s (fd=%d, ev.io.fd=%d)",
rspamd_inet_address_to_string_pretty(conn->addr),
- conn->rule->name);
+ conn->rule->name, conn->fd, (int) conn->ev.io.fd);
rspamd_upstream_ok(conn->server);
/* Now wait for both read and write events */
rspamd_ev_watcher_reschedule(conn->event_loop, &conn->ev,
EV_READ | EV_WRITE);
+
+ msg_debug("fuzzy_tcp: after reschedule - fd=%d, ev.io.fd=%d",
+ conn->fd, (int) conn->ev.io.fd);
}
else if (conn->connected) {
/* Handle write */
}
}
- if (what == EV_READ && conn->connected) {
+ if (what & EV_READ && conn->connected) {
/* Handle read */
fuzzy_tcp_read_handler(conn);
}
rspamd_upstream_name(conn->server),
strerror(errno));
fuzzy_tcp_connection_cleanup(conn);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
+ conn->last_failure = rspamd_get_calendar_ticks();
return;
}
else if (r == 0) {
rspamd_upstream_name(conn->server),
conn->rule->name);
fuzzy_tcp_connection_cleanup(conn);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
+ conn->last_failure = rspamd_get_calendar_ticks();
return;
}
unsigned int i;
struct rspamd_task *task = session->task;
+ msg_debug_task("fuzzy_tcp_send_command: fd=%d, ev.io.fd=%d, connected=%d, failed=%d",
+ conn->fd, (int) conn->ev.io.fd, conn->connected, conn->failed);
+
+ /* Don't send if connection is failed or not connected */
+ if (!conn->connected || conn->failed) {
+ msg_warn_task("fuzzy_tcp: cannot send commands - connection not ready (connected=%d, failed=%d)",
+ conn->connected, conn->failed);
+ return FALSE;
+ }
+
for (i = 0; i < commands->len; i++) {
io = g_ptr_array_index(commands, i);
/* Ensure write events are enabled */
if (!g_queue_is_empty(conn->write_queue)) {
- rspamd_ev_watcher_reschedule(conn->event_loop, &conn->ev, EV_READ | EV_WRITE);
+ msg_debug_task("fuzzy_tcp: checking watcher before reschedule - fd=%d, ev.io.fd=%d",
+ conn->fd, (int) conn->ev.io.fd);
+
+ /* Verify fd is valid before rescheduling */
+ if (conn->fd >= 0 && conn->ev.io.fd == conn->fd) {
+ msg_debug_task("fuzzy_tcp: reschedule watcher for fd=%d", conn->fd);
+ rspamd_ev_watcher_reschedule(conn->event_loop, &conn->ev, EV_READ | EV_WRITE);
+ }
+ else if (conn->fd >= 0 && conn->ev.io.fd != conn->fd) {
+ /* Fd mismatch - reinitialize watcher */
+ msg_warn_task("fuzzy_tcp: fd mismatch in watcher (ev.fd=%d, conn.fd=%d), reinitializing",
+ (int) conn->ev.io.fd, conn->fd);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
+ rspamd_ev_watcher_init(&conn->ev, conn->fd, EV_READ | EV_WRITE,
+ fuzzy_tcp_io_handler, conn);
+ rspamd_ev_watcher_start(conn->event_loop, &conn->ev, conn->rule->tcp_timeout);
+ }
+ else {
+ msg_warn_task("fuzzy_tcp: invalid fd in connection (fd=%d, ev.io.fd=%d), cannot reschedule",
+ conn->fd, (int) conn->ev.io.fd);
+ }
}
return TRUE;
msg_debug_fuzzy_check("fuzzy_tcp: processed reply with tag %ud from %s (prob=%.2f)",
tag, rspamd_upstream_name(conn->server), (double) rep->v1.prob);
+ /* Save session before removing pending (which may free it) */
+ struct fuzzy_client_session *session_to_check = pending->session;
+
/* Remove from pending requests */
g_hash_table_remove(rule->pending_requests, GINT_TO_POINTER(tag));
/* Check if session is completed */
- fuzzy_check_session_is_completed(pending->session);
+ fuzzy_check_session_is_completed(session_to_check);
}
/**
msg_err("fuzzy_tcp: read buffer full for rule %s, closing connection",
conn->rule->name);
fuzzy_tcp_connection_cleanup(conn);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
+ conn->last_failure = rspamd_get_calendar_ticks();
return;
}
msg_err("fuzzy_tcp: read error for rule %s: %s",
conn->rule->name, strerror(errno));
fuzzy_tcp_connection_cleanup(conn);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
+ conn->last_failure = rspamd_get_calendar_ticks();
return;
}
else if (r == 0) {
rspamd_upstream_name(conn->server),
conn->rule->name);
fuzzy_tcp_connection_cleanup(conn);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
+ conn->last_failure = rspamd_get_calendar_ticks();
return;
}
(int) frame_len,
rspamd_upstream_name(conn->server));
fuzzy_tcp_connection_cleanup(conn);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
+ conn->last_failure = rspamd_get_calendar_ticks();
return;
}
/* Remove any pending TCP requests for this session */
if (session->fd == -1) {
- /* TCP session - cleanup pending requests */
+ /* TCP session - cleanup pending requests and stop timer */
fuzzy_tcp_session_cleanup(session);
+ /* Stop pure timer (no IO) */
+ if (ev_is_active(&session->timer_ev.tm)) {
+ ev_timer_stop(session->event_loop, &session->timer_ev.tm);
+ }
}
if (session->commands) {
{
struct fuzzy_cmd_io *io;
unsigned int nreplied = 0, i;
+ struct rspamd_task *task = session->task;
rspamd_upstream_ok(session->server);
fuzzy_insert_metric_results(session->task, session->rule, session->results);
if (session->item) {
+ msg_debug_fuzzy_check("fuzzy_check: decrementing async counter for completed session");
rspamd_symcache_item_async_dec_check(session->task, session->item, M);
}
return TRUE;
}
+ else {
+ msg_debug_fuzzy_check("fuzzy_check: session not completed (%d/%d replied)",
+ nreplied, (int) session->commands->len);
+ }
return FALSE;
}
}
}
+/* libev wrapper for TCP timer - calls rspamd_io_ev style callback */
+static void
+fuzzy_tcp_timer_libev_cb(EV_P_ struct ev_timer *w, int revents)
+{
+ struct rspamd_io_ev *ev = (struct rspamd_io_ev *) w->data;
+ ev->cb(-1, EV_TIMER, ev->ud);
+}
+
+/* TCP timeout callback - no retransmits needed, connection is established */
+static void
+fuzzy_tcp_timer_callback(int fd, short what, void *arg)
+{
+ struct fuzzy_client_session *session = arg;
+ struct rspamd_task *task = session->task;
+ struct fuzzy_cmd_io *io;
+ unsigned int i, nreplied = 0;
+
+ /* Check if all commands have been replied */
+ for (i = 0; i < session->commands->len; i++) {
+ io = g_ptr_array_index(session->commands, i);
+ if (io->flags & FUZZY_CMD_FLAG_REPLIED) {
+ nreplied++;
+ }
+ }
+
+ if (nreplied == session->commands->len) {
+ /* All replied, just complete */
+ msg_debug_fuzzy_check("fuzzy_tcp: all commands replied, completing session");
+ fuzzy_check_session_is_completed(session);
+ return;
+ }
+
+ /* Timeout - just fail the request, don't retry via UDP */
+ msg_warn_task("fuzzy_tcp: timeout waiting for replies from %s (%d/%d replied), giving up",
+ rspamd_upstream_name(session->server),
+ nreplied, (int) session->commands->len);
+
+ /* Mark all unreplied commands as failed */
+ for (i = 0; i < session->commands->len; i++) {
+ io = g_ptr_array_index(session->commands, i);
+ if (!(io->flags & FUZZY_CMD_FLAG_REPLIED)) {
+ io->flags |= FUZZY_CMD_FLAG_REPLIED;
+ }
+ }
+
+ rspamd_upstream_fail(session->server, TRUE, "timeout");
+
+ /* Mark TCP connection as failed so future requests use UDP for ~10 seconds */
+ for (i = 0; i < session->rule->tcp_connections->len; i++) {
+ struct fuzzy_tcp_connection *conn = g_ptr_array_index(session->rule->tcp_connections, i);
+ if (conn->server == session->server) {
+ conn->failed = TRUE;
+ conn->last_failure = rspamd_get_calendar_ticks();
+ conn->connected = FALSE;
+ msg_info_task("fuzzy_tcp: marked connection to %s as failed, switching to UDP for 10s",
+ rspamd_upstream_name(session->server));
+ break;
+ }
+ }
+
+ /* Clean up TCP session - stop timer and remove event */
+ /* Remove any pending TCP requests for this session */
+ fuzzy_tcp_session_cleanup(session);
+
+ /* Stop pure timer (no IO) */
+ if (ev_is_active(&session->timer_ev.tm)) {
+ ev_timer_stop(session->event_loop, &session->timer_ev.tm);
+ }
+
+ /* Decrement async counter for TCP session */
+ if (session->item) {
+ rspamd_symcache_item_async_dec_check(session->task, session->item, M);
+ }
+
+ /* Remove TCP session event */
+ rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
+}
+
/* Fuzzy check callback */
static void
fuzzy_check_io_callback(int fd, short what, void *arg)
rspamd_symcache_item_async_inc(task, session->item, M);
}
+ /* Start timer for TCP request timeout */
+ /* Use pure timer (no IO), so use libev API directly */
+ session->timer_ev.cb = fuzzy_tcp_timer_callback;
+ session->timer_ev.ud = session;
+ session->timer_ev.timeout = rule->io_timeout;
+ session->timer_ev.tm.data = &session->timer_ev;
+ ev_timer_init(&session->timer_ev.tm, fuzzy_tcp_timer_libev_cb,
+ rule->io_timeout, 0.0);
+ ev_timer_start(session->event_loop, &session->timer_ev.tm);
+
return; /* TCP send successful */
}
else {