int lua_id;
/* TCP configuration */
- gboolean tcp_enabled; /* Explicitly enable TCP */
- gboolean tcp_auto; /* Auto-switch to TCP based on request rate */
- double tcp_threshold; /* Requests/sec threshold for auto-switch (default: 1.0) */
- double tcp_window; /* Time window for rate calculation in seconds (default: 1.0) */
- double tcp_timeout; /* TCP connection timeout (default: 5.0) */
+ gboolean tcp_enabled; /* Explicitly enable TCP */
+ gboolean tcp_auto; /* Auto-switch to TCP based on request rate */
+ double tcp_threshold; /* Requests/sec threshold for auto-switch (default: 1.0) */
+ double tcp_window; /* Time window for rate calculation in seconds (default: 1.0) */
+ double tcp_timeout; /* TCP connection timeout (default: 5.0) */
+ double tcp_retry_delay; /* Delay before retrying failed TCP connection (default: 10.0) */
/* Rate tracking for TCP auto-switch */
struct {
static gboolean fuzzy_tcp_send_command(struct fuzzy_tcp_connection *conn,
GPtrArray *commands,
struct fuzzy_client_session *session);
-static void fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn);
+static void fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn, const char *reason);
static gboolean fuzzy_check_session_is_completed(struct fuzzy_client_session *session);
/* Forward declarations for helper functions */
#define FUZZY_TCP_RELEASE(x) REF_RELEASE(x)
/**
- * Free TCP connection resources
+ * Free TCP connection resources (called by reference counting)
*/
static void
fuzzy_tcp_connection_free(struct fuzzy_tcp_connection *conn)
g_queue_free(conn->write_queue);
}
+ msg_debug("fuzzy_tcp: freeing connection to %s for rule %s",
+ rspamd_upstream_name(conn->server),
+ conn->rule->name);
+
g_free(conn);
}
+/**
+ * Element free callback for the TCP connection pool array.
+ * Receives the stored element as an untyped pointer and drops one
+ * reference on that connection.
+ */
+static void
+fuzzy_tcp_connection_unref(gpointer conn_ptr)
+{
+ struct fuzzy_tcp_connection *pooled_conn = conn_ptr;
+ FUZZY_TCP_RELEASE(pooled_conn);
+}
+
/**
* Create new TCP connection structure
*/
fuzzy_tcp_io_handler, conn);
rspamd_ev_watcher_start(conn->event_loop, &conn->ev, rule->tcp_timeout);
- FUZZY_TCP_RETAIN(conn);
-
- /* Store in connection pool array */
+ /* Store in connection pool array - array takes ownership of initial reference */
g_ptr_array_add(rule->tcp_connections, conn);
msg_info_task("fuzzy_tcp: initiating connection to %s for rule %s",
/* Connection exists - check state */
if (conn->connected) {
/* Ready to use */
+ msg_debug_task("fuzzy_tcp: reusing established connection to %s for rule %s",
+ rspamd_upstream_name(upstream), rule->name);
return conn;
}
else if (conn->connecting) {
/* Connection in progress - cannot use yet */
+ msg_debug_task("fuzzy_tcp: connection to %s is still connecting, using UDP fallback",
+ rspamd_upstream_name(upstream));
return NULL;
}
else if (conn->failed) {
ev_tstamp now = rspamd_get_calendar_ticks();
ev_tstamp time_since_failure = now - conn->last_failure;
- if (time_since_failure < 10.0) {
+ if (time_since_failure < rule->tcp_retry_delay) {
/* Recent failure - don't retry TCP yet, fallback to UDP */
- msg_debug_task("fuzzy_tcp: connection failed %.1fs ago for %s, using UDP fallback",
+ msg_debug_task("fuzzy_tcp: connection failed %.1fs ago for %s (retry_delay=%.1fs), using UDP fallback",
time_since_failure,
- rspamd_upstream_name(upstream));
+ rspamd_upstream_name(upstream),
+ rule->tcp_retry_delay);
return NULL;
}
else {
/* Old failure - remove and try reconnecting */
- msg_info_task("fuzzy_tcp: connection failed %.1fs ago for %s, retrying TCP",
+ msg_info_task("fuzzy_tcp: connection failed %.1fs ago for %s (retry_delay=%.1fs), retrying TCP",
time_since_failure,
- rspamd_upstream_name(upstream));
+ rspamd_upstream_name(upstream),
+ rule->tcp_retry_delay);
+ /*
+ * The pool array is created with fuzzy_tcp_connection_unref as its
+ * element free function, so g_ptr_array_remove() already drops the
+ * reference held by the array. Do NOT release again here: a second
+ * FUZZY_TCP_RELEASE would over-release the connection and touch
+ * memory that may already have been freed.
+ */
g_ptr_array_remove(rule->tcp_connections, conn);
conn = NULL;
}
}
/**
* Cleanup pending requests for a failed connection
* Removes all pending commands associated with this connection
+ * This is called only for connection-level failures (socket errors, not session timeouts)
*/
static void
-fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn)
+fuzzy_tcp_connection_cleanup(struct fuzzy_tcp_connection *conn, const char *reason)
{
GHashTableIter iter;
gpointer key, value;
return;
}
+ msg_info("fuzzy_tcp: cleaning up connection to %s for rule %s, reason: %s",
+ rspamd_upstream_name(conn->server),
+ conn->rule->name,
+ reason ? reason : "unknown");
+
/* Collect commands to remove and sessions to check */
to_remove = g_ptr_array_new();
sessions_to_check = g_hash_table_new(g_direct_hash, g_direct_equal);
}
if (to_remove->len > 0 && task) {
- msg_info_task("fuzzy_tcp: cleaned up %d pending commands for failed connection to %s",
- (int) to_remove->len, rspamd_upstream_name(conn->server));
+ msg_warn_task("fuzzy_tcp: cleaned up %d pending commands due to connection failure: %s",
+ (int) to_remove->len, reason ? reason : "unknown");
}
/* Check session completion for all affected sessions */
conn->last_activity = rspamd_get_calendar_ticks();
if (what == EV_TIMEOUT) {
- msg_warn("fuzzy_tcp: connection timeout for rule %s to %s",
+ ev_tstamp elapsed = rspamd_get_calendar_ticks() - conn->connect_start;
+ msg_warn("fuzzy_tcp: connection timeout for rule %s to %s after %.2fs (connecting=%d, connected=%d)",
conn->rule->name,
- rspamd_upstream_name(conn->server));
- fuzzy_tcp_connection_cleanup(conn);
- rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
+ rspamd_upstream_name(conn->server),
+ elapsed,
+ conn->connecting,
+ conn->connected);
conn->failed = TRUE;
conn->last_failure = rspamd_get_calendar_ticks();
conn->connecting = FALSE;
- rspamd_upstream_fail(conn->server, TRUE, "timeout");
+ fuzzy_tcp_connection_cleanup(conn, "connection timeout");
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
+ rspamd_upstream_fail(conn->server, TRUE, "connection timeout");
FUZZY_TCP_RELEASE(conn);
return;
}
conn->rule->name,
rspamd_upstream_name(conn->server),
strerror(errno));
- fuzzy_tcp_connection_cleanup(conn);
- rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
conn->last_failure = rspamd_get_calendar_ticks();
conn->connecting = FALSE;
+ fuzzy_tcp_connection_cleanup(conn, "getsockopt failed");
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
rspamd_upstream_fail(conn->server, TRUE, "getsockopt failed");
FUZZY_TCP_RELEASE(conn);
return;
}
if (so_error != 0) {
+ char error_buf[128];
+ rspamd_snprintf(error_buf, sizeof(error_buf), "connect error: %s", strerror(so_error));
msg_warn("fuzzy_tcp: connection failed for rule %s to %s: %s",
conn->rule->name,
rspamd_upstream_name(conn->server),
strerror(so_error));
- fuzzy_tcp_connection_cleanup(conn);
- rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
conn->last_failure = rspamd_get_calendar_ticks();
conn->connecting = FALSE;
+ fuzzy_tcp_connection_cleanup(conn, error_buf);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
rspamd_upstream_fail(conn->server, TRUE, strerror(so_error));
FUZZY_TCP_RELEASE(conn);
return;
/* Connection established! */
conn->connected = TRUE;
conn->connecting = FALSE;
+ ev_tstamp elapsed = rspamd_get_calendar_ticks() - conn->connect_start;
- msg_info("fuzzy_tcp: connection established to %s for rule %s (fd=%d, ev.io.fd=%d)",
+ msg_info("fuzzy_tcp: connection established to %s for rule %s in %.3fs (fd=%d, encrypted=%d)",
rspamd_inet_address_to_string_pretty(conn->addr),
- conn->rule->name, conn->fd, (int) conn->ev.io.fd);
+ conn->rule->name,
+ elapsed,
+ conn->fd,
+ conn->encrypted);
rspamd_upstream_ok(conn->server);
/* Cannot write more, wait for next event */
return;
}
+ char error_buf[128];
+ rspamd_snprintf(error_buf, sizeof(error_buf), "write error: %s", strerror(errno));
msg_err("fuzzy_tcp: write error for rule %s to %s: %s",
conn->rule->name,
rspamd_upstream_name(conn->server),
strerror(errno));
- fuzzy_tcp_connection_cleanup(conn);
- rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
conn->last_failure = rspamd_get_calendar_ticks();
+ fuzzy_tcp_connection_cleanup(conn, error_buf);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
return;
}
else if (r == 0) {
msg_info("fuzzy_tcp: connection closed by %s for rule %s",
rspamd_upstream_name(conn->server),
conn->rule->name);
- fuzzy_tcp_connection_cleanup(conn);
- rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
conn->last_failure = rspamd_get_calendar_ticks();
+ fuzzy_tcp_connection_cleanup(conn, "connection closed by peer");
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
return;
}
if (available_space == 0) {
msg_err("fuzzy_tcp: read buffer full for rule %s, closing connection",
conn->rule->name);
- fuzzy_tcp_connection_cleanup(conn);
- rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
conn->last_failure = rspamd_get_calendar_ticks();
+ fuzzy_tcp_connection_cleanup(conn, "read buffer overflow");
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
return;
}
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
return; /* Try again later */
}
+ char error_buf[128];
+ rspamd_snprintf(error_buf, sizeof(error_buf), "read error: %s", strerror(errno));
msg_err("fuzzy_tcp: read error for rule %s: %s",
conn->rule->name, strerror(errno));
- fuzzy_tcp_connection_cleanup(conn);
- rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
conn->last_failure = rspamd_get_calendar_ticks();
+ fuzzy_tcp_connection_cleanup(conn, error_buf);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
return;
}
else if (r == 0) {
msg_info("fuzzy_tcp: connection closed by %s for rule %s",
rspamd_upstream_name(conn->server),
conn->rule->name);
- fuzzy_tcp_connection_cleanup(conn);
- rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
conn->last_failure = rspamd_get_calendar_ticks();
+ fuzzy_tcp_connection_cleanup(conn, "connection closed by peer");
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
return;
}
frame_len = conn->cur_frame_state & 0x3FFF;
if (frame_len > sizeof(struct rspamd_fuzzy_encrypted_reply)) {
+ char error_buf[128];
+ rspamd_snprintf(error_buf, sizeof(error_buf), "invalid frame length: %d", (int) frame_len);
msg_err("fuzzy_tcp: invalid frame length %d from %s, closing",
(int) frame_len,
rspamd_upstream_name(conn->server));
- fuzzy_tcp_connection_cleanup(conn);
- rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
conn->failed = TRUE;
conn->last_failure = rspamd_get_calendar_ticks();
+ fuzzy_tcp_connection_cleanup(conn, error_buf);
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
return;
}
else {
rule->tcp_timeout = 5.0; /* Default: 5 seconds */
}
+
+ if ((tcp_obj = ucl_object_lookup(value, "retry_delay")) != NULL) {
+ rule->tcp_retry_delay = ucl_object_todouble(tcp_obj);
+ }
+ else {
+ rule->tcp_retry_delay = 10.0; /* Default: 10 seconds */
+ }
}
}
else {
rule->tcp_threshold = 1.0;
rule->tcp_window = 1.0;
rule->tcp_timeout = 5.0;
+ rule->tcp_retry_delay = 10.0;
}
rule->rate_tracker.requests_count = 0;
rule->rate_tracker.window_start = 0;
- /* Initialize TCP connection pool - array of connections */
- rule->tcp_connections = g_ptr_array_new();
+ /* Initialize TCP connection pool - array of connections with proper free function */
+ rule->tcp_connections = g_ptr_array_new_with_free_func(fuzzy_tcp_connection_unref);
/* Initialize global pending requests pool - keyed by tag */
rule->pending_requests = g_hash_table_new_full(g_direct_hash, g_direct_equal,
if (session->fd == -1) {
/* TCP session - cleanup pending requests and stop timer */
fuzzy_tcp_session_cleanup(session);
- /* Stop pure timer (no IO) */
- if (ev_is_active(&session->timer_ev.tm)) {
- ev_timer_stop(session->event_loop, &session->timer_ev.tm);
- }
+ /* Stop pure timer (no IO) - safe to call even if not active */
+ ev_timer_stop(session->event_loop, &session->timer_ev.tm);
}
if (session->commands) {
ev->cb(-1, EV_TIMER, ev->ud);
}
-/* TCP timeout callback - no retransmits needed, connection is established */
+/* TCP timeout callback - session-level timeout, does NOT mark connection as failed */
static void
fuzzy_tcp_timer_callback(int fd, short what, void *arg)
{
struct rspamd_task *task = session->task;
struct fuzzy_cmd_io *io;
unsigned int i, nreplied = 0;
+ ev_tstamp now = rspamd_get_calendar_ticks();
+
+ /* Check pending timeouts for all requests in this rule (periodic check) */
+ fuzzy_tcp_check_pending_timeouts(session->rule, now);
/* Check if all commands have been replied */
for (i = 0; i < session->commands->len; i++) {
return;
}
- /* Timeout - just fail the request, don't retry via UDP */
- msg_warn_task("fuzzy_tcp: timeout waiting for replies from %s (%d/%d replied), giving up",
+ /* Session timeout - mark unreplied commands as failed for this session only */
+ msg_warn_task("fuzzy_tcp: session timeout waiting for replies from %s (%d/%d replied)",
rspamd_upstream_name(session->server),
nreplied, (int) session->commands->len);
}
}
- rspamd_upstream_fail(session->server, TRUE, "timeout");
+ /* Report upstream issue but don't mark as completely failed */
+ rspamd_upstream_fail(session->server, FALSE, "session timeout");
- /* Mark TCP connection as failed so future requests use UDP for ~10 seconds */
- for (i = 0; i < session->rule->tcp_connections->len; i++) {
- struct fuzzy_tcp_connection *conn = g_ptr_array_index(session->rule->tcp_connections, i);
- if (conn->server == session->server) {
- conn->failed = TRUE;
- conn->last_failure = rspamd_get_calendar_ticks();
- conn->connected = FALSE;
- msg_info_task("fuzzy_tcp: marked connection to %s as failed, switching to UDP for 10s",
- rspamd_upstream_name(session->server));
- break;
- }
- }
+ /* NOTE: We do NOT mark connection->failed = TRUE here!
+ * Session timeout is not a connection failure - other sessions may still succeed.
+ * Connection failures (socket errors) are handled in IO handlers.
+ */
- /* Clean up TCP session - stop timer and remove event */
- /* Remove any pending TCP requests for this session */
+ /* Clean up pending requests for THIS session only */
fuzzy_tcp_session_cleanup(session);
- /* Stop pure timer (no IO) */
- if (ev_is_active(&session->timer_ev.tm)) {
- ev_timer_stop(session->event_loop, &session->timer_ev.tm);
- }
+ /* Stop pure timer (no IO) - safe to call even if not active */
+ ev_timer_stop(session->event_loop, &session->timer_ev.tm);
/* Decrement async counter for TCP session */
if (session->item) {