gpointer key, value;
struct fuzzy_tcp_pending_command *pending;
GPtrArray *to_remove;
+ GHashTable *sessions_to_check;
unsigned int i;
struct rspamd_task *task = NULL;
return;
}
- /* Collect tags to remove (can't modify hash table during iteration) */
+ /* Collect commands to remove and sessions to check */
to_remove = g_ptr_array_new();
+ sessions_to_check = g_hash_table_new(g_direct_hash, g_direct_equal);
g_hash_table_iter_init(&iter, conn->rule->pending_requests);
while (g_hash_table_iter_next(&iter, &key, &value)) {
pending = (struct fuzzy_tcp_pending_command *) value;
if (pending->connection == conn) {
+ /* Mark command as replied (failed) */
+ if (!(pending->io->flags & FUZZY_CMD_FLAG_REPLIED)) {
+ pending->io->flags |= FUZZY_CMD_FLAG_REPLIED;
+ }
+
+ /* Collect session for completion check */
+ if (pending->session) {
+ g_hash_table_add(sessions_to_check, pending->session);
+ }
+
g_ptr_array_add(to_remove, key);
/* Get task for logging */
}
}
- /* Remove pending commands */
+ /* Remove pending commands from hash table */
for (i = 0; i < to_remove->len; i++) {
g_hash_table_remove(conn->rule->pending_requests,
g_ptr_array_index(to_remove, i));
(int) to_remove->len, rspamd_upstream_name(conn->server));
}
+ /* Check session completion for all affected sessions */
+ GHashTableIter session_iter;
+ struct fuzzy_client_session *session;
+ g_hash_table_iter_init(&session_iter, sessions_to_check);
+ while (g_hash_table_iter_next(&session_iter, (gpointer *) &session, NULL)) {
+ fuzzy_check_session_is_completed(session);
+ }
+
g_ptr_array_free(to_remove, TRUE);
+ g_hash_table_unref(sessions_to_check);
}
/**
gpointer key, value;
struct fuzzy_tcp_pending_command *pending;
GPtrArray *to_remove;
+ GHashTable *sessions_to_check;
unsigned int i;
ev_tstamp timeout;
timeout = rule->io_timeout;
to_remove = g_ptr_array_new();
+ sessions_to_check = g_hash_table_new(g_direct_hash, g_direct_equal);
g_hash_table_iter_init(&iter, rule->pending_requests);
while (g_hash_table_iter_next(&iter, &key, &value)) {
/* Check if request timed out */
if ((now - pending->send_time) > timeout) {
+ /* Mark command as replied (timed out) */
+ if (!(pending->io->flags & FUZZY_CMD_FLAG_REPLIED)) {
+ pending->io->flags |= FUZZY_CMD_FLAG_REPLIED;
+ }
+
+ /* Collect session for completion check */
+ if (pending->session) {
+ g_hash_table_add(sessions_to_check, pending->session);
+ }
+
g_ptr_array_add(to_remove, key);
if (pending->task) {
struct rspamd_task *task = pending->task;
- msg_info_task("fuzzy_tcp: request timeout after %.2fs for tag %u to %s",
+ msg_info_task("fuzzy_tcp: request timeout after %.2fs for tag %ud to %s",
now - pending->send_time,
pending->io->tag,
rspamd_upstream_name(pending->connection->server));
g_ptr_array_index(to_remove, i));
}
+ /* Check session completion for all affected sessions */
+ GHashTableIter session_iter;
+ struct fuzzy_client_session *session;
+ g_hash_table_iter_init(&session_iter, sessions_to_check);
+ while (g_hash_table_iter_next(&session_iter, (gpointer *) &session, NULL)) {
+ fuzzy_check_session_is_completed(session);
+ }
+
g_ptr_array_free(to_remove, TRUE);
+ g_hash_table_unref(sessions_to_check);
}
/**
while ((buf = g_queue_peek_head(conn->write_queue)) != NULL) {
/* Write remaining data */
- gsize remaining = buf->total_len - buf->bytes_written;
+ gsize remaining;
unsigned char *write_ptr;
/* Determine what to write: size_hdr or data */
if (buf->bytes_written < sizeof(buf->size_hdr)) {
/* Still writing size header */
write_ptr = (unsigned char *) &buf->size_hdr + buf->bytes_written;
+ remaining = sizeof(buf->size_hdr) - buf->bytes_written;
}
else {
/* Writing data */
write_ptr = buf->data + (buf->bytes_written - sizeof(buf->size_hdr));
+ remaining = buf->total_len - buf->bytes_written;
}
r = write(conn->fd, write_ptr, remaining);
/* Mark as sent */
io->flags |= FUZZY_CMD_FLAG_SENT;
- msg_debug_fuzzy_check("fuzzy_tcp: queued command with tag %u to %s",
+ msg_debug_fuzzy_check("fuzzy_tcp: queued command with tag %ud to %s",
io->tag, rspamd_upstream_name(conn->server));
}
pending = g_hash_table_lookup(rule->pending_requests, GINT_TO_POINTER(tag));
if (!pending) {
- msg_debug("fuzzy_tcp: unexpected tag %u from %s",
+ msg_debug("fuzzy_tcp: unexpected tag %ud from %s",
tag, rspamd_upstream_name(conn->server));
return;
}
pending->io->flags |= FUZZY_CMD_FLAG_REPLIED;
}
- msg_debug_fuzzy_check("fuzzy_tcp: processed reply with tag %u from %s (prob=%.2f)",
+ msg_debug_fuzzy_check("fuzzy_tcp: processed reply with tag %ud from %s (prob=%.2f)",
tag, rspamd_upstream_name(conn->server), (double) rep->v1.prob);
/* Remove from pending requests */
rule->alg = RSPAMD_SHINGLES_OLD;
rule->skip_map = NULL;
+ msg_debug_config("parsing fuzzy rule '%s'", name ? name : "default");
+
if ((value = ucl_object_lookup(obj, "skip_hashes")) != NULL) {
rspamd_map_add_from_ucl(cfg, value,
"Fuzzy hashes whitelist",
rule->no_subject = ucl_obj_toboolean(value);
}
+ /* TCP configuration */
+ if ((value = ucl_object_lookup(obj, "tcp")) != NULL) {
+ const char *tcp_mode = NULL;
+
+ msg_debug_config("rule %s: found 'tcp' option, type=%d",
+ name ? name : "default", ucl_object_type(value));
+
+ if (ucl_object_type(value) == UCL_BOOLEAN) {
+ if (ucl_object_toboolean(value)) {
+ rule->tcp_enabled = TRUE;
+ rule->tcp_auto = FALSE;
+ }
+ }
+ else if (ucl_object_type(value) == UCL_STRING) {
+ tcp_mode = ucl_object_tostring(value);
+
+ if (g_ascii_strcasecmp(tcp_mode, "yes") == 0 ||
+ g_ascii_strcasecmp(tcp_mode, "true") == 0) {
+ rule->tcp_enabled = TRUE;
+ rule->tcp_auto = FALSE;
+ }
+ else if (g_ascii_strcasecmp(tcp_mode, "auto") == 0) {
+ rule->tcp_auto = TRUE;
+ rule->tcp_enabled = FALSE;
+ }
+ else if (g_ascii_strcasecmp(tcp_mode, "no") == 0 ||
+ g_ascii_strcasecmp(tcp_mode, "false") == 0) {
+ rule->tcp_enabled = FALSE;
+ rule->tcp_auto = FALSE;
+ }
+ else {
+ msg_warn_config("unknown tcp mode: %s, TCP disabled", tcp_mode);
+ }
+ }
+ }
+ else {
+ msg_debug_config("rule %s: 'tcp' option not found in configuration",
+ name ? name : "default");
+ }
+
+ if ((value = ucl_object_lookup(obj, "tcp_threshold")) != NULL) {
+ msg_debug_config("rule %s: found tcp_threshold=%.2f",
+ name ? name : "default", ucl_obj_todouble(value));
+ rule->tcp_threshold = ucl_obj_todouble(value);
+ }
+ else {
+ rule->tcp_threshold = 1.0; /* Default: 1 req/sec */
+ }
+
+ if ((value = ucl_object_lookup(obj, "tcp_window")) != NULL) {
+ rule->tcp_window = ucl_obj_todouble(value);
+ }
+ else {
+ rule->tcp_window = 1.0; /* Default: 1 second window */
+ }
+
+ if ((value = ucl_object_lookup(obj, "tcp_timeout")) != NULL) {
+ rule->tcp_timeout = ucl_obj_todouble(value);
+ }
+ else {
+ rule->tcp_timeout = 5.0; /* Default: 5 seconds */
+ }
+
+ /* Log TCP configuration */
+ if (rule->tcp_enabled) {
+ msg_info_config("rule %s: TCP explicitly enabled (timeout=%.1fs)",
+ rule->name ? rule->name : "default", rule->tcp_timeout);
+ }
+ else if (rule->tcp_auto) {
+ msg_info_config("rule %s: TCP auto-switch enabled (threshold=%.2f req/s, window=%.1fs)",
+ rule->name ? rule->name : "default",
+ rule->tcp_threshold, rule->tcp_window);
+ }
+ else {
+ msg_info_config("rule %s: TCP disabled (using UDP only)",
+ rule->name ? rule->name : "default");
+ }
+
if ((value = ucl_object_lookup(obj, "algorithm")) != NULL) {
rule->algorithm_str = ucl_object_tostring(value);
pending = (struct fuzzy_tcp_pending_command *) value;
if (pending->session == session) {
+ /* Mark command as replied (session finished) */
+ if (!(pending->io->flags & FUZZY_CMD_FLAG_REPLIED)) {
+ pending->io->flags |= FUZZY_CMD_FLAG_REPLIED;
+ }
+
g_ptr_array_add(to_remove, key);
}
}
${RSPAMD_FUZZY_SERVER_MODE} servers
${RSPAMD_FUZZY_SHINGLES_KEY} null
${RSPAMD_SCOPE} Suite
-${SETTINGS_FUZZY_CHECK} ${EMPTY}
-${SETTINGS_FUZZY_WORKER} ${EMPTY}
+${RSPAMD_SETTINGS_FUZZY_CHECK} ${EMPTY}
+${RSPAMD_SETTINGS_FUZZY_WORKER} ${EMPTY}
@{MESSAGES_SKIP} ${RSPAMD_TESTDIR}/messages/priority.eml
@{MESSAGES} ${RSPAMD_TESTDIR}/messages/spam_message.eml ${RSPAMD_TESTDIR}/messages/zip.eml
@{RANDOM_MESSAGES} ${RSPAMD_TESTDIR}/messages/bad_message.eml ${RSPAMD_TESTDIR}/messages/zip-doublebad.eml
[Arguments] ${algorithm}
Set Suite Variable ${RSPAMD_FUZZY_ALGORITHM} ${algorithm}
Set Suite Variable ${RSPAMD_FUZZY_SERVER_MODE} servers
- Set Suite Variable ${SETTINGS_FUZZY_CHECK} servers = "${RSPAMD_LOCAL_ADDR}:${RSPAMD_PORT_FUZZY}";
+ Set Suite Variable ${RSPAMD_SETTINGS_FUZZY_CHECK} servers = "${RSPAMD_LOCAL_ADDR}:${RSPAMD_PORT_FUZZY}";
Rspamd Redis Setup
Fuzzy Setup Keyed
Fuzzy Setup Split Servers
Set Suite Variable ${RSPAMD_FUZZY_ALGORITHM} siphash
Set Suite Variable ${RSPAMD_FUZZY_SERVER_MODE} split
- Set Suite Variable ${SETTINGS_FUZZY_CHECK} read_servers = "${RSPAMD_LOCAL_ADDR}:${RSPAMD_PORT_FUZZY}"; write_servers = "${RSPAMD_LOCAL_ADDR}:${RSPAMD_PORT_FUZZY}";
+ Set Suite Variable ${RSPAMD_SETTINGS_FUZZY_CHECK} read_servers = "${RSPAMD_LOCAL_ADDR}:${RSPAMD_PORT_FUZZY}"; write_servers = "${RSPAMD_LOCAL_ADDR}:${RSPAMD_PORT_FUZZY}";
Rspamd Redis Setup
Fuzzy Setup Read Only
Set Suite Variable ${RSPAMD_FUZZY_ALGORITHM} siphash
Set Suite Variable ${RSPAMD_FUZZY_SERVER_MODE} read_only
- Set Suite Variable ${SETTINGS_FUZZY_CHECK} read_only = true;
+ Set Suite Variable ${RSPAMD_SETTINGS_FUZZY_CHECK} read_only = true;
Rspamd Redis Setup
Fuzzy Setup Write Only
Set Suite Variable ${RSPAMD_FUZZY_ALGORITHM} siphash
Set Suite Variable ${RSPAMD_FUZZY_SERVER_MODE} write_only
- Set Suite Variable ${SETTINGS_FUZZY_CHECK} mode = "write_only";
+ Set Suite Variable ${RSPAMD_SETTINGS_FUZZY_CHECK} mode = "write_only";
Rspamd Redis Setup
Fuzzy Setup TCP
[Arguments] ${algorithm}
Set Suite Variable ${RSPAMD_FUZZY_ALGORITHM} ${algorithm}
Set Suite Variable ${RSPAMD_FUZZY_SERVER_MODE} servers
- Set Suite Variable ${SETTINGS_FUZZY_CHECK} servers = "${RSPAMD_LOCAL_ADDR}:${RSPAMD_PORT_FUZZY}"; tcp = "auto"; tcp_threshold = 5;
+ Set Suite Variable ${RSPAMD_SETTINGS_FUZZY_CHECK} tcp = "auto"; tcp_threshold = 5;
Rspamd Redis Setup
Fuzzy Setup TCP Siphash
Set Suite Variable ${RSPAMD_FUZZY_CLIENT_ENCRYPTION_KEY} ${RSPAMD_KEY_PUB1}
Set Suite Variable ${RSPAMD_FUZZY_INCLUDE} ${RSPAMD_TESTDIR}/configs/fuzzy-encryption-key.conf
Set Suite Variable ${RSPAMD_FUZZY_SERVER_MODE} servers
- Set Suite Variable ${SETTINGS_FUZZY_WORKER} tcp = true;
- Set Suite Variable ${SETTINGS_FUZZY_CHECK} tcp = "auto"; tcp_threshold = 5;
+ Set Suite Variable ${RSPAMD_SETTINGS_FUZZY_WORKER} tcp = true;
+ Set Suite Variable ${RSPAMD_SETTINGS_FUZZY_CHECK} tcp = "auto"; tcp_threshold = 5;
Rspamd Redis Setup
Fuzzy Setup TCP Encrypted Siphash
[Arguments] ${algorithm}
Set Suite Variable ${RSPAMD_FUZZY_ALGORITHM} ${algorithm}
Set Suite Variable ${RSPAMD_FUZZY_SERVER_MODE} servers
- Set Suite Variable ${SETTINGS_FUZZY_WORKER} tcp = true;
- Set Suite Variable ${SETTINGS_FUZZY_CHECK} servers = "${RSPAMD_LOCAL_ADDR}:${RSPAMD_PORT_FUZZY}"; tcp = "yes";
+ Set Suite Variable ${RSPAMD_SETTINGS_FUZZY_WORKER} tcp = true;
+ Set Suite Variable ${RSPAMD_SETTINGS_FUZZY_CHECK} tcp = "yes";
Rspamd Redis Setup
Fuzzy Setup TCP Explicit Siphash