struct rspamd_hash_map_helper *skip_map;
struct fuzzy_ctx *ctx;
int lua_id;
+
+ /* TCP configuration */
+ gboolean tcp_enabled; /* Explicitly enable TCP */
+ gboolean tcp_auto; /* Auto-switch to TCP based on request rate */
+ double tcp_threshold; /* Requests/sec threshold for auto-switch (default: 1.0) */
+ double tcp_window; /* Time window for rate calculation in seconds (default: 1.0) */
+ double tcp_timeout; /* TCP connection timeout (default: 5.0) */
+
+ /* Rate tracking for TCP auto-switch */
+ struct {
+ uint32_t requests_count; /* Number of requests in current window */
+ ev_tstamp window_start; /* Start of the current window */
+ } rate_tracker;
+
+ /* TCP connection state */
+ struct {
+ gboolean connected; /* TCP connection is active and ready */
+ gboolean connecting; /* TCP connection in progress */
+ void *connection; /* Placeholder for TCP connection handle (future) */
+ } tcp_state;
};
struct fuzzy_ctx {
}
}
+/**
+ * Update the rate tracker for TCP auto-switch decision
+ * Uses a sliding window to track request rate
+ */
+static void
+fuzzy_update_rate_tracker(struct fuzzy_rule *rule, ev_tstamp now)
+{
+ /* Check if we need to reset the window */
+ if (rule->rate_tracker.window_start == 0 ||
+ (now - rule->rate_tracker.window_start) >= rule->tcp_window) {
+ /* Start new window */
+ rule->rate_tracker.window_start = now;
+ rule->rate_tracker.requests_count = 1;
+ }
+ else {
+ /* Increment counter in current window */
+ rule->rate_tracker.requests_count++;
+ }
+}
+
+/**
+ * Determine whether to use TCP for this request
+ * Returns TRUE if TCP should be used, FALSE for UDP
+ */
+static gboolean
+fuzzy_should_use_tcp(struct fuzzy_rule *rule)
+{
+ /* If TCP is explicitly enabled, always prefer TCP when connected */
+ if (rule->tcp_enabled && rule->tcp_state.connected) {
+ return TRUE;
+ }
+
+ /* If auto-switch is enabled and TCP is connected */
+ if (rule->tcp_auto && rule->tcp_state.connected) {
+ /* Calculate current rate */
+ if (rule->rate_tracker.window_start > 0 && rule->tcp_window > 0) {
+ ev_tstamp elapsed = rspamd_get_calendar_ticks() - rule->rate_tracker.window_start;
+ if (elapsed > 0) {
+ double rate = (double) rule->rate_tracker.requests_count / elapsed;
+ if (rate > rule->tcp_threshold) {
+ return TRUE;
+ }
+ }
+ }
+ }
+
+ return FALSE;
+}
+
+/**
+ * Initiate asynchronous TCP connection (placeholder)
+ * This function will be implemented later with actual TCP connection logic
+ */
+static void
+fuzzy_tcp_connect_async(struct fuzzy_rule *rule)
+{
+ /* Mark connection as in progress */
+ if (!rule->tcp_state.connecting && !rule->tcp_state.connected) {
+ rule->tcp_state.connecting = TRUE;
+
+ /* TODO: Implement actual async TCP connection
+ * - Create socket
+ * - Set non-blocking mode
+ * - Start connect()
+ * - Register event handler for connection completion
+ * - On success: set tcp_state.connected = TRUE, connecting = FALSE
+ * - On failure: set connecting = FALSE, schedule retry
+ */
+ }
+}
+
static int
fuzzy_parse_rule(struct rspamd_config *cfg, const ucl_object_t *obj,
const char *name, int cb_id)
rule->weight_threshold = ucl_object_todouble(value);
}
+ /* TCP configuration */
+ if ((value = ucl_object_lookup(obj, "tcp")) != NULL) {
+ const ucl_object_t *tcp_obj;
+
+ if (ucl_object_type(value) == UCL_BOOLEAN) {
+ /* Simple boolean: enable/disable TCP */
+ rule->tcp_enabled = ucl_object_toboolean(value);
+ rule->tcp_auto = FALSE;
+ }
+ else if (ucl_object_type(value) == UCL_OBJECT) {
+ /* Object with detailed TCP configuration */
+
+ if ((tcp_obj = ucl_object_lookup(value, "enabled")) != NULL) {
+ rule->tcp_enabled = ucl_object_toboolean(tcp_obj);
+ }
+
+ if ((tcp_obj = ucl_object_lookup(value, "auto")) != NULL) {
+ rule->tcp_auto = ucl_object_toboolean(tcp_obj);
+ }
+
+ if ((tcp_obj = ucl_object_lookup(value, "threshold")) != NULL) {
+ rule->tcp_threshold = ucl_object_todouble(tcp_obj);
+ }
+ else {
+ rule->tcp_threshold = 1.0; /* Default: >1 req/sec */
+ }
+
+ if ((tcp_obj = ucl_object_lookup(value, "window")) != NULL) {
+ rule->tcp_window = ucl_object_todouble(tcp_obj);
+ }
+ else {
+ rule->tcp_window = 1.0; /* Default: 1 second window */
+ }
+
+ if ((tcp_obj = ucl_object_lookup(value, "timeout")) != NULL) {
+ rule->tcp_timeout = ucl_object_todouble(tcp_obj);
+ }
+ else {
+ rule->tcp_timeout = 5.0; /* Default: 5 seconds */
+ }
+ }
+ }
+ else {
+ /* Default values if no TCP configuration */
+ rule->tcp_enabled = FALSE;
+ rule->tcp_auto = FALSE;
+ rule->tcp_threshold = 1.0;
+ rule->tcp_window = 1.0;
+ rule->tcp_timeout = 5.0;
+ }
+
+ /* Initialize rate tracker */
+ rule->rate_tracker.requests_count = 0;
+ rule->rate_tracker.window_start = 0;
+
+ /* Initialize TCP connection state */
+ rule->tcp_state.connected = FALSE;
+ rule->tcp_state.connecting = FALSE;
+ rule->tcp_state.connection = NULL;
+
/*
* Process rule in Lua
*/
int sock;
if (!rspamd_session_blocked(task->s)) {
+ /* Update rate tracker for TCP auto-switch decision */
+ ev_tstamp now = rspamd_get_calendar_ticks();
+ fuzzy_update_rate_tracker(rule, now);
+
+ /* Decide whether to use TCP or UDP */
+ gboolean use_tcp = fuzzy_should_use_tcp(rule);
+ int sock_type = SOCK_DGRAM; /* Default to UDP */
+
+ /* If TCP should be used but not connected, initiate connection */
+ if ((rule->tcp_enabled || rule->tcp_auto) && !rule->tcp_state.connected) {
+ /* Check if rate threshold exceeded for auto-switch */
+ if (rule->tcp_auto && rule->tcp_window > 0) {
+ ev_tstamp elapsed = now - rule->rate_tracker.window_start;
+ if (elapsed > 0) {
+ double rate = (double) rule->rate_tracker.requests_count / elapsed;
+ if (rate > rule->tcp_threshold) {
+ /* Rate exceeded, initiate TCP connection asynchronously */
+ fuzzy_tcp_connect_async(rule);
+ }
+ }
+ }
+ else if (rule->tcp_enabled) {
+ /* TCP explicitly enabled, try to connect */
+ fuzzy_tcp_connect_async(rule);
+ }
+ }
+
+ /* Use TCP if available and should be used, otherwise fall back to UDP */
+ if (use_tcp) {
+ sock_type = SOCK_STREAM;
+ }
+
/* Get upstream - use read_servers for check operations */
selected = rspamd_upstream_get(rule->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
NULL, 0);
if (selected) {
addr = rspamd_upstream_addr_next(selected);
- if ((sock = rspamd_inet_address_connect(addr, SOCK_DGRAM, TRUE)) == -1) {
+ if ((sock = rspamd_inet_address_connect(addr, sock_type, TRUE)) == -1) {
msg_warn_task("cannot connect to %s(%s), %d, %s",
rspamd_upstream_name(selected),
rspamd_inet_address_to_string_pretty(addr),