ev_tstamp window_start; /* Start of the current window */
} rate_tracker;
- /* TCP connection state */
- struct {
- gboolean connected; /* TCP connection is active and ready */
- gboolean connecting; /* TCP connection in progress */
- void *connection; /* Placeholder for TCP connection handle (future) */
- } tcp_state;
+ /* TCP connection pool - array of connections, one per upstream */
+ GPtrArray *tcp_connections; /* Array of fuzzy_tcp_connection* */
+ GHashTable *pending_requests; /* Global: tag -> fuzzy_tcp_pending_command */
};
struct fuzzy_ctx {
int retransmits;
};
+/**
+ * Pending command awaiting TCP reply
+ * Used to match replies with requests
+ */
+struct fuzzy_tcp_pending_command {
+ struct fuzzy_cmd_io *io; /* Command I/O */
+ struct rspamd_task *task; /* Task associated with command */
+ struct fuzzy_client_session *session; /* Session for reply delivery */
+ ev_tstamp send_time; /* When command was sent */
+};
+
+/**
+ * TCP write queue element - contains framed command ready to send
+ */
+struct fuzzy_tcp_write_buf {
+ uint16_t size_hdr; /* Frame size in network byte order */
+ unsigned char *data; /* Command data (encrypted) */
+ gsize total_len; /* Total length: sizeof(size_hdr) + data_len */
+ gsize bytes_written; /* How many bytes already written */
+};
+
+/**
+ * TCP connection state for a fuzzy rule
+ * One TCP connection per fuzzy_rule, shared across all tasks
+ */
+struct fuzzy_tcp_connection {
+ struct fuzzy_rule *rule; /* Parent rule */
+ struct upstream *server; /* Connected upstream */
+ rspamd_inet_addr_t *addr; /* Server address */
+
+ int fd; /* Socket file descriptor */
+ struct ev_loop *event_loop; /* Event loop */
+ struct rspamd_io_ev ev; /* Event watcher */
+
+ /* Write state */
+ GQueue *write_queue; /* Queue of iovec to send */
+ gsize bytes_sent; /* Bytes sent from current iovec */
+
+ /* Read state - TCP framing */
+ uint16_t cur_frame_state; /* 0x0000/0x8000/0xC000 like server */
+ gsize bytes_unprocessed; /* Bytes in read_buf not yet processed */
+ unsigned char read_buf[8192]; /* Read buffer for incoming data */
+
+ /* Encryption keys for this connection */
+ struct rspamd_cryptobox_keypair *local_key; /* Local keypair (can be NULL) */
+ struct rspamd_cryptobox_pubkey *peer_key; /* Server public key (can be NULL) */
+ gboolean encrypted; /* TRUE if connection uses encryption */
+
+ /* Connection state */
+ gboolean connected; /* TCP handshake complete */
+ gboolean connecting; /* Connection in progress */
+ gboolean failed; /* Connection failed */
+
+ ev_tstamp connect_start; /* When connection started */
+ ev_tstamp last_activity; /* Last I/O activity */
+
+ ref_entry_t ref; /* Reference counting */
+};
+
#define FUZZY_CMD_FLAG_REPLIED (1 << 0)
#define FUZZY_CMD_FLAG_SENT (1 << 1)
#define FUZZY_CMD_FLAG_IMAGE (1 << 2)
if (rule->write_peer_key) {
rspamd_pubkey_unref(rule->write_peer_key);
}
+
+ /* Clean up TCP connections */
+ if (rule->tcp_connections) {
+ g_ptr_array_free(rule->tcp_connections, TRUE);
+ }
+
+ /* Clean up pending requests pool */
+ if (rule->pending_requests) {
+ g_hash_table_destroy(rule->pending_requests);
+ }
}
/**
}
/**
- * Determine whether to use TCP for this request
- * Returns TRUE if TCP should be used, FALSE for UDP
+ * Check if TCP should be attempted based on configuration and rate
*/
static gboolean
-fuzzy_should_use_tcp(struct fuzzy_rule *rule)
+fuzzy_should_try_tcp(struct fuzzy_rule *rule, ev_tstamp now)
{
- /* If TCP is explicitly enabled, always prefer TCP when connected */
- if (rule->tcp_enabled && rule->tcp_state.connected) {
+ /* TCP explicitly enabled - always try */
+ if (rule->tcp_enabled) {
return TRUE;
}
- /* If auto-switch is enabled and TCP is connected */
- if (rule->tcp_auto && rule->tcp_state.connected) {
- /* Calculate current rate */
- if (rule->rate_tracker.window_start > 0 && rule->tcp_window > 0) {
- ev_tstamp elapsed = rspamd_get_calendar_ticks() - rule->rate_tracker.window_start;
- if (elapsed > 0) {
- double rate = (double) rule->rate_tracker.requests_count / elapsed;
- if (rate > rule->tcp_threshold) {
- return TRUE;
- }
+ /* TCP auto-switch based on rate */
+ if (rule->tcp_auto && rule->tcp_window > 0) {
+ ev_tstamp elapsed = now - rule->rate_tracker.window_start;
+ if (elapsed > 0) {
+ double rate = (double) rule->rate_tracker.requests_count / elapsed;
+ if (rate > rule->tcp_threshold) {
+ return TRUE;
}
}
}
return FALSE;
}
+/* Forward declarations for TCP functions */
+static void fuzzy_tcp_io_handler(int fd, short what, gpointer ud);
+static void fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn);
+static void fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn);
+
+/* Forward declarations for helper functions */
+static gboolean fuzzy_rule_has_encryption(struct fuzzy_rule *rule);
+static void fuzzy_insert_result(struct fuzzy_client_session *session,
+ const struct rspamd_fuzzy_reply *rep,
+ struct rspamd_fuzzy_cmd *cmd,
+ struct fuzzy_cmd_io *io,
+ unsigned int flag);
+
+#define FUZZY_TCP_RETAIN(x) REF_RETAIN(x)
+#define FUZZY_TCP_RELEASE(x) REF_RELEASE(x)
+
/**
- * Initiate asynchronous TCP connection (placeholder)
- * This function will be implemented later with actual TCP connection logic
+ * Free TCP connection resources
*/
static void
-fuzzy_tcp_connect_async(struct fuzzy_rule *rule)
+fuzzy_tcp_connection_free(struct fuzzy_tcp_connection *conn)
{
- /* Mark connection as in progress */
- if (!rule->tcp_state.connecting && !rule->tcp_state.connected) {
- rule->tcp_state.connecting = TRUE;
-
- /* TODO: Implement actual async TCP connection
- * - Create socket
- * - Set non-blocking mode
- * - Start connect()
- * - Register event handler for connection completion
- * - On success: set tcp_state.connected = TRUE, connecting = FALSE
- * - On failure: set connecting = FALSE, schedule retry
- */
+ if (conn->fd != -1) {
+ rspamd_ev_watcher_stop(conn->event_loop, &conn->ev);
+ close(conn->fd);
+ }
+
+ if (conn->write_queue) {
+ g_queue_free(conn->write_queue);
+ }
+
+ g_free(conn);
+}
+
+/**
+ * Create new TCP connection structure
+ */
+static struct fuzzy_tcp_connection *
+fuzzy_tcp_connection_new(struct fuzzy_rule *rule, struct ev_loop *event_loop)
+{
+ struct fuzzy_tcp_connection *conn;
+
+ conn = g_malloc0(sizeof(struct fuzzy_tcp_connection));
+ conn->rule = rule;
+ conn->fd = -1;
+ conn->event_loop = event_loop;
+ conn->write_queue = g_queue_new();
+ conn->connected = FALSE;
+ conn->connecting = FALSE;
+ conn->failed = FALSE;
+ conn->cur_frame_state = 0x0000;
+ conn->bytes_sent = 0;
+ conn->bytes_unprocessed = 0;
+
+ REF_INIT_RETAIN(conn, fuzzy_tcp_connection_free);
+
+ return conn;
+}
+
+/**
+ * Initiate asynchronous TCP connection for specific upstream
+ */
+static struct fuzzy_tcp_connection *
+fuzzy_tcp_connect_async(struct fuzzy_rule *rule,
+ struct upstream *upstream,
+ struct rspamd_task *task,
+ gboolean is_write_server)
+{
+ struct fuzzy_tcp_connection *conn;
+ rspamd_inet_addr_t *addr;
+ int fd;
+
+ /* Get current server address (not next, to avoid address rotation) */
+ addr = rspamd_upstream_addr_cur(upstream);
+ if (!addr) {
+ msg_warn_task("fuzzy_tcp: no address for upstream %s in rule %s",
+ rspamd_upstream_name(upstream),
+ rule->name);
+ return NULL;
+ }
+
+ rspamd_inet_address_set_port(addr, rspamd_upstream_port(upstream));
+
+ /* Create non-blocking TCP socket */
+ fd = rspamd_inet_address_connect(addr, SOCK_STREAM, TRUE);
+ if (fd == -1) {
+ msg_warn_task("fuzzy_tcp: cannot connect to %s (%s): %s",
+ rspamd_upstream_name(upstream),
+ rspamd_inet_address_to_string_pretty(addr),
+ strerror(errno));
+ rspamd_upstream_fail(upstream, FALSE, strerror(errno));
+ return NULL;
+ }
+
+ /* Create connection structure */
+ conn = fuzzy_tcp_connection_new(rule, task->event_loop);
+ conn->fd = fd;
+ conn->server = upstream;
+ conn->addr = addr;
+ conn->connecting = TRUE;
+ conn->connect_start = rspamd_get_calendar_ticks();
+
+ /* Determine encryption keys for this connection */
+ if (fuzzy_rule_has_encryption(rule)) {
+ if (is_write_server) {
+ /* Write server - use write keys */
+ conn->local_key = rule->write_local_key ? rule->write_local_key : rule->local_key;
+ conn->peer_key = rule->write_peer_key ? rule->write_peer_key : rule->peer_key;
+ }
+ else {
+ /* Read server - use read keys */
+ conn->local_key = rule->read_local_key ? rule->read_local_key : rule->local_key;
+ conn->peer_key = rule->read_peer_key ? rule->read_peer_key : rule->peer_key;
+ }
+ conn->encrypted = TRUE;
+ }
+ else {
+ conn->local_key = NULL;
+ conn->peer_key = NULL;
+ conn->encrypted = FALSE;
+ }
+
+ /* Initialize event watcher for connection establishment (wait for write) */
+ rspamd_ev_watcher_init(&conn->ev, fd, EV_WRITE,
+ fuzzy_tcp_io_handler, conn);
+ rspamd_ev_watcher_start(conn->event_loop, &conn->ev, rule->tcp_timeout);
+
+ FUZZY_TCP_RETAIN(conn);
+
+ /* Store in connection pool array */
+ g_ptr_array_add(rule->tcp_connections, conn);
+
+ msg_info_task("fuzzy_tcp: initiating connection to %s for rule %s",
+ rspamd_inet_address_to_string_pretty(addr),
+ rule->name);
+
+ return conn;
+}
+
+/**
+ * Get or create TCP connection for specific upstream
+ * Returns existing connection if available, creates new one if needed
+ */
+static struct fuzzy_tcp_connection *
+fuzzy_tcp_get_or_create_connection(struct fuzzy_rule *rule,
+ struct upstream *upstream,
+ struct rspamd_task *task,
+ gboolean is_write_server)
+{
+ struct fuzzy_tcp_connection *conn = NULL;
+ guint i;
+
+ /* Search for existing connection to this upstream */
+ for (i = 0; i < rule->tcp_connections->len; i++) {
+ struct fuzzy_tcp_connection *c = g_ptr_array_index(rule->tcp_connections, i);
+
+ if (c->server == upstream) {
+ conn = c;
+ break;
+ }
+ }
+
+ if (conn) {
+ /* Connection exists - check state */
+ if (conn->connected) {
+ /* Ready to use */
+ return conn;
+ }
+ else if (conn->connecting) {
+ /* Connection in progress - cannot use yet */
+ return NULL;
+ }
+ else if (conn->failed) {
+ /* Previous connection failed - remove and recreate */
+ g_ptr_array_remove(rule->tcp_connections, conn);
+ conn = NULL;
+ }
+ }
+
+ /* No connection or failed - create new one */
+ if (!conn) {
+ conn = fuzzy_tcp_connect_async(rule, upstream, task, is_write_server);
+ }
+
+ return conn;
+}
+
+/**
+ * Main TCP I/O event handler
+ * Handles connection establishment, reads, writes, and timeouts
+ */
+static void
+fuzzy_tcp_io_handler(int fd, short what, gpointer ud)
+{
+ struct fuzzy_tcp_connection *conn = ud;
+ int so_error = 0;
+ socklen_t so_len = sizeof(so_error);
+
+ FUZZY_TCP_RETAIN(conn);
+
+ conn->last_activity = rspamd_get_calendar_ticks();
+
+ if (what == EV_TIMEOUT) {
+ msg_warn("fuzzy_tcp: connection timeout for rule %s to %s",
+ conn->rule->name,
+ rspamd_upstream_name(conn->server));
+ conn->failed = TRUE;
+ conn->connecting = FALSE;
+ rspamd_upstream_fail(conn->server, TRUE, "timeout");
+ FUZZY_TCP_RELEASE(conn);
+ return;
+ }
+
+ if (what == EV_WRITE) {
+ /* Check if we're still connecting */
+ if (conn->connecting && !conn->connected) {
+ /* Verify connection succeeded */
+ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_len) == -1) {
+ msg_warn("fuzzy_tcp: getsockopt failed for rule %s to %s: %s",
+ conn->rule->name,
+ rspamd_upstream_name(conn->server),
+ strerror(errno));
+ conn->failed = TRUE;
+ conn->connecting = FALSE;
+ rspamd_upstream_fail(conn->server, TRUE, "getsockopt failed");
+ FUZZY_TCP_RELEASE(conn);
+ return;
+ }
+
+ if (so_error != 0) {
+ msg_warn("fuzzy_tcp: connection failed for rule %s to %s: %s",
+ conn->rule->name,
+ rspamd_upstream_name(conn->server),
+ strerror(so_error));
+ conn->failed = TRUE;
+ conn->connecting = FALSE;
+ rspamd_upstream_fail(conn->server, TRUE, strerror(so_error));
+ FUZZY_TCP_RELEASE(conn);
+ return;
+ }
+
+ /* Connection established! */
+ conn->connected = TRUE;
+ conn->connecting = FALSE;
+
+ msg_info("fuzzy_tcp: connection established to %s for rule %s",
+ rspamd_inet_address_to_string_pretty(conn->addr),
+ conn->rule->name);
+
+ rspamd_upstream_ok(conn->server);
+
+ /* Now wait for both read and write events */
+ rspamd_ev_watcher_reschedule(conn->event_loop, &conn->ev,
+ EV_READ | EV_WRITE);
+ }
+ else if (conn->connected) {
+ /* Handle write */
+ fuzzy_tcp_write_handler(conn);
+ }
+ }
+
+ if (what == EV_READ && conn->connected) {
+ /* Handle read */
+ fuzzy_tcp_read_handler(conn);
+ }
+
+ FUZZY_TCP_RELEASE(conn);
+}
+
+/**
+ * TCP write handler
+ * Sends queued commands to the server with TCP framing
+ */
+static void
+fuzzy_tcp_write_handler(struct fuzzy_tcp_connection *conn)
+{
+ struct fuzzy_tcp_write_buf *buf;
+ ssize_t r;
+
+ while ((buf = g_queue_peek_head(conn->write_queue)) != NULL) {
+ /* Write remaining data */
+ gsize remaining = buf->total_len - buf->bytes_written;
+ unsigned char *write_ptr;
+
+ /* Determine what to write: size_hdr or data */
+ if (buf->bytes_written < sizeof(buf->size_hdr)) {
+ /* Still writing size header */
+ write_ptr = (unsigned char *) &buf->size_hdr + buf->bytes_written;
+ }
+ else {
+ /* Writing data */
+ write_ptr = buf->data + (buf->bytes_written - sizeof(buf->size_hdr));
+ }
+
+ r = write(conn->fd, write_ptr, remaining);
+
+ if (r == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+ /* Cannot write more, wait for next event */
+ return;
+ }
+ msg_err("fuzzy_tcp: write error for rule %s to %s: %s",
+ conn->rule->name,
+ rspamd_upstream_name(conn->server),
+ strerror(errno));
+ conn->failed = TRUE;
+ return;
+ }
+ else if (r == 0) {
+ msg_info("fuzzy_tcp: connection closed by %s for rule %s",
+ rspamd_upstream_name(conn->server),
+ conn->rule->name);
+ conn->failed = TRUE;
+ return;
+ }
+
+ buf->bytes_written += r;
+
+ if (buf->bytes_written >= buf->total_len) {
+ /* Buffer fully sent, remove from queue and free */
+ g_queue_pop_head(conn->write_queue);
+ g_free(buf->data);
+ g_free(buf);
+ }
+ else {
+ /* Partial write, wait for next event */
+ return;
+ }
+ }
+
+ /* Queue is empty, no more data to write */
+ /* Might want to disable EV_WRITE here if needed */
+}
+
+/**
+ * Process a single TCP reply frame
+ * Decrypts reply, matches with pending command by tag, delivers result
+ */
+static void
+fuzzy_tcp_process_reply(struct fuzzy_tcp_connection *conn,
+ unsigned char *data, gsize len)
+{
+ struct rspamd_fuzzy_encrypted_reply encrep;
+ const struct rspamd_fuzzy_reply *rep;
+ struct fuzzy_rule *rule = conn->rule;
+ unsigned int required_size;
+ struct fuzzy_tcp_pending_command *pending;
+ uint32_t tag;
+
+ /* Check if we have encryption */
+ if (conn->encrypted) {
+ required_size = sizeof(encrep);
+ }
+ else {
+ required_size = sizeof(struct rspamd_fuzzy_reply);
+ }
+
+ if (len < required_size) {
+ msg_warn("fuzzy_tcp: invalid reply size %d from %s, expected at least %d",
+ (int) len, rspamd_upstream_name(conn->server), (int) required_size);
+ return;
+ }
+
+ /* Decrypt if needed - use keys from connection */
+ if (conn->encrypted) {
+ memcpy(&encrep, data, sizeof(encrep));
+
+ /* Process keys through cache */
+ rspamd_keypair_cache_process(rule->ctx->keypairs_cache,
+ conn->local_key, conn->peer_key);
+
+ /* Decrypt with connection keys */
+ if (!rspamd_cryptobox_decrypt_nm_inplace((unsigned char *) &encrep.rep,
+ sizeof(encrep.rep),
+ encrep.hdr.nonce,
+ rspamd_pubkey_get_nm(conn->peer_key, conn->local_key),
+ encrep.hdr.mac)) {
+ msg_warn("fuzzy_tcp: cannot decrypt reply from %s",
+ rspamd_upstream_name(conn->server));
+ return;
+ }
+
+ rep = &encrep.rep;
+ }
+ else {
+ rep = (const struct rspamd_fuzzy_reply *) data;
+ }
+
+ /* Extract tag and lookup pending command */
+ tag = rep->v1.tag;
+ pending = g_hash_table_lookup(rule->pending_requests, GINT_TO_POINTER(tag));
+
+ if (!pending) {
+ msg_debug("fuzzy_tcp: unexpected tag %u from %s",
+ tag, rspamd_upstream_name(conn->server));
+ return;
+ }
+
+ /* Process the reply */
+ if (rep->v1.prob > 0.5) {
+ if (pending->io->cmd.cmd == FUZZY_CHECK) {
+ fuzzy_insert_result(pending->session, rep, &pending->io->cmd,
+ pending->io, rep->v1.flag);
+ }
+ }
+
+ msg_debug("fuzzy_tcp: processed reply with tag %u from %s (prob=%.2f)",
+ tag, rspamd_upstream_name(conn->server), (double) rep->v1.prob);
+
+ /* Remove from pending requests */
+ g_hash_table_remove(rule->pending_requests, GINT_TO_POINTER(tag));
+}
+
+/**
+ * TCP read handler
+ * Reads replies from server, parses framing, matches with pending commands
+ */
+static void
+fuzzy_tcp_read_handler(struct fuzzy_tcp_connection *conn)
+{
+ ssize_t r;
+ gsize available_space;
+
+ /* Read data from socket into buffer */
+ available_space = sizeof(conn->read_buf) - conn->bytes_unprocessed;
+ if (available_space == 0) {
+ msg_err("fuzzy_tcp: read buffer full for rule %s, closing connection",
+ conn->rule->name);
+ conn->failed = TRUE;
+ return;
+ }
+
+ r = read(conn->fd, conn->read_buf + conn->bytes_unprocessed, available_space);
+
+ if (r == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+ return; /* Try again later */
+ }
+ msg_err("fuzzy_tcp: read error for rule %s: %s",
+ conn->rule->name, strerror(errno));
+ conn->failed = TRUE;
+ return;
+ }
+ else if (r == 0) {
+ msg_info("fuzzy_tcp: connection closed by %s for rule %s",
+ rspamd_upstream_name(conn->server),
+ conn->rule->name);
+ conn->failed = TRUE;
+ return;
+ }
+
+ conn->bytes_unprocessed += r;
+
+ /* Process frames using state machine */
+ unsigned int processed_offset = 0;
+
+ while (processed_offset < conn->bytes_unprocessed) {
+ uint16_t frame_len;
+
+ /* State: 0x0000 - need first byte of length */
+ if ((conn->cur_frame_state & 0xC000) == 0x0000) {
+ if (processed_offset < conn->bytes_unprocessed) {
+ conn->cur_frame_state = 0x8000 | conn->read_buf[processed_offset];
+ processed_offset++;
+ }
+ else {
+ break;
+ }
+ }
+
+ /* State: 0x8000 - need second byte of length */
+ if ((conn->cur_frame_state & 0xC000) == 0x8000) {
+ if (processed_offset < conn->bytes_unprocessed) {
+ uint16_t first_byte = conn->cur_frame_state & 0xFF;
+ uint16_t second_byte = conn->read_buf[processed_offset];
+ conn->cur_frame_state = 0xC000 | ((first_byte << 8) | second_byte);
+ processed_offset++;
+ }
+ else {
+ break;
+ }
+ }
+
+ /* State: 0xC000 - have length, reading data */
+ frame_len = conn->cur_frame_state & 0x3FFF;
+
+ if (frame_len > sizeof(struct rspamd_fuzzy_encrypted_reply)) {
+ msg_err("fuzzy_tcp: invalid frame length %d from %s, closing",
+ (int) frame_len,
+ rspamd_upstream_name(conn->server));
+ conn->failed = TRUE;
+ return;
+ }
+
+ /* Check if we have complete frame */
+ if (conn->bytes_unprocessed - processed_offset >= frame_len) {
+ /* Process complete frame - decrypt and deliver to session */
+ fuzzy_tcp_process_reply(conn, conn->read_buf + processed_offset, frame_len);
+
+ processed_offset += frame_len;
+ conn->cur_frame_state = 0x0000; /* Reset for next frame */
+ }
+ else {
+ /* Incomplete frame, wait for more data */
+ break;
+ }
+ }
+
+ /* Move unprocessed data to beginning of buffer */
+ if (processed_offset > 0) {
+ if (processed_offset < conn->bytes_unprocessed) {
+ memmove(conn->read_buf,
+ conn->read_buf + processed_offset,
+ conn->bytes_unprocessed - processed_offset);
+ conn->bytes_unprocessed -= processed_offset;
+ }
+ else {
+ conn->bytes_unprocessed = 0;
+ }
}
}
rule->rate_tracker.requests_count = 0;
rule->rate_tracker.window_start = 0;
- /* Initialize TCP connection state */
- rule->tcp_state.connected = FALSE;
- rule->tcp_state.connecting = FALSE;
- rule->tcp_state.connection = NULL;
+ /* Initialize TCP connection pool - array of connections */
+ rule->tcp_connections = g_ptr_array_new();
+
+ /* Initialize global pending requests pool - keyed by tag */
+ rule->pending_requests = g_hash_table_new_full(g_direct_hash, g_direct_equal,
+ NULL, g_free);
/*
* Process rule in Lua
ev_tstamp now = rspamd_get_calendar_ticks();
fuzzy_update_rate_tracker(rule, now);
- /* Decide whether to use TCP or UDP */
- gboolean use_tcp = fuzzy_should_use_tcp(rule);
- int sock_type = SOCK_DGRAM; /* Default to UDP */
-
- /* If TCP should be used but not connected, initiate connection */
- if ((rule->tcp_enabled || rule->tcp_auto) && !rule->tcp_state.connected) {
- /* Check if rate threshold exceeded for auto-switch */
- if (rule->tcp_auto && rule->tcp_window > 0) {
- ev_tstamp elapsed = now - rule->rate_tracker.window_start;
- if (elapsed > 0) {
- double rate = (double) rule->rate_tracker.requests_count / elapsed;
- if (rate > rule->tcp_threshold) {
- /* Rate exceeded, initiate TCP connection asynchronously */
- fuzzy_tcp_connect_async(rule);
- }
- }
- }
- else if (rule->tcp_enabled) {
- /* TCP explicitly enabled, try to connect */
- fuzzy_tcp_connect_async(rule);
- }
+ /* Get upstream first - use read_servers for check operations */
+ selected = rspamd_upstream_get(rule->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
+ NULL, 0);
+ if (!selected) {
+ msg_warn_task("cannot get upstream for rule %s", rule->name);
+ g_ptr_array_free(commands, TRUE);
+ return;
}
- /* Use TCP if available and should be used, otherwise fall back to UDP */
- if (use_tcp) {
- sock_type = SOCK_STREAM;
+ /* Try TCP if enabled/auto and rate threshold exceeded */
+ struct fuzzy_tcp_connection *tcp_conn = NULL;
+ if (fuzzy_should_try_tcp(rule, now)) {
+ /* This is read server (CHECK operation) */
+ tcp_conn = fuzzy_tcp_get_or_create_connection(rule, selected, task, FALSE);
}
- /* Get upstream - use read_servers for check operations */
- selected = rspamd_upstream_get(rule->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
- NULL, 0);
+ /* For now, always use UDP (TCP write/read handlers not ready yet) */
+ /* TODO: Use tcp_conn when TCP I/O handlers are implemented */
+ if (tcp_conn && tcp_conn->connected) {
+ /* TCP connection available - will use it in future */
+ msg_debug_task("TCP connection available for %s, but not using yet (handlers not ready)",
+ rspamd_upstream_name(selected));
+ }
+
+ /* Use UDP for now */
if (selected) {
- addr = rspamd_upstream_addr_next(selected);
- if ((sock = rspamd_inet_address_connect(addr, sock_type, TRUE)) == -1) {
+ addr = rspamd_upstream_addr_cur(selected);
+ rspamd_inet_address_set_port(addr, rspamd_upstream_port(selected));
+
+ if ((sock = rspamd_inet_address_connect(addr, SOCK_DGRAM, TRUE)) == -1) {
msg_warn_task("cannot connect to %s(%s), %d, %s",
rspamd_upstream_name(selected),
rspamd_inet_address_to_string_pretty(addr),