]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Fuzzy storage: implement TCP protocol support
authorVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Oct 2025 07:56:32 +0000 (08:56 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 7 Oct 2025 07:56:32 +0000 (08:56 +0100)
Implement TCP transport for fuzzy storage protocol to enable efficient
bulk request handling. This adds TCP accept handlers, frame-based I/O
processing, and proper session management.

Changes:
- Add TCP session structure with framing state machine
- Implement TCP accept handler with rate limiting and access control
- Add TCP I/O handler supporting frame-based protocol (size header + payload)
- Implement TCP write reply with queuing support
- Add TCP timeout configuration parameter (default: 5.0 seconds)
- Refactor rate limit checks to accept parameters instead of session objects
- Update worker socket type to support both UDP and TCP
- Add debug logging infrastructure for fuzzy storage

TCP framing protocol: [uint16_t size][encrypted_payload]
Frame processing uses state machine: 0x0000 (idle) -> 0x8000 (have size) -> 0xC000 (complete)

src/fuzzy_storage.c

index d6836df3bdd2fc2863b6503585e40aa391c1f617..8d97ebca8d13f0551cd137f3c5b677c9d2897e74 100644 (file)
@@ -50,6 +50,9 @@
 #define DEFAULT_BUCKET_MASK 24
 /* Update stats on keys each 1 hour */
 #define KEY_STAT_INTERVAL 3600.0
+/* TCP constants */
+#define FUZZY_TCP_BUFFER_LENGTH 8192
+#define DEFAULT_TCP_TIMEOUT 5.0
 
 static const char *local_db_name = "local";
 
@@ -57,13 +60,19 @@ static const char *local_db_name = "local";
 gpointer init_fuzzy(struct rspamd_config *cfg);
 void start_fuzzy(struct rspamd_worker *worker);
 
+#define msg_debug_fuzzy_storage(...) rspamd_conditional_debug_fast(NULL, NULL,                                         \
+                                                                                                                                  rspamd_fuzzy_storage_log_id, "fuzzy_storage", NULL, \
+                                                                                                                                  RSPAMD_LOG_FUNC,                                    \
+                                                                                                                                  __VA_ARGS__)
+INIT_LOG_MODULE(fuzzy_storage)
+
 worker_t fuzzy_worker = {
        "fuzzy",     /* Name */
        init_fuzzy,  /* Init function */
        start_fuzzy, /* Start function */
        RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_NO_STRICT_CONFIG | RSPAMD_WORKER_FUZZY,
-       RSPAMD_WORKER_SOCKET_UDP, /* UDP socket */
-       RSPAMD_WORKER_VER         /* Version info */
+       RSPAMD_WORKER_SOCKET_UDP | RSPAMD_WORKER_SOCKET_TCP, /* UDP + TCP socket */
+       RSPAMD_WORKER_VER                                    /* Version info */
 };
 
 struct fuzzy_global_stat {
@@ -171,6 +180,7 @@ struct rspamd_fuzzy_storage_ctx {
        double expire;
        double sync_timeout;
        double delay;
+       double tcp_timeout;
        struct rspamd_radix_map_helper *update_ips;
        struct rspamd_hash_map_helper *update_keys;
        struct rspamd_radix_map_helper *blocked_ips;
@@ -235,6 +245,63 @@ enum fuzzy_cmd_type {
        CMD_ENCRYPTED_SHINGLE
 };
 
+struct rspamd_fuzzy_tcp_frame {
+       uint16_t size_hdr;                           /* We have to write this as well */
+       struct rspamd_fuzzy_encrypted_reply payload; /* Payload */
+};
+
+struct fuzzy_tcp_reply_queue_elt {
+       struct rspamd_fuzzy_tcp_frame rep;             /* Serialized reply */
+       unsigned int written;                          /* How many bytes have we already written */
+       struct fuzzy_tcp_reply_queue_elt *prev, *next; /* Link */
+};
+
+struct fuzzy_common_session {
+       struct rspamd_fuzzy_storage_ctx *ctx;
+       int fd;
+       struct ev_io io;
+       ev_tstamp timestamp;
+       struct rspamd_worker *worker;
+       rspamd_inet_addr_t *addr;
+
+       enum rspamd_fuzzy_epoch epoch;
+       enum fuzzy_cmd_type cmd_type;
+       struct rspamd_fuzzy_shingle_cmd cmd;
+       struct fuzzy_key *key;
+       struct rspamd_fuzzy_cmd_extension *extensions;
+       struct fuzzy_key_stat *ip_stat;
+       unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
+};
+
+struct fuzzy_tcp_session {
+       struct ev_timer tm;
+
+       /*
+        * We store the state in the current frame
+        * 0 0 x x x x x x x x x x x x x x x - initial
+        * 1 0 x x x x x x x x x x x x x x x - read 1 byte of length
+        * 1 1 x x x x x x x x x x x x x x x - read 2 bytes of length
+        * So the length is always cur_frame & 0x3fff
+        */
+       uint16_t cur_frame_state;
+       uint16_t bytes_unprocessed;
+
+       /* Common with UDP session */
+       struct fuzzy_common_session common;
+       ref_entry_t ref;
+
+       struct fuzzy_tcp_reply_queue_elt *replies_queue;
+       unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH];
+};
+
+struct fuzzy_udp_session {
+       /* Common fields with TCP session */
+       struct fuzzy_common_session common;
+       struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */
+       ref_entry_t ref;
+};
+
+/* Legacy structure name for compatibility during refactoring */
 struct fuzzy_session {
        struct rspamd_worker *worker;
        rspamd_inet_addr_t *addr;
@@ -269,6 +336,9 @@ struct rspamd_updates_cbdata {
 
 
 static void rspamd_fuzzy_write_reply(struct fuzzy_session *session);
+static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session);
+static bool rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session,
+                                                                                struct fuzzy_tcp_reply_queue_elt *reply);
 static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx,
                                                                                                   const char *source, gboolean final);
 static gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
@@ -279,6 +349,8 @@ static void rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx
 static struct fuzzy_key *fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg,
                                                                                                        const ucl_object_t *obj,
                                                                                                        khash_t(rspamd_fuzzy_keys_hash) * target);
+static void rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents);
+static void accept_tcp_socket(EV_P_ ev_io *w, int revents);
 
 static ucl_object_t *rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt);
 struct fuzzy_keymap_ucl_buf {
@@ -436,8 +508,12 @@ enum rspamd_ratelimit_check_policy {
 };
 
 static enum rspamd_ratelimit_check_result
-rspamd_fuzzy_check_ratelimit_bucket(struct fuzzy_session *session, struct rspamd_leaky_bucket_elt *elt,
-                                                                       enum rspamd_ratelimit_check_policy policy, double max_burst, double max_rate)
+rspamd_fuzzy_check_ratelimit_bucket(struct rspamd_fuzzy_storage_ctx *ctx,
+                                                                       rspamd_inet_addr_t *addr,
+                                                                       ev_tstamp timestamp,
+                                                                       struct rspamd_leaky_bucket_elt *elt,
+                                                                       enum rspamd_ratelimit_check_policy policy,
+                                                                       double max_burst, double max_rate)
 {
        gboolean ratelimited = FALSE, new_ratelimit = FALSE;
 
@@ -450,13 +526,13 @@ rspamd_fuzzy_check_ratelimit_bucket(struct fuzzy_session *session, struct rspamd
                /* There is an issue with the previous logic: the TTL is updated each time
                 * we see that new bucket. Hence, we need to check the `last` and act accordingly
                 */
-               if (elt->last < session->timestamp && session->timestamp - elt->last >= session->ctx->leaky_bucket_ttl) {
+               if (elt->last < timestamp && timestamp - elt->last >= ctx->leaky_bucket_ttl) {
                        /*
                                 * We reset bucket to it's 90% capacity to allow some requests
                                 * This should cope with the issue when we block an IP network for some burst and never unblock it
                                 */
                        elt->cur = max_burst * 0.9;
-                       elt->last = session->timestamp;
+                       elt->last = timestamp;
                }
                else {
                        ratelimited = TRUE;
@@ -464,16 +540,16 @@ rspamd_fuzzy_check_ratelimit_bucket(struct fuzzy_session *session, struct rspamd
        }
        else {
                /* Update bucket: leak some elements */
-               if (elt->last < session->timestamp) {
-                       elt->cur -= max_rate * (session->timestamp - elt->last);
-                       elt->last = session->timestamp;
+               if (elt->last < timestamp) {
+                       elt->cur -= max_rate * (timestamp - elt->last);
+                       elt->last = timestamp;
 
                        if (elt->cur < 0) {
                                elt->cur = 0;
                        }
                }
                else {
-                       elt->last = session->timestamp;
+                       elt->last = timestamp;
                }
 
                /* Check the bucket */
@@ -491,7 +567,7 @@ rspamd_fuzzy_check_ratelimit_bucket(struct fuzzy_session *session, struct rspamd
        }
 
        if (ratelimited) {
-               rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit");
+               rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
        }
 
        if (new_ratelimit) {
@@ -502,54 +578,58 @@ rspamd_fuzzy_check_ratelimit_bucket(struct fuzzy_session *session, struct rspamd
 }
 
 static gboolean
-rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session)
+rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx,
+                                                        rspamd_inet_addr_t *addr,
+                                                        struct rspamd_worker *worker,
+                                                        ev_tstamp timestamp)
 {
        rspamd_inet_addr_t *masked;
        struct rspamd_leaky_bucket_elt *elt;
 
-       if (!session->addr) {
+       if (!addr) {
                return TRUE;
        }
 
-       if (session->ctx->ratelimit_whitelist != NULL) {
-               if (rspamd_match_radix_map_addr(session->ctx->ratelimit_whitelist,
-                                                                               session->addr) != NULL) {
+       if (ctx->ratelimit_whitelist != NULL) {
+               if (rspamd_match_radix_map_addr(ctx->ratelimit_whitelist,
+                                                                               addr) != NULL) {
                        return TRUE;
                }
        }
 
        /*
-       if (rspamd_inet_address_is_local (session->addr, TRUE)) {
+       if (rspamd_inet_address_is_local (addr, TRUE)) {
                return TRUE;
        }
        */
 
-       masked = rspamd_inet_address_copy(session->addr, NULL);
+       masked = rspamd_inet_address_copy(addr, NULL);
 
        if (rspamd_inet_address_get_af(masked) == AF_INET) {
                rspamd_inet_address_apply_mask(masked,
-                                                                          MIN(session->ctx->leaky_bucket_mask, 32));
+                                                                          MIN(ctx->leaky_bucket_mask, 32));
        }
        else {
                /* Must be at least /64 */
                rspamd_inet_address_apply_mask(masked,
-                                                                          MIN(MAX(session->ctx->leaky_bucket_mask * 4, 64), 128));
+                                                                          MIN(MAX(ctx->leaky_bucket_mask * 4, 64), 128));
        }
 
-       elt = rspamd_lru_hash_lookup(session->ctx->ratelimit_buckets, masked,
-                                                                (time_t) session->timestamp);
+       elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, masked,
+                                                                (time_t) timestamp);
 
        if (elt) {
-               enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(session, elt,
+               enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(ctx, addr,
+                                                                                                                                                                        timestamp, elt,
                                                                                                                                                                         ratelimit_policy_permanent,
-                                                                                                                                                                        session->ctx->leaky_bucket_burst,
-                                                                                                                                                                        session->ctx->leaky_bucket_rate);
+                                                                                                                                                                        ctx->leaky_bucket_burst,
+                                                                                                                                                                        ctx->leaky_bucket_rate);
 
                if (res == ratelimit_new) {
                        msg_info("ratelimiting %s (%s), %.1f max elts",
-                                        rspamd_inet_address_to_string(session->addr),
+                                        rspamd_inet_address_to_string(addr),
                                         rspamd_inet_address_to_string(masked),
-                                        session->ctx->leaky_bucket_burst);
+                                        ctx->leaky_bucket_burst);
 
                        struct rspamd_srv_command srv_cmd;
 
@@ -563,17 +643,18 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session)
                                if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
                                        memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
                                        msg_debug("propagating blocked address to other workers");
-                                       rspamd_srv_send_command(session->worker, session->ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+                                       rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
                                }
                                else {
-                                       msg_err("bad address length: %d, expected to be %d", (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
+                                       msg_err("bad address length: %d, expected to be %d",
+                                                       (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
                                }
                        }
 
-                       rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit");
+                       rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
                }
                else if (res == ratelimit_existing) {
-                       rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit");
+                       rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
                }
 
                rspamd_inet_address_free(masked);
@@ -585,13 +666,13 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session)
                elt = g_malloc(sizeof(*elt));
                elt->addr = masked; /* transfer ownership */
                elt->cur = 1;
-               elt->last = session->timestamp;
+               elt->last = timestamp;
 
-               rspamd_lru_hash_insert(session->ctx->ratelimit_buckets,
+               rspamd_lru_hash_insert(ctx->ratelimit_buckets,
                                                           masked,
                                                           elt,
-                                                          session->timestamp,
-                                                          session->ctx->leaky_bucket_ttl);
+                                                          timestamp,
+                                                          ctx->leaky_bucket_ttl);
        }
 
        return TRUE;
@@ -645,21 +726,24 @@ rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
 }
 
 static gboolean
-rspamd_fuzzy_check_write(struct fuzzy_session *session, uint8_t cmd)
+rspamd_fuzzy_check_write(struct rspamd_fuzzy_storage_ctx *ctx,
+                                                rspamd_inet_addr_t *addr,
+                                                struct fuzzy_key *key,
+                                                uint8_t cmd)
 {
-       if (session->ctx->read_only) {
+       if (ctx->read_only) {
                return FALSE;
        }
 
        /*
         * Check IP first
         */
-       if (session->ctx->update_ips != NULL && session->addr) {
-               if (rspamd_inet_address_get_af(session->addr) == AF_UNIX) {
+       if (ctx->update_ips != NULL && addr) {
+               if (rspamd_inet_address_get_af(addr) == AF_UNIX) {
                        return TRUE;
                }
-               if (rspamd_match_radix_map_addr(session->ctx->update_ips,
-                                                                               session->addr) == NULL) {
+               if (rspamd_match_radix_map_addr(ctx->update_ips,
+                                                                               addr) == NULL) {
                        return FALSE;
                }
                else {
@@ -670,25 +754,25 @@ rspamd_fuzzy_check_write(struct fuzzy_session *session, uint8_t cmd)
        /*
         * Check global list of the update keys
         */
-       if (session->ctx->update_keys != NULL && session->key->stat && session->key->key) {
+       if (ctx->update_keys != NULL && key && key->stat && key->key) {
                static char base32_buf[rspamd_cryptobox_HASHBYTES * 2 + 1];
                unsigned int raw_len;
-               const unsigned char *pk_raw = rspamd_keypair_component(session->key->key,
+               const unsigned char *pk_raw = rspamd_keypair_component(key->key,
                                                                                                                           RSPAMD_KEYPAIR_COMPONENT_ID, &raw_len);
                int encoded_len = rspamd_encode_base32_buf(pk_raw, raw_len,
                                                                                                   base32_buf, sizeof(base32_buf),
                                                                                                   RSPAMD_BASE32_DEFAULT);
 
-               if (rspamd_match_hash_map(session->ctx->update_keys, base32_buf, encoded_len)) {
+               if (rspamd_match_hash_map(ctx->update_keys, base32_buf, encoded_len)) {
                        return TRUE;
                }
        }
 
-       if (session->key) {
-               if (cmd == FUZZY_WRITE && session->key->flags & FUZZY_KEY_WRITE) {
+       if (key) {
+               if (cmd == FUZZY_WRITE && key->flags & FUZZY_KEY_WRITE) {
                        return TRUE;
                }
-               else if (cmd == FUZZY_DEL && session->key->flags & FUZZY_KEY_DELETE) {
+               else if (cmd == FUZZY_DEL && key->flags & FUZZY_KEY_DELETE) {
                        return TRUE;
                }
        }
@@ -1632,17 +1716,20 @@ rspamd_fuzzy_process_command(struct fuzzy_session *session)
 
                if (session->ctx->ratelimit_buckets) {
                        if (session->ctx->ratelimit_log_only) {
-                               (void) rspamd_fuzzy_check_ratelimit(session); /* Check but ignore */
+                               (void) rspamd_fuzzy_check_ratelimit(session->ctx, session->addr,
+                                                                                                       session->worker, session->timestamp); /* Check but ignore */
                        }
                        else {
-                               is_rate_allowed = rspamd_fuzzy_check_ratelimit(session);
+                               is_rate_allowed = rspamd_fuzzy_check_ratelimit(session->ctx, session->addr,
+                                                                                                                          session->worker, session->timestamp);
                        }
                }
 
                if (session->key && session->key->rl_bucket) {
                        /* Check per-key bucket */
 
-                       enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(session, session->key->rl_bucket,
+                       enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(session->ctx, session->addr,
+                                                                                                                                                                                session->timestamp, session->key->rl_bucket,
                                                                                                                                                                                 ratelimit_policy_normal,
                                                                                                                                                                                 session->key->burst,
                                                                                                                                                                                 session->key->rate);
@@ -1745,7 +1832,7 @@ rspamd_fuzzy_process_command(struct fuzzy_session *session)
                rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
        }
        else {
-               if (rspamd_fuzzy_check_write(session, cmd->cmd)) {
+               if (rspamd_fuzzy_check_write(session->ctx, session->addr, session->key, cmd->cmd)) {
                        /* Check whitelist */
                        if (session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
                                rspamd_encode_hex_buf(cmd->digest, sizeof(cmd->digest),
@@ -2178,6 +2265,74 @@ fuzzy_session_destroy(gpointer d)
        g_free(session);
 }
 
+static void
+fuzzy_tcp_session_destroy(gpointer d)
+{
+       struct fuzzy_tcp_session *session = d;
+
+       msg_debug_fuzzy_storage("destroying TCP session from %s",
+                                                       rspamd_inet_address_to_string(session->common.addr));
+
+       if (ev_can_stop(&session->common.io)) {
+               ev_io_stop(session->common.ctx->event_loop, &session->common.io);
+       }
+
+       if (ev_can_stop(&session->tm)) {
+               ev_timer_stop(session->common.ctx->event_loop, &session->tm);
+       }
+
+       /* Free replies queue */
+       struct fuzzy_tcp_reply_queue_elt *elt, *tmp;
+       DL_FOREACH_SAFE(session->replies_queue, elt, tmp)
+       {
+               DL_DELETE(session->replies_queue, elt);
+               g_free(elt);
+       }
+
+       close(session->common.fd);
+       rspamd_inet_address_free(session->common.addr);
+       rspamd_explicit_memzero(session->common.nm, sizeof(session->common.nm));
+       session->common.worker->nconns--;
+
+       if (session->common.ip_stat) {
+               REF_RELEASE(session->common.ip_stat);
+       }
+
+       if (session->common.extensions) {
+               g_free(session->common.extensions);
+       }
+
+       if (session->common.key) {
+               REF_RELEASE(session->common.key);
+       }
+
+       g_free(session);
+}
+
+static void
+fuzzy_udp_session_destroy(gpointer d)
+{
+       struct fuzzy_udp_session *session = d;
+
+       rspamd_inet_address_free(session->common.addr);
+       rspamd_explicit_memzero(session->common.nm, sizeof(session->common.nm));
+       session->common.worker->nconns--;
+
+       if (session->common.ip_stat) {
+               REF_RELEASE(session->common.ip_stat);
+       }
+
+       if (session->common.extensions) {
+               g_free(session->common.extensions);
+       }
+
+       if (session->common.key) {
+               REF_RELEASE(session->common.key);
+       }
+
+       g_free(session);
+}
+
 #define FUZZY_INPUT_BUFLEN 1024
 #ifdef HAVE_RECVMMSG
 #define MSGVEC_LEN 16
@@ -2321,6 +2476,299 @@ accept_fuzzy_socket(EV_P_ ev_io *w, int revents)
        }
 }
 
+/* TCP-specific reply and I/O handlers */
+
+static void
+rspamd_fuzzy_tcp_timeout(EV_P_ ev_timer *w, int revents)
+{
+       struct fuzzy_tcp_session *session = (struct fuzzy_tcp_session *) w->data;
+
+       msg_debug_fuzzy_storage("TCP session from %s timed out",
+                                                       rspamd_inet_address_to_string(session->common.addr));
+
+       REF_RELEASE(session);
+}
+
+static bool
+rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session,
+                                                        struct fuzzy_tcp_reply_queue_elt *reply)
+{
+       gssize r;
+       gsize total_len = sizeof(reply->rep.size_hdr) + ntohs(reply->rep.size_hdr);
+       gsize remaining = total_len - reply->written;
+       unsigned char *data = ((unsigned char *) &reply->rep) + reply->written;
+
+       r = write(session->common.fd, data, remaining);
+
+       if (r == -1) {
+               if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+                       return false;
+               }
+               else {
+                       msg_err("error while writing TCP reply: %s", strerror(errno));
+                       return false;
+               }
+       }
+
+       reply->written += r;
+
+       if (reply->written >= total_len) {
+               /* Reply fully sent */
+               DL_DELETE(session->replies_queue, reply);
+               g_free(reply);
+
+               msg_debug_fuzzy_storage("TCP reply sent to %s, %z bytes",
+                                                               rspamd_inet_address_to_string(session->common.addr),
+                                                               (size_t) r);
+
+               return true;
+       }
+
+       return false;
+}
+
+static void
+rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents)
+{
+       struct fuzzy_tcp_session *session = (struct fuzzy_tcp_session *) w->data;
+       gssize r;
+
+       if (revents & EV_READ) {
+               /* Read available data */
+               r = read(session->common.fd,
+                                session->input_buf + session->bytes_unprocessed,
+                                sizeof(session->input_buf) - session->bytes_unprocessed);
+
+               if (r <= 0) {
+                       if (r == -1) {
+                               if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+                                       return;
+                               }
+                               msg_debug_fuzzy_storage("read error on TCP connection from %s: %s",
+                                                                               rspamd_inet_address_to_string(session->common.addr),
+                                                                               strerror(errno));
+                       }
+                       else {
+                               msg_debug_fuzzy_storage("TCP connection from %s closed by peer",
+                                                                               rspamd_inet_address_to_string(session->common.addr));
+                       }
+
+                       REF_RELEASE(session);
+                       return;
+               }
+
+               session->bytes_unprocessed += r;
+               session->common.timestamp = ev_now(session->common.ctx->event_loop);
+
+               /* Reset timeout */
+               ev_timer_again(EV_A_ & session->tm);
+
+               /* Process frames */
+               unsigned int processed_offset = 0;
+
+               while (processed_offset < session->bytes_unprocessed) {
+                       uint16_t frame_len;
+
+                       /* Check frame state */
+                       if ((session->cur_frame_state & 0xC000) == 0x0000) {
+                               /* Need to read first byte of length */
+                               if (processed_offset < session->bytes_unprocessed) {
+                                       session->cur_frame_state = 0x8000 | session->input_buf[processed_offset];
+                                       processed_offset++;
+                               }
+                               else {
+                                       break;
+                               }
+                       }
+
+                       if ((session->cur_frame_state & 0xC000) == 0x8000) {
+                               /* Need to read second byte of length */
+                               if (processed_offset < session->bytes_unprocessed) {
+                                       uint16_t first_byte = session->cur_frame_state & 0xFF;
+                                       uint16_t second_byte = session->input_buf[processed_offset];
+                                       session->cur_frame_state = 0xC000 | ((first_byte << 8) | second_byte);
+                                       processed_offset++;
+                               }
+                               else {
+                                       break;
+                               }
+                       }
+
+                       /* Now we have full length */
+                       frame_len = session->cur_frame_state & 0x3FFF;
+
+                       if (frame_len > sizeof(struct rspamd_fuzzy_encrypted_shingle_cmd)) {
+                               msg_err("invalid frame length %d from %s, closing connection",
+                                               (int) frame_len,
+                                               rspamd_inet_address_to_string(session->common.addr));
+                               REF_RELEASE(session);
+                               return;
+                       }
+
+                       /* Check if we have complete frame */
+                       if (session->bytes_unprocessed - processed_offset >= frame_len) {
+                               /* Process this frame using legacy session temporarily */
+                               struct fuzzy_session legacy_session;
+
+                               memset(&legacy_session, 0, sizeof(legacy_session));
+                               legacy_session.worker = session->common.worker;
+                               legacy_session.addr = session->common.addr;
+                               legacy_session.ctx = session->common.ctx;
+                               legacy_session.fd = session->common.fd;
+                               legacy_session.timestamp = session->common.timestamp;
+                               legacy_session.key = session->common.key;
+                               legacy_session.ip_stat = session->common.ip_stat;
+                               memcpy(legacy_session.nm, session->common.nm, sizeof(legacy_session.nm));
+
+                               if (rspamd_fuzzy_cmd_from_wire(session->input_buf + processed_offset,
+                                                                                          frame_len, &legacy_session)) {
+                                       /* Copy parsed data back */
+                                       session->common.epoch = legacy_session.epoch;
+                                       session->common.cmd_type = legacy_session.cmd_type;
+                                       memcpy(&session->common.cmd, &legacy_session.cmd, sizeof(session->common.cmd));
+                                       session->common.key = legacy_session.key;
+                                       session->common.extensions = legacy_session.extensions;
+                                       memcpy(session->common.nm, legacy_session.nm, sizeof(session->common.nm));
+
+                                       /* Process command - this will need to be adapted for TCP */
+                                       rspamd_fuzzy_process_command(&legacy_session);
+                               }
+                               else {
+                                       session->common.ctx->stat.invalid_requests++;
+                                       msg_debug_fuzzy_storage("invalid TCP fuzzy command of size %d received from %s",
+                                                                                       (int) frame_len,
+                                                                                       rspamd_inet_address_to_string(session->common.addr));
+                               }
+
+                               processed_offset += frame_len;
+                               session->cur_frame_state = 0x0000; /* Reset for next frame */
+                       }
+                       else {
+                               /* Incomplete frame, wait for more data */
+                               break;
+                       }
+               }
+
+               /* Move unprocessed data to the beginning */
+               if (processed_offset > 0) {
+                       if (processed_offset < session->bytes_unprocessed) {
+                               memmove(session->input_buf,
+                                               session->input_buf + processed_offset,
+                                               session->bytes_unprocessed - processed_offset);
+                               session->bytes_unprocessed -= processed_offset;
+                       }
+                       else {
+                               session->bytes_unprocessed = 0;
+                       }
+               }
+       }
+
+       if (revents & EV_WRITE) {
+               /* Write pending replies */
+               struct fuzzy_tcp_reply_queue_elt *elt;
+
+               while ((elt = session->replies_queue) != NULL) {
+                       if (!rspamd_fuzzy_tcp_write_reply(session, elt)) {
+                               /* Cannot write more, wait for next write event */
+                               return;
+                       }
+               }
+
+               /* All replies sent, disable write event */
+               ev_io_stop(EV_A_ w);
+               ev_io_set(w, w->fd, EV_READ);
+               ev_io_start(EV_A_ w);
+       }
+}
+
+static void
+accept_tcp_socket(EV_P_ ev_io *w, int revents)
+{
+       struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
+       struct rspamd_fuzzy_storage_ctx *ctx;
+       struct fuzzy_tcp_session *session;
+       rspamd_inet_addr_t *addr = NULL;
+       int nfd;
+
+       ctx = (struct rspamd_fuzzy_storage_ctx *) worker->ctx;
+
+       if ((nfd = rspamd_accept_from_socket(w->fd, &addr,
+                                                                                rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
+               msg_warn("TCP accept failed: %s", strerror(errno));
+               return;
+       }
+
+       /* Check for EAGAIN */
+       if (nfd == 0) {
+               if (addr) {
+                       rspamd_inet_address_free(addr);
+               }
+               return;
+       }
+
+       ev_now_update_if_cheap(ctx->event_loop);
+
+       /* Check ratelimit */
+       if (!rspamd_fuzzy_check_ratelimit(ctx, addr, worker, ev_now(ctx->event_loop))) {
+               msg_info("ratelimiting TCP connection from %s",
+                                rspamd_inet_address_to_string(addr));
+               rspamd_inet_address_free(addr);
+               close(nfd);
+               return;
+       }
+
+       /* Check if client is allowed */
+       if (!rspamd_fuzzy_check_client(ctx, addr)) {
+               msg_info("refusing TCP connection from %s (blacklisted)",
+                                rspamd_inet_address_to_string(addr));
+               rspamd_inet_address_free(addr);
+               close(nfd);
+               return;
+       }
+
+       /* Set TCP_NODELAY */
+#ifdef TCP_NODELAY
+       {
+               int sopt = 1;
+               if (setsockopt(nfd, IPPROTO_TCP, TCP_NODELAY, &sopt, sizeof(sopt)) == -1) {
+                       msg_warn("cannot set TCP_NODELAY for %s: %s",
+                                        rspamd_inet_address_to_string(addr),
+                                        strerror(errno));
+               }
+       }
+#endif
+
+       /* Create session */
+       session = g_malloc0(sizeof(*session));
+       REF_INIT_RETAIN(session, fuzzy_tcp_session_destroy);
+
+       session->common.ctx = ctx;
+       session->common.worker = worker;
+       session->common.addr = addr;
+       session->common.fd = nfd;
+       session->common.timestamp = ev_now(ctx->event_loop);
+
+       session->cur_frame_state = 0x0000;
+       session->bytes_unprocessed = 0;
+       session->replies_queue = NULL;
+
+       worker->nconns++;
+
+       msg_debug_fuzzy_storage("accepted TCP connection from %s",
+                                                       rspamd_inet_address_to_string(addr));
+
+       /* Setup I/O watcher */
+       session->common.io.data = session;
+       ev_io_init(&session->common.io, rspamd_fuzzy_tcp_io, nfd, EV_READ);
+       ev_io_start(ctx->event_loop, &session->common.io);
+
+       /* Setup timeout */
+       session->tm.data = session;
+       ev_timer_init(&session->tm, rspamd_fuzzy_tcp_timeout,
+                                 ctx->tcp_timeout, ctx->tcp_timeout);
+       ev_timer_start(ctx->event_loop, &session->tm);
+}
+
 static gboolean
 rspamd_fuzzy_storage_periodic_callback(void *ud)
 {
@@ -3268,6 +3716,7 @@ init_fuzzy(struct rspamd_config *cfg)
        ctx->leaky_bucket_burst = NAN;
        ctx->leaky_bucket_rate = NAN;
        ctx->delay = NAN;
+       ctx->tcp_timeout = DEFAULT_TCP_TIMEOUT;
        ctx->default_forbidden_ids = kh_init(fuzzy_key_ids_set);
        ctx->weak_ids = kh_init(fuzzy_key_ids_set);
 
@@ -3301,6 +3750,16 @@ init_fuzzy(struct rspamd_config *cfg)
                                                                          RSPAMD_CL_FLAG_TIME_FLOAT,
                                                                          "Default delay time for hashes, default: not enabled");
 
+       rspamd_rcl_register_worker_option(cfg,
+                                                                         type,
+                                                                         "tcp_timeout",
+                                                                         rspamd_rcl_parse_struct_time,
+                                                                         ctx,
+                                                                         G_STRUCT_OFFSET(struct rspamd_fuzzy_storage_ctx,
+                                                                                                         tcp_timeout),
+                                                                         RSPAMD_CL_FLAG_TIME_FLOAT,
+                                                                         "TCP connection timeout, default: " G_STRINGIFY(DEFAULT_TCP_TIMEOUT) " seconds");
+
        rspamd_rcl_register_worker_option(cfg,
                                                                          type,
                                                                          "allow_update",
@@ -3557,8 +4016,17 @@ fuzzy_peer_rep(struct rspamd_worker *worker,
                                ev_io_start(ctx->event_loop, &ac_ev->accept_ev);
                                DL_APPEND(worker->accept_events, ac_ev);
                        }
+                       else if (ls->type == RSPAMD_WORKER_SOCKET_TCP) {
+                               ac_ev = g_malloc0(sizeof(*ac_ev));
+                               ac_ev->accept_ev.data = worker;
+                               ac_ev->event_loop = ctx->event_loop;
+                               ev_io_init(&ac_ev->accept_ev, accept_tcp_socket, ls->fd,
+                                                  EV_READ);
+                               ev_io_start(ctx->event_loop, &ac_ev->accept_ev);
+                               DL_APPEND(worker->accept_events, ac_ev);
+                       }
                        else {
-                               /* We allow TCP listeners only for a update worker */
+                               /* Unknown socket type */
                                g_assert_not_reached();
                        }
                }