struct fuzzy_tcp_reply_queue_elt *prev, *next; /* Link */
};
+/*
+ * Connection-level state for a TCP fuzzy session. Per-command state
+ * (parsed cmd, key retain, extensions buffer, decrypt nm, ip_stat) lives
+ * on the per-frame `struct fuzzy_session` and dies with it — never copy
+ * it onto the long-lived TCP session.
+ */
struct fuzzy_common_session {
struct rspamd_fuzzy_storage_ctx *ctx;
int fd;
ev_tstamp timestamp;
struct rspamd_worker *worker;
rspamd_inet_addr_t *addr;
-
- enum rspamd_fuzzy_epoch epoch;
- enum fuzzy_cmd_type cmd_type;
- struct rspamd_fuzzy_shingle_cmd cmd;
- struct fuzzy_key *key;
- struct rspamd_fuzzy_cmd_extension *extensions;
- struct fuzzy_key_stat *ip_stat;
- unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
};
struct fuzzy_tcp_session {
uint16_t cur_frame_state;
uint16_t bytes_unprocessed;
- /* Common with UDP session */
struct fuzzy_common_session common;
ref_entry_t ref;
unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH];
};
-struct fuzzy_udp_session {
- /* Common fields with TCP session */
- struct fuzzy_common_session common;
- struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */
- ref_entry_t ref;
-};
-
struct fuzzy_peer_request {
ev_io io_ev;
struct fuzzy_peer_cmd cmd;
};
static void rspamd_fuzzy_write_reply(struct fuzzy_session *session);
-static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session);
static bool rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session,
struct fuzzy_tcp_reply_queue_elt *reply);
static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx,
close(session->common.fd);
rspamd_inet_address_free(session->common.addr);
- rspamd_explicit_memzero(session->common.nm, sizeof(session->common.nm));
session->common.worker->nconns--;
- if (session->common.ip_stat) {
- REF_RELEASE(session->common.ip_stat);
- }
-
- if (session->common.extensions) {
- g_free(session->common.extensions);
- }
-
- if (session->common.key) {
- REF_RELEASE(session->common.key);
- }
-
- g_free(session);
-}
-
-static void
-fuzzy_udp_session_destroy(gpointer d)
-{
- struct fuzzy_udp_session *session = d;
-
- rspamd_inet_address_free(session->common.addr);
- rspamd_explicit_memzero(session->common.nm, sizeof(session->common.nm));
- session->common.worker->nconns--;
-
- if (session->common.ip_stat) {
- REF_RELEASE(session->common.ip_stat);
- }
-
- if (session->common.extensions) {
- g_free(session->common.extensions);
- }
-
- if (session->common.key) {
- REF_RELEASE(session->common.key);
- }
-
g_free(session);
}
/* Check if we have complete frame */
if (session->bytes_unprocessed - processed_offset >= frame_len) {
- /* Create heap-allocated session for async processing */
+ /*
+ * Per-frame command session. Owns its own key retain,
+ * extensions buffer, ip_stat retain, and decrypt nm —
+ * fuzzy_session_destroy releases all of that when this
+ * cmd_session is dropped. Do NOT cache any of it on the
+ * long-lived TCP session: that turned the per-frame
+ * allocations into per-frame leaks.
+ */
struct fuzzy_session *cmd_session = g_malloc0(sizeof(*cmd_session));
REF_INIT_RETAIN(cmd_session, fuzzy_session_destroy);
- /* Copy data from TCP session to command session */
cmd_session->worker = session->common.worker;
cmd_session->addr = rspamd_inet_address_copy(session->common.addr, NULL);
cmd_session->ctx = session->common.ctx;
cmd_session->fd = session->common.fd;
cmd_session->timestamp = session->common.timestamp;
- cmd_session->key = session->common.key;
- cmd_session->ip_stat = session->common.ip_stat;
- memcpy(cmd_session->nm, session->common.nm, sizeof(cmd_session->nm));
-
- /* Retain references to shared objects */
- if (cmd_session->key) {
- REF_RETAIN(cmd_session->key);
- }
- if (cmd_session->ip_stat) {
- REF_RETAIN(cmd_session->ip_stat);
- }
- /* Don't increment nconns here - TCP session already counted the connection */
- /* Set TCP session pointer so replies go to TCP queue */
+ /* Replies go to the TCP queue, not the socket directly */
cmd_session->tcp_session = session;
- REF_RETAIN(session); /* TCP session must live until command is processed */
+ REF_RETAIN(session); /* released by fuzzy_session_destroy */
if (rspamd_fuzzy_cmd_from_wire(session->input_buf + processed_offset,
frame_len, cmd_session)) {
- /* Copy parsed data back to TCP session for tracking */
- session->common.epoch = cmd_session->epoch;
- session->common.cmd_type = cmd_session->cmd_type;
- memcpy(&session->common.cmd, &cmd_session->cmd, sizeof(session->common.cmd));
-
- /* Transfer ownership of key and extensions to TCP session */
- /* Clear cmd_session pointers to avoid double-free */
- session->common.key = cmd_session->key;
- session->common.extensions = cmd_session->extensions;
- cmd_session->key = NULL;
- cmd_session->extensions = NULL;
- memcpy(session->common.nm, cmd_session->nm, sizeof(session->common.nm));
-
- /* Process command - replies will go to TCP queue via tcp_session pointer */
rspamd_fuzzy_process_command(cmd_session);
}
else {
msg_debug_fuzzy_storage("invalid TCP fuzzy command of size %d received from %s",
(int) frame_len,
rspamd_inet_address_to_string(session->common.addr));
- /* Note: Don't release session here - cmd_session holds a reference and will release it */
}
- /* Release our reference - session will be freed when all callbacks complete */
REF_RELEASE(cmd_session);
processed_offset += frame_len;