From: Vsevolod Stakhov Date: Tue, 7 Oct 2025 07:56:32 +0000 (+0100) Subject: [Feature] Fuzzy storage: implement TCP protocol support X-Git-Tag: 3.14.0~84^2~20 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=943668cd59ca248fae035513e263d163de4f88f2;p=thirdparty%2Frspamd.git [Feature] Fuzzy storage: implement TCP protocol support Implement TCP transport for fuzzy storage protocol to enable efficient bulk request handling. This adds TCP accept handlers, frame-based I/O processing, and proper session management. Changes: - Add TCP session structure with framing state machine - Implement TCP accept handler with rate limiting and access control - Add TCP I/O handler supporting frame-based protocol (size header + payload) - Implement TCP write reply with queuing support - Add TCP timeout configuration parameter (default: 5.0 seconds) - Refactor rate limit checks to accept parameters instead of session objects - Update worker socket type to support both UDP and TCP - Add debug logging infrastructure for fuzzy storage TCP framing protocol: [uint16_t size][encrypted_payload] Frame processing uses state machine: 0x0000 (idle) -> 0x8000 (have size) -> 0xC000 (complete) --- diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index d6836df3bd..8d97ebca8d 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -50,6 +50,9 @@ #define DEFAULT_BUCKET_MASK 24 /* Update stats on keys each 1 hour */ #define KEY_STAT_INTERVAL 3600.0 +/* TCP constants */ +#define FUZZY_TCP_BUFFER_LENGTH 8192 +#define DEFAULT_TCP_TIMEOUT 5.0 static const char *local_db_name = "local"; @@ -57,13 +60,19 @@ static const char *local_db_name = "local"; gpointer init_fuzzy(struct rspamd_config *cfg); void start_fuzzy(struct rspamd_worker *worker); +#define msg_debug_fuzzy_storage(...) rspamd_conditional_debug_fast(NULL, NULL, \ + rspamd_fuzzy_storage_log_id, "fuzzy_storage", NULL, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +INIT_LOG_MODULE(fuzzy_storage) + worker_t fuzzy_worker = { "fuzzy", /* Name */ init_fuzzy, /* Init function */ start_fuzzy, /* Start function */ RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_NO_STRICT_CONFIG | RSPAMD_WORKER_FUZZY, - RSPAMD_WORKER_SOCKET_UDP, /* UDP socket */ - RSPAMD_WORKER_VER /* Version info */ + RSPAMD_WORKER_SOCKET_UDP | RSPAMD_WORKER_SOCKET_TCP, /* UDP + TCP socket */ + RSPAMD_WORKER_VER /* Version info */ }; struct fuzzy_global_stat { @@ -171,6 +180,7 @@ struct rspamd_fuzzy_storage_ctx { double expire; double sync_timeout; double delay; + double tcp_timeout; struct rspamd_radix_map_helper *update_ips; struct rspamd_hash_map_helper *update_keys; struct rspamd_radix_map_helper *blocked_ips; @@ -235,6 +245,63 @@ enum fuzzy_cmd_type { CMD_ENCRYPTED_SHINGLE }; +struct rspamd_fuzzy_tcp_frame { + uint16_t size_hdr; /* We have to write this as well */ + struct rspamd_fuzzy_encrypted_reply payload; /* Payload */ +}; + +struct fuzzy_tcp_reply_queue_elt { + struct rspamd_fuzzy_tcp_frame rep; /* Serialized reply */ + unsigned int written; /* How many bytes have we already written */ + struct fuzzy_tcp_reply_queue_elt *prev, *next; /* Link */ +}; + +struct fuzzy_common_session { + struct rspamd_fuzzy_storage_ctx *ctx; + int fd; + struct ev_io io; + ev_tstamp timestamp; + struct rspamd_worker *worker; + rspamd_inet_addr_t *addr; + + enum rspamd_fuzzy_epoch epoch; + enum fuzzy_cmd_type cmd_type; + struct rspamd_fuzzy_shingle_cmd cmd; + struct fuzzy_key *key; + struct rspamd_fuzzy_cmd_extension *extensions; + struct fuzzy_key_stat *ip_stat; + unsigned char nm[rspamd_cryptobox_MAX_NMBYTES]; +}; + +struct fuzzy_tcp_session { + struct ev_timer tm; + + /* + * We store the state in the current frame + * 0 0 x x x x x x x x x x x x x x x - initial + * 1 0 x x x x x x x x x x x x x x x - read 1 byte of length + * 1 1 x x x x x x x x x x x x x x x - read 2 bytes of length + * So the length is always cur_frame & 0x3fff + */ + uint16_t cur_frame_state; + uint16_t bytes_unprocessed; + + /* Common with UDP session */ + struct fuzzy_common_session common; + ref_entry_t ref; + + struct fuzzy_tcp_reply_queue_elt *replies_queue; + unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH]; +}; + +struct fuzzy_udp_session { + /* Common fields with TCP session */ + struct fuzzy_common_session common; + struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */ + ref_entry_t ref; +}; + +/* Legacy structure name for compatibility during refactoring */ struct fuzzy_session { struct rspamd_worker *worker; rspamd_inet_addr_t *addr; @@ -269,6 +336,9 @@ struct rspamd_updates_cbdata { static void rspamd_fuzzy_write_reply(struct fuzzy_session *session); +static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session); +static bool rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session, + struct fuzzy_tcp_reply_queue_elt *reply); static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx, const char *source, gboolean final); static gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx, @@ -279,6 +349,8 @@ static void rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx static struct fuzzy_key *fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg, const ucl_object_t *obj, khash_t(rspamd_fuzzy_keys_hash) * target); +static void rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents); +static void accept_tcp_socket(EV_P_ ev_io *w, int revents); static ucl_object_t *rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt); struct fuzzy_keymap_ucl_buf { @@ -436,8 +508,12 @@ enum rspamd_ratelimit_check_policy { }; static enum rspamd_ratelimit_check_result -rspamd_fuzzy_check_ratelimit_bucket(struct fuzzy_session *session, struct rspamd_leaky_bucket_elt *elt, - enum rspamd_ratelimit_check_policy policy, double max_burst, double max_rate) +rspamd_fuzzy_check_ratelimit_bucket(struct rspamd_fuzzy_storage_ctx *ctx, + rspamd_inet_addr_t *addr, + ev_tstamp timestamp, + struct rspamd_leaky_bucket_elt *elt, + enum rspamd_ratelimit_check_policy policy, + double max_burst, double max_rate) { gboolean ratelimited = FALSE, new_ratelimit = FALSE; @@ -450,13 +526,13 @@ rspamd_fuzzy_check_ratelimit_bucket(struct fuzzy_session *session, struct rspamd /* There is an issue with the previous logic: the TTL is updated each time * we see that new bucket. Hence, we need to check the `last` and act accordingly */ - if (elt->last < session->timestamp && session->timestamp - elt->last >= session->ctx->leaky_bucket_ttl) { + if (elt->last < timestamp && timestamp - elt->last >= ctx->leaky_bucket_ttl) { /* * We reset bucket to it's 90% capacity to allow some requests * This should cope with the issue when we block an IP network for some burst and never unblock it */ elt->cur = max_burst * 0.9; - elt->last = session->timestamp; + elt->last = timestamp; } else { ratelimited = TRUE; @@ -464,16 +540,16 @@ rspamd_fuzzy_check_ratelimit_bucket(struct fuzzy_session *session, struct rspamd } else { /* Update bucket: leak some elements */ - if (elt->last < session->timestamp) { - elt->cur -= max_rate * (session->timestamp - elt->last); - elt->last = session->timestamp; + if (elt->last < timestamp) { + elt->cur -= max_rate * (timestamp - elt->last); + elt->last = timestamp; if (elt->cur < 0) { elt->cur = 0; } } else { - elt->last = session->timestamp; + elt->last = timestamp; } /* Check the bucket */ @@ -491,7 +567,7 @@ rspamd_fuzzy_check_ratelimit_bucket(struct fuzzy_session *session, struct rspamd } if (ratelimited) { - rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit"); + rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit"); } if (new_ratelimit) { @@ -502,54 +578,58 @@ rspamd_fuzzy_check_ratelimit_bucket(struct fuzzy_session *session, struct rspamd } static gboolean -rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session) +rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx, + rspamd_inet_addr_t *addr, + struct rspamd_worker *worker, + ev_tstamp timestamp) { rspamd_inet_addr_t *masked; struct rspamd_leaky_bucket_elt *elt; - if (!session->addr) { + if (!addr) { return TRUE; } - if (session->ctx->ratelimit_whitelist != NULL) { - if (rspamd_match_radix_map_addr(session->ctx->ratelimit_whitelist, - session->addr) != NULL) { + if (ctx->ratelimit_whitelist != NULL) { + if (rspamd_match_radix_map_addr(ctx->ratelimit_whitelist, + addr) != NULL) { return TRUE; } } /* - if (rspamd_inet_address_is_local (session->addr, TRUE)) { + if (rspamd_inet_address_is_local (addr, TRUE)) { return TRUE; } */ - masked = rspamd_inet_address_copy(session->addr, NULL); + masked = rspamd_inet_address_copy(addr, NULL); if (rspamd_inet_address_get_af(masked) == AF_INET) { rspamd_inet_address_apply_mask(masked, - MIN(session->ctx->leaky_bucket_mask, 32)); + MIN(ctx->leaky_bucket_mask, 32)); } else { /* Must be at least /64 */ rspamd_inet_address_apply_mask(masked, - MIN(MAX(session->ctx->leaky_bucket_mask * 4, 64), 128)); + MIN(MAX(ctx->leaky_bucket_mask * 4, 64), 128)); } - elt = rspamd_lru_hash_lookup(session->ctx->ratelimit_buckets, masked, - (time_t) session->timestamp); + elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, masked, + (time_t) timestamp); if (elt) { - enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(session, elt, + enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(ctx, addr, + timestamp, elt, ratelimit_policy_permanent, - session->ctx->leaky_bucket_burst, - session->ctx->leaky_bucket_rate); + ctx->leaky_bucket_burst, + ctx->leaky_bucket_rate); if (res == ratelimit_new) { msg_info("ratelimiting %s (%s), %.1f max elts", - rspamd_inet_address_to_string(session->addr), + rspamd_inet_address_to_string(addr), rspamd_inet_address_to_string(masked), - session->ctx->leaky_bucket_burst); + ctx->leaky_bucket_burst); struct rspamd_srv_command srv_cmd; @@ -563,17 +643,18 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session) if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) { memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen); msg_debug("propagating blocked address to other workers"); - rspamd_srv_send_command(session->worker, session->ctx->event_loop, &srv_cmd, -1, NULL, NULL); + rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL); } else { - msg_err("bad address length: %d, expected to be %d", (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr)); + msg_err("bad address length: %d, expected to be %d", + (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr)); } } - rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit"); + rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit"); } else if (res == ratelimit_existing) { - rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit"); + rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit"); } rspamd_inet_address_free(masked); @@ -585,13 +666,13 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session) elt = g_malloc(sizeof(*elt)); elt->addr = masked; /* transfer ownership */ elt->cur = 1; - elt->last = session->timestamp; + elt->last = timestamp; - rspamd_lru_hash_insert(session->ctx->ratelimit_buckets, + rspamd_lru_hash_insert(ctx->ratelimit_buckets, masked, elt, - session->timestamp, - session->ctx->leaky_bucket_ttl); + timestamp, + ctx->leaky_bucket_ttl); } return TRUE; @@ -645,21 +726,24 @@ rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx, } static gboolean -rspamd_fuzzy_check_write(struct fuzzy_session *session, uint8_t cmd) +rspamd_fuzzy_check_write(struct rspamd_fuzzy_storage_ctx *ctx, + rspamd_inet_addr_t *addr, + struct fuzzy_key *key, + uint8_t cmd) { - if (session->ctx->read_only) { + if (ctx->read_only) { return FALSE; } /* * Check IP first */ - if (session->ctx->update_ips != NULL && session->addr) { - if (rspamd_inet_address_get_af(session->addr) == AF_UNIX) { + if (ctx->update_ips != NULL && addr) { + if (rspamd_inet_address_get_af(addr) == AF_UNIX) { return TRUE; } - if (rspamd_match_radix_map_addr(session->ctx->update_ips, - session->addr) == NULL) { + if (rspamd_match_radix_map_addr(ctx->update_ips, + addr) == NULL) { return FALSE; } else { @@ -670,25 +754,25 @@ rspamd_fuzzy_check_write(struct fuzzy_session *session, uint8_t cmd) /* * Check global list of the update keys */ - if (session->ctx->update_keys != NULL && session->key->stat && session->key->key) { + if (ctx->update_keys != NULL && key && key->stat && key->key) { static char base32_buf[rspamd_cryptobox_HASHBYTES * 2 + 1]; unsigned int raw_len; - const unsigned char *pk_raw = rspamd_keypair_component(session->key->key, + const unsigned char *pk_raw = rspamd_keypair_component(key->key, RSPAMD_KEYPAIR_COMPONENT_ID, &raw_len); int encoded_len = rspamd_encode_base32_buf(pk_raw, raw_len, base32_buf, sizeof(base32_buf), RSPAMD_BASE32_DEFAULT); - if (rspamd_match_hash_map(session->ctx->update_keys, base32_buf, encoded_len)) { + if (rspamd_match_hash_map(ctx->update_keys, base32_buf, encoded_len)) { return TRUE; } } - if (session->key) { - if (cmd == FUZZY_WRITE && session->key->flags & FUZZY_KEY_WRITE) { + if (key) { + if (cmd == FUZZY_WRITE && key->flags & FUZZY_KEY_WRITE) { return TRUE; } - else if (cmd == FUZZY_DEL && session->key->flags & FUZZY_KEY_DELETE) { + else if (cmd == FUZZY_DEL && key->flags & FUZZY_KEY_DELETE) { return TRUE; } } @@ -1632,17 +1716,20 @@ rspamd_fuzzy_process_command(struct fuzzy_session *session) if (session->ctx->ratelimit_buckets) { if (session->ctx->ratelimit_log_only) { - (void) rspamd_fuzzy_check_ratelimit(session); /* Check but ignore */ + (void) rspamd_fuzzy_check_ratelimit(session->ctx, session->addr, + session->worker, session->timestamp); /* Check but ignore */ } else { - is_rate_allowed = rspamd_fuzzy_check_ratelimit(session); + is_rate_allowed = rspamd_fuzzy_check_ratelimit(session->ctx, session->addr, + session->worker, session->timestamp); } } if (session->key && session->key->rl_bucket) { /* Check per-key bucket */ - enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(session, session->key->rl_bucket, + enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(session->ctx, session->addr, + session->timestamp, session->key->rl_bucket, ratelimit_policy_normal, session->key->burst, session->key->rate); @@ -1745,7 +1832,7 @@ rspamd_fuzzy_process_command(struct fuzzy_session *session) rspamd_fuzzy_make_reply(cmd, &result, session, send_flags); } else { - if (rspamd_fuzzy_check_write(session, cmd->cmd)) { + if (rspamd_fuzzy_check_write(session->ctx, session->addr, session->key, cmd->cmd)) { /* Check whitelist */ if (session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) { rspamd_encode_hex_buf(cmd->digest, sizeof(cmd->digest), @@ -2178,6 +2265,74 @@ fuzzy_session_destroy(gpointer d) g_free(session); } +static void +fuzzy_tcp_session_destroy(gpointer d) +{ + struct fuzzy_tcp_session *session = d; + + msg_debug_fuzzy_storage("destroying TCP session from %s", + rspamd_inet_address_to_string(session->common.addr)); + + if (ev_can_stop(&session->common.io)) { + ev_io_stop(session->common.ctx->event_loop, &session->common.io); + } + + if (ev_can_stop(&session->tm)) { + ev_timer_stop(session->common.ctx->event_loop, &session->tm); + } + + /* Free replies queue */ + struct fuzzy_tcp_reply_queue_elt *elt, *tmp; + DL_FOREACH_SAFE(session->replies_queue, elt, tmp) + { + DL_DELETE(session->replies_queue, elt); + g_free(elt); + } + + close(session->common.fd); + rspamd_inet_address_free(session->common.addr); + rspamd_explicit_memzero(session->common.nm, sizeof(session->common.nm)); + session->common.worker->nconns--; + + if (session->common.ip_stat) { + REF_RELEASE(session->common.ip_stat); + } + + if (session->common.extensions) { + g_free(session->common.extensions); + } + + if (session->common.key) { + REF_RELEASE(session->common.key); + } + + g_free(session); +} + +static void +fuzzy_udp_session_destroy(gpointer d) +{ + struct fuzzy_udp_session *session = d; + + rspamd_inet_address_free(session->common.addr); + rspamd_explicit_memzero(session->common.nm, sizeof(session->common.nm)); + session->common.worker->nconns--; + + if (session->common.ip_stat) { + REF_RELEASE(session->common.ip_stat); + } + + if (session->common.extensions) { + g_free(session->common.extensions); + } + + if (session->common.key) { + REF_RELEASE(session->common.key); + } + + g_free(session); +} + #define FUZZY_INPUT_BUFLEN 1024 #ifdef HAVE_RECVMMSG #define MSGVEC_LEN 16 @@ -2321,6 +2476,299 @@ accept_fuzzy_socket(EV_P_ ev_io *w, int revents) } } +/* TCP-specific reply and I/O handlers */ + +static void +rspamd_fuzzy_tcp_timeout(EV_P_ ev_timer *w, int revents) +{ + struct fuzzy_tcp_session *session = (struct fuzzy_tcp_session *) w->data; + + msg_debug_fuzzy_storage("TCP session from %s timed out", + rspamd_inet_address_to_string(session->common.addr)); + + REF_RELEASE(session); +} + +static bool +rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session, + struct fuzzy_tcp_reply_queue_elt *reply) +{ + gssize r; + gsize total_len = sizeof(reply->rep.size_hdr) + ntohs(reply->rep.size_hdr); + gsize remaining = total_len - reply->written; + unsigned char *data = ((unsigned char *) &reply->rep) + reply->written; + + r = write(session->common.fd, data, remaining); + + if (r == -1) { + if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { + return false; + } + else { + msg_err("error while writing TCP reply: %s", strerror(errno)); + return false; + } + } + + reply->written += r; + + if (reply->written >= total_len) { + /* Reply fully sent */ + DL_DELETE(session->replies_queue, reply); + g_free(reply); + + msg_debug_fuzzy_storage("TCP reply sent to %s, %z bytes", + rspamd_inet_address_to_string(session->common.addr), + (size_t) r); + + return true; + } + + return false; +} + +static void +rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents) +{ + struct fuzzy_tcp_session *session = (struct fuzzy_tcp_session *) w->data; + gssize r; + + if (revents & EV_READ) { + /* Read available data */ + r = read(session->common.fd, + session->input_buf + session->bytes_unprocessed, + sizeof(session->input_buf) - session->bytes_unprocessed); + + if (r <= 0) { + if (r == -1) { + if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { + return; + } + msg_debug_fuzzy_storage("read error on TCP connection from %s: %s", + rspamd_inet_address_to_string(session->common.addr), + strerror(errno)); + } + else { + msg_debug_fuzzy_storage("TCP connection from %s closed by peer", + rspamd_inet_address_to_string(session->common.addr)); + } + + REF_RELEASE(session); + return; + } + + session->bytes_unprocessed += r; + session->common.timestamp = ev_now(session->common.ctx->event_loop); + + /* Reset timeout */ + ev_timer_again(EV_A_ & session->tm); + + /* Process frames */ + unsigned int processed_offset = 0; + + while (processed_offset < session->bytes_unprocessed) { + uint16_t frame_len; + + /* Check frame state */ + if ((session->cur_frame_state & 0xC000) == 0x0000) { + /* Need to read first byte of length */ + if (processed_offset < session->bytes_unprocessed) { + session->cur_frame_state = 0x8000 | session->input_buf[processed_offset]; + processed_offset++; + } + else { + break; + } + } + + if ((session->cur_frame_state & 0xC000) == 0x8000) { + /* Need to read second byte of length */ + if (processed_offset < session->bytes_unprocessed) { + uint16_t first_byte = session->cur_frame_state & 0xFF; + uint16_t second_byte = session->input_buf[processed_offset]; + session->cur_frame_state = 0xC000 | ((first_byte << 8) | second_byte); + processed_offset++; + } + else { + break; + } + } + + /* Now we have full length */ + frame_len = session->cur_frame_state & 0x3FFF; + + if (frame_len > sizeof(struct rspamd_fuzzy_encrypted_shingle_cmd)) { + msg_err("invalid frame length %d from %s, closing connection", + (int) frame_len, + rspamd_inet_address_to_string(session->common.addr)); + REF_RELEASE(session); + return; + } + + /* Check if we have complete frame */ + if (session->bytes_unprocessed - processed_offset >= frame_len) { + /* Process this frame using legacy session temporarily */ + struct fuzzy_session legacy_session; + + memset(&legacy_session, 0, sizeof(legacy_session)); + legacy_session.worker = session->common.worker; + legacy_session.addr = session->common.addr; + legacy_session.ctx = session->common.ctx; + legacy_session.fd = session->common.fd; + legacy_session.timestamp = session->common.timestamp; + legacy_session.key = session->common.key; + legacy_session.ip_stat = session->common.ip_stat; + memcpy(legacy_session.nm, session->common.nm, sizeof(legacy_session.nm)); + + if (rspamd_fuzzy_cmd_from_wire(session->input_buf + processed_offset, + frame_len, &legacy_session)) { + /* Copy parsed data back */ + session->common.epoch = legacy_session.epoch; + session->common.cmd_type = legacy_session.cmd_type; + memcpy(&session->common.cmd, &legacy_session.cmd, sizeof(session->common.cmd)); + session->common.key = legacy_session.key; + session->common.extensions = legacy_session.extensions; + memcpy(session->common.nm, legacy_session.nm, sizeof(session->common.nm)); + + /* Process command - this will need to be adapted for TCP */ + rspamd_fuzzy_process_command(&legacy_session); + } + else { + session->common.ctx->stat.invalid_requests++; + msg_debug_fuzzy_storage("invalid TCP fuzzy command of size %d received from %s", + (int) frame_len, + rspamd_inet_address_to_string(session->common.addr)); + } + + processed_offset += frame_len; + session->cur_frame_state = 0x0000; /* Reset for next frame */ + } + else { + /* Incomplete frame, wait for more data */ + break; + } + } + + /* Move unprocessed data to the beginning */ + if (processed_offset > 0) { + if (processed_offset < session->bytes_unprocessed) { + memmove(session->input_buf, + session->input_buf + processed_offset, + session->bytes_unprocessed - processed_offset); + session->bytes_unprocessed -= processed_offset; + } + else { + session->bytes_unprocessed = 0; + } + } + } + + if (revents & EV_WRITE) { + /* Write pending replies */ + struct fuzzy_tcp_reply_queue_elt *elt; + + while ((elt = session->replies_queue) != NULL) { + if (!rspamd_fuzzy_tcp_write_reply(session, elt)) { + /* Cannot write more, wait for next write event */ + return; + } + } + + /* All replies sent, disable write event */ + ev_io_stop(EV_A_ w); + ev_io_set(w, w->fd, EV_READ); + ev_io_start(EV_A_ w); + } +} + +static void +accept_tcp_socket(EV_P_ ev_io *w, int revents) +{ + struct rspamd_worker *worker = (struct rspamd_worker *) w->data; + struct rspamd_fuzzy_storage_ctx *ctx; + struct fuzzy_tcp_session *session; + rspamd_inet_addr_t *addr = NULL; + int nfd; + + ctx = (struct rspamd_fuzzy_storage_ctx *) worker->ctx; + + if ((nfd = rspamd_accept_from_socket(w->fd, &addr, + rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) { + msg_warn("TCP accept failed: %s", strerror(errno)); + return; + } + + /* Check for EAGAIN */ + if (nfd == 0) { + if (addr) { + rspamd_inet_address_free(addr); + } + return; + } + + ev_now_update_if_cheap(ctx->event_loop); + + /* Check ratelimit */ + if (!rspamd_fuzzy_check_ratelimit(ctx, addr, worker, ev_now(ctx->event_loop))) { + msg_info("ratelimiting TCP connection from %s", + rspamd_inet_address_to_string(addr)); + rspamd_inet_address_free(addr); + close(nfd); + return; + } + + /* Check if client is allowed */ + if (!rspamd_fuzzy_check_client(ctx, addr)) { + msg_info("refusing TCP connection from %s (blacklisted)", + rspamd_inet_address_to_string(addr)); + rspamd_inet_address_free(addr); + close(nfd); + return; + } + + /* Set TCP_NODELAY */ +#ifdef TCP_NODELAY + { + int sopt = 1; + if (setsockopt(nfd, IPPROTO_TCP, TCP_NODELAY, &sopt, sizeof(sopt)) == -1) { + msg_warn("cannot set TCP_NODELAY for %s: %s", + rspamd_inet_address_to_string(addr), + strerror(errno)); + } + } +#endif + + /* Create session */ + session = g_malloc0(sizeof(*session)); + REF_INIT_RETAIN(session, fuzzy_tcp_session_destroy); + + session->common.ctx = ctx; + session->common.worker = worker; + session->common.addr = addr; + session->common.fd = nfd; + session->common.timestamp = ev_now(ctx->event_loop); + + session->cur_frame_state = 0x0000; + session->bytes_unprocessed = 0; + session->replies_queue = NULL; + + worker->nconns++; + + msg_debug_fuzzy_storage("accepted TCP connection from %s", + rspamd_inet_address_to_string(addr)); + + /* Setup I/O watcher */ + session->common.io.data = session; + ev_io_init(&session->common.io, rspamd_fuzzy_tcp_io, nfd, EV_READ); + ev_io_start(ctx->event_loop, &session->common.io); + + /* Setup timeout */ + session->tm.data = session; + ev_timer_init(&session->tm, rspamd_fuzzy_tcp_timeout, + ctx->tcp_timeout, ctx->tcp_timeout); + ev_timer_start(ctx->event_loop, &session->tm); +} + static gboolean rspamd_fuzzy_storage_periodic_callback(void *ud) { @@ -3268,6 +3716,7 @@ init_fuzzy(struct rspamd_config *cfg) ctx->leaky_bucket_burst = NAN; ctx->leaky_bucket_rate = NAN; ctx->delay = NAN; + ctx->tcp_timeout = DEFAULT_TCP_TIMEOUT; ctx->default_forbidden_ids = kh_init(fuzzy_key_ids_set); ctx->weak_ids = kh_init(fuzzy_key_ids_set); @@ -3301,6 +3750,16 @@ init_fuzzy(struct rspamd_config *cfg) RSPAMD_CL_FLAG_TIME_FLOAT, "Default delay time for hashes, default: not enabled"); + rspamd_rcl_register_worker_option(cfg, + type, + "tcp_timeout", + rspamd_rcl_parse_struct_time, + ctx, + G_STRUCT_OFFSET(struct rspamd_fuzzy_storage_ctx, + tcp_timeout), + RSPAMD_CL_FLAG_TIME_FLOAT, + "TCP connection timeout, default: " G_STRINGIFY(DEFAULT_TCP_TIMEOUT) " seconds"); + rspamd_rcl_register_worker_option(cfg, type, "allow_update", @@ -3557,8 +4016,17 @@ fuzzy_peer_rep(struct rspamd_worker *worker, ev_io_start(ctx->event_loop, &ac_ev->accept_ev); DL_APPEND(worker->accept_events, ac_ev); } + else if (ls->type == RSPAMD_WORKER_SOCKET_TCP) { + ac_ev = g_malloc0(sizeof(*ac_ev)); + ac_ev->accept_ev.data = worker; + ac_ev->event_loop = ctx->event_loop; + ev_io_init(&ac_ev->accept_ev, accept_tcp_socket, ls->fd, + EV_READ); + ev_io_start(ctx->event_loop, &ac_ev->accept_ev); + DL_APPEND(worker->accept_events, ac_ev); + } else { - /* We allow TCP listeners only for a update worker */ + /* Unknown socket type */ g_assert_not_reached(); } }