#define DEFAULT_BUCKET_MASK 24
/* Update stats on keys each 1 hour */
#define KEY_STAT_INTERVAL 3600.0
+/* TCP constants */
+#define FUZZY_TCP_BUFFER_LENGTH 8192
+#define DEFAULT_TCP_TIMEOUT 5.0
static const char *local_db_name = "local";
gpointer init_fuzzy(struct rspamd_config *cfg);
void start_fuzzy(struct rspamd_worker *worker);
+#define msg_debug_fuzzy_storage(...) rspamd_conditional_debug_fast(NULL, NULL, \
+ rspamd_fuzzy_storage_log_id, "fuzzy_storage", NULL, \
+ RSPAMD_LOG_FUNC, \
+ __VA_ARGS__)
+INIT_LOG_MODULE(fuzzy_storage)
+
worker_t fuzzy_worker = {
"fuzzy", /* Name */
init_fuzzy, /* Init function */
start_fuzzy, /* Start function */
RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_NO_STRICT_CONFIG | RSPAMD_WORKER_FUZZY,
- RSPAMD_WORKER_SOCKET_UDP, /* UDP socket */
- RSPAMD_WORKER_VER /* Version info */
+ RSPAMD_WORKER_SOCKET_UDP | RSPAMD_WORKER_SOCKET_TCP, /* UDP + TCP socket */
+ RSPAMD_WORKER_VER /* Version info */
};
struct fuzzy_global_stat {
double expire;
double sync_timeout;
double delay;
+ double tcp_timeout;
struct rspamd_radix_map_helper *update_ips;
struct rspamd_hash_map_helper *update_keys;
struct rspamd_radix_map_helper *blocked_ips;
CMD_ENCRYPTED_SHINGLE
};
+struct rspamd_fuzzy_tcp_frame {
+ uint16_t size_hdr; /* We have to write this as well */
+ struct rspamd_fuzzy_encrypted_reply payload; /* Payload */
+};
+
+struct fuzzy_tcp_reply_queue_elt {
+ struct rspamd_fuzzy_tcp_frame rep; /* Serialized reply */
+ unsigned int written; /* How many bytes have we already written */
+ struct fuzzy_tcp_reply_queue_elt *prev, *next; /* Link */
+};
+
+struct fuzzy_common_session {
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ int fd;
+ struct ev_io io;
+ ev_tstamp timestamp;
+ struct rspamd_worker *worker;
+ rspamd_inet_addr_t *addr;
+
+ enum rspamd_fuzzy_epoch epoch;
+ enum fuzzy_cmd_type cmd_type;
+ struct rspamd_fuzzy_shingle_cmd cmd;
+ struct fuzzy_key *key;
+ struct rspamd_fuzzy_cmd_extension *extensions;
+ struct fuzzy_key_stat *ip_stat;
+ unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
+};
+
+struct fuzzy_tcp_session {
+ struct ev_timer tm;
+
+ /*
+ * We store the state in the current frame
+ * 0 0 x x x x x x x x x x x x x x x - initial
+ * 1 0 x x x x x x x x x x x x x x x - read 1 byte of length
+ * 1 1 x x x x x x x x x x x x x x x - read 2 bytes of length
+ * So the length is always cur_frame & 0x3fff
+ */
+ uint16_t cur_frame_state;
+ uint16_t bytes_unprocessed;
+
+ /* Common with UDP session */
+ struct fuzzy_common_session common;
+ ref_entry_t ref;
+
+ struct fuzzy_tcp_reply_queue_elt *replies_queue;
+ unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH];
+};
+
+struct fuzzy_udp_session {
+ /* Common fields with TCP session */
+ struct fuzzy_common_session common;
+ struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */
+ ref_entry_t ref;
+};
+
+/* Legacy structure name for compatibility during refactoring */
struct fuzzy_session {
struct rspamd_worker *worker;
rspamd_inet_addr_t *addr;
static void rspamd_fuzzy_write_reply(struct fuzzy_session *session);
+static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session);
+static bool rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session,
+ struct fuzzy_tcp_reply_queue_elt *reply);
static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx,
const char *source, gboolean final);
static gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
static struct fuzzy_key *fuzzy_add_keypair_from_ucl(struct rspamd_config *cfg,
const ucl_object_t *obj,
khash_t(rspamd_fuzzy_keys_hash) * target);
+static void rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents);
+static void accept_tcp_socket(EV_P_ ev_io *w, int revents);
static ucl_object_t *rspamd_leaky_bucket_to_ucl(struct rspamd_leaky_bucket_elt *p_elt);
struct fuzzy_keymap_ucl_buf {
};
static enum rspamd_ratelimit_check_result
-rspamd_fuzzy_check_ratelimit_bucket(struct fuzzy_session *session, struct rspamd_leaky_bucket_elt *elt,
- enum rspamd_ratelimit_check_policy policy, double max_burst, double max_rate)
+rspamd_fuzzy_check_ratelimit_bucket(struct rspamd_fuzzy_storage_ctx *ctx,
+ rspamd_inet_addr_t *addr,
+ ev_tstamp timestamp,
+ struct rspamd_leaky_bucket_elt *elt,
+ enum rspamd_ratelimit_check_policy policy,
+ double max_burst, double max_rate)
{
gboolean ratelimited = FALSE, new_ratelimit = FALSE;
/* There is an issue with the previous logic: the TTL is updated each time
* we see that new bucket. Hence, we need to check the `last` and act accordingly
*/
- if (elt->last < session->timestamp && session->timestamp - elt->last >= session->ctx->leaky_bucket_ttl) {
+ if (elt->last < timestamp && timestamp - elt->last >= ctx->leaky_bucket_ttl) {
/*
* We reset bucket to it's 90% capacity to allow some requests
* This should cope with the issue when we block an IP network for some burst and never unblock it
*/
elt->cur = max_burst * 0.9;
- elt->last = session->timestamp;
+ elt->last = timestamp;
}
else {
ratelimited = TRUE;
}
else {
/* Update bucket: leak some elements */
- if (elt->last < session->timestamp) {
- elt->cur -= max_rate * (session->timestamp - elt->last);
- elt->last = session->timestamp;
+ if (elt->last < timestamp) {
+ elt->cur -= max_rate * (timestamp - elt->last);
+ elt->last = timestamp;
if (elt->cur < 0) {
elt->cur = 0;
}
}
else {
- elt->last = session->timestamp;
+ elt->last = timestamp;
}
/* Check the bucket */
}
if (ratelimited) {
- rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit");
+ rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
}
if (new_ratelimit) {
}
static gboolean
-rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session)
+rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx,
+ rspamd_inet_addr_t *addr,
+ struct rspamd_worker *worker,
+ ev_tstamp timestamp)
{
rspamd_inet_addr_t *masked;
struct rspamd_leaky_bucket_elt *elt;
- if (!session->addr) {
+ if (!addr) {
return TRUE;
}
- if (session->ctx->ratelimit_whitelist != NULL) {
- if (rspamd_match_radix_map_addr(session->ctx->ratelimit_whitelist,
- session->addr) != NULL) {
+ if (ctx->ratelimit_whitelist != NULL) {
+ if (rspamd_match_radix_map_addr(ctx->ratelimit_whitelist,
+ addr) != NULL) {
return TRUE;
}
}
/*
- if (rspamd_inet_address_is_local (session->addr, TRUE)) {
+ if (rspamd_inet_address_is_local (addr, TRUE)) {
return TRUE;
}
*/
- masked = rspamd_inet_address_copy(session->addr, NULL);
+ masked = rspamd_inet_address_copy(addr, NULL);
if (rspamd_inet_address_get_af(masked) == AF_INET) {
rspamd_inet_address_apply_mask(masked,
- MIN(session->ctx->leaky_bucket_mask, 32));
+ MIN(ctx->leaky_bucket_mask, 32));
}
else {
/* Must be at least /64 */
rspamd_inet_address_apply_mask(masked,
- MIN(MAX(session->ctx->leaky_bucket_mask * 4, 64), 128));
+ MIN(MAX(ctx->leaky_bucket_mask * 4, 64), 128));
}
- elt = rspamd_lru_hash_lookup(session->ctx->ratelimit_buckets, masked,
- (time_t) session->timestamp);
+ elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, masked,
+ (time_t) timestamp);
if (elt) {
- enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(session, elt,
+ enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(ctx, addr,
+ timestamp, elt,
ratelimit_policy_permanent,
- session->ctx->leaky_bucket_burst,
- session->ctx->leaky_bucket_rate);
+ ctx->leaky_bucket_burst,
+ ctx->leaky_bucket_rate);
if (res == ratelimit_new) {
msg_info("ratelimiting %s (%s), %.1f max elts",
- rspamd_inet_address_to_string(session->addr),
+ rspamd_inet_address_to_string(addr),
rspamd_inet_address_to_string(masked),
- session->ctx->leaky_bucket_burst);
+ ctx->leaky_bucket_burst);
struct rspamd_srv_command srv_cmd;
if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
msg_debug("propagating blocked address to other workers");
- rspamd_srv_send_command(session->worker, session->ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+ rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
}
else {
- msg_err("bad address length: %d, expected to be %d", (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
+ msg_err("bad address length: %d, expected to be %d",
+ (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
}
}
- rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit");
+ rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
}
else if (res == ratelimit_existing) {
- rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit");
+ rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
}
rspamd_inet_address_free(masked);
elt = g_malloc(sizeof(*elt));
elt->addr = masked; /* transfer ownership */
elt->cur = 1;
- elt->last = session->timestamp;
+ elt->last = timestamp;
- rspamd_lru_hash_insert(session->ctx->ratelimit_buckets,
+ rspamd_lru_hash_insert(ctx->ratelimit_buckets,
masked,
elt,
- session->timestamp,
- session->ctx->leaky_bucket_ttl);
+ timestamp,
+ ctx->leaky_bucket_ttl);
}
return TRUE;
}
static gboolean
-rspamd_fuzzy_check_write(struct fuzzy_session *session, uint8_t cmd)
+rspamd_fuzzy_check_write(struct rspamd_fuzzy_storage_ctx *ctx,
+ rspamd_inet_addr_t *addr,
+ struct fuzzy_key *key,
+ uint8_t cmd)
{
- if (session->ctx->read_only) {
+ if (ctx->read_only) {
return FALSE;
}
/*
* Check IP first
*/
- if (session->ctx->update_ips != NULL && session->addr) {
- if (rspamd_inet_address_get_af(session->addr) == AF_UNIX) {
+ if (ctx->update_ips != NULL && addr) {
+ if (rspamd_inet_address_get_af(addr) == AF_UNIX) {
return TRUE;
}
- if (rspamd_match_radix_map_addr(session->ctx->update_ips,
- session->addr) == NULL) {
+ if (rspamd_match_radix_map_addr(ctx->update_ips,
+ addr) == NULL) {
return FALSE;
}
else {
/*
* Check global list of the update keys
*/
- if (session->ctx->update_keys != NULL && session->key->stat && session->key->key) {
+ if (ctx->update_keys != NULL && key && key->stat && key->key) {
static char base32_buf[rspamd_cryptobox_HASHBYTES * 2 + 1];
unsigned int raw_len;
- const unsigned char *pk_raw = rspamd_keypair_component(session->key->key,
+ const unsigned char *pk_raw = rspamd_keypair_component(key->key,
RSPAMD_KEYPAIR_COMPONENT_ID, &raw_len);
int encoded_len = rspamd_encode_base32_buf(pk_raw, raw_len,
base32_buf, sizeof(base32_buf),
RSPAMD_BASE32_DEFAULT);
- if (rspamd_match_hash_map(session->ctx->update_keys, base32_buf, encoded_len)) {
+ if (rspamd_match_hash_map(ctx->update_keys, base32_buf, encoded_len)) {
return TRUE;
}
}
- if (session->key) {
- if (cmd == FUZZY_WRITE && session->key->flags & FUZZY_KEY_WRITE) {
+ if (key) {
+ if (cmd == FUZZY_WRITE && key->flags & FUZZY_KEY_WRITE) {
return TRUE;
}
- else if (cmd == FUZZY_DEL && session->key->flags & FUZZY_KEY_DELETE) {
+ else if (cmd == FUZZY_DEL && key->flags & FUZZY_KEY_DELETE) {
return TRUE;
}
}
if (session->ctx->ratelimit_buckets) {
if (session->ctx->ratelimit_log_only) {
- (void) rspamd_fuzzy_check_ratelimit(session); /* Check but ignore */
+ (void) rspamd_fuzzy_check_ratelimit(session->ctx, session->addr,
+ session->worker, session->timestamp); /* Check but ignore */
}
else {
- is_rate_allowed = rspamd_fuzzy_check_ratelimit(session);
+ is_rate_allowed = rspamd_fuzzy_check_ratelimit(session->ctx, session->addr,
+ session->worker, session->timestamp);
}
}
if (session->key && session->key->rl_bucket) {
/* Check per-key bucket */
- enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(session, session->key->rl_bucket,
+ enum rspamd_ratelimit_check_result res = rspamd_fuzzy_check_ratelimit_bucket(session->ctx, session->addr,
+ session->timestamp, session->key->rl_bucket,
ratelimit_policy_normal,
session->key->burst,
session->key->rate);
rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
}
else {
- if (rspamd_fuzzy_check_write(session, cmd->cmd)) {
+ if (rspamd_fuzzy_check_write(session->ctx, session->addr, session->key, cmd->cmd)) {
/* Check whitelist */
if (session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
rspamd_encode_hex_buf(cmd->digest, sizeof(cmd->digest),
g_free(session);
}
+static void
+fuzzy_tcp_session_destroy(gpointer d)
+{
+ struct fuzzy_tcp_session *session = d;
+
+ msg_debug_fuzzy_storage("destroying TCP session from %s",
+ rspamd_inet_address_to_string(session->common.addr));
+
+ if (ev_can_stop(&session->common.io)) {
+ ev_io_stop(session->common.ctx->event_loop, &session->common.io);
+ }
+
+ if (ev_can_stop(&session->tm)) {
+ ev_timer_stop(session->common.ctx->event_loop, &session->tm);
+ }
+
+ /* Free replies queue */
+ struct fuzzy_tcp_reply_queue_elt *elt, *tmp;
+ DL_FOREACH_SAFE(session->replies_queue, elt, tmp)
+ {
+ DL_DELETE(session->replies_queue, elt);
+ g_free(elt);
+ }
+
+ close(session->common.fd);
+ rspamd_inet_address_free(session->common.addr);
+ rspamd_explicit_memzero(session->common.nm, sizeof(session->common.nm));
+ session->common.worker->nconns--;
+
+ if (session->common.ip_stat) {
+ REF_RELEASE(session->common.ip_stat);
+ }
+
+ if (session->common.extensions) {
+ g_free(session->common.extensions);
+ }
+
+ if (session->common.key) {
+ REF_RELEASE(session->common.key);
+ }
+
+ g_free(session);
+}
+
+static void
+fuzzy_udp_session_destroy(gpointer d)
+{
+ struct fuzzy_udp_session *session = d;
+
+ rspamd_inet_address_free(session->common.addr);
+ rspamd_explicit_memzero(session->common.nm, sizeof(session->common.nm));
+ session->common.worker->nconns--;
+
+ if (session->common.ip_stat) {
+ REF_RELEASE(session->common.ip_stat);
+ }
+
+ if (session->common.extensions) {
+ g_free(session->common.extensions);
+ }
+
+ if (session->common.key) {
+ REF_RELEASE(session->common.key);
+ }
+
+ g_free(session);
+}
+
#define FUZZY_INPUT_BUFLEN 1024
#ifdef HAVE_RECVMMSG
#define MSGVEC_LEN 16
}
}
+/* TCP-specific reply and I/O handlers */
+
+static void
+rspamd_fuzzy_tcp_timeout(EV_P_ ev_timer *w, int revents)
+{
+ struct fuzzy_tcp_session *session = (struct fuzzy_tcp_session *) w->data;
+
+ msg_debug_fuzzy_storage("TCP session from %s timed out",
+ rspamd_inet_address_to_string(session->common.addr));
+
+ REF_RELEASE(session);
+}
+
+static bool
+rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session,
+ struct fuzzy_tcp_reply_queue_elt *reply)
+{
+ gssize r;
+ gsize total_len = sizeof(reply->rep.size_hdr) + ntohs(reply->rep.size_hdr);
+ gsize remaining = total_len - reply->written;
+ unsigned char *data = ((unsigned char *) &reply->rep) + reply->written;
+
+ r = write(session->common.fd, data, remaining);
+
+ if (r == -1) {
+ if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+ return false;
+ }
+ else {
+ msg_err("error while writing TCP reply: %s", strerror(errno));
+ return false;
+ }
+ }
+
+ reply->written += r;
+
+ if (reply->written >= total_len) {
+ /* Reply fully sent */
+ DL_DELETE(session->replies_queue, reply);
+ g_free(reply);
+
+ msg_debug_fuzzy_storage("TCP reply sent to %s, %z bytes",
+ rspamd_inet_address_to_string(session->common.addr),
+ (size_t) r);
+
+ return true;
+ }
+
+ return false;
+}
+
+static void
+rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents)
+{
+ struct fuzzy_tcp_session *session = (struct fuzzy_tcp_session *) w->data;
+ gssize r;
+
+ if (revents & EV_READ) {
+ /* Read available data */
+ r = read(session->common.fd,
+ session->input_buf + session->bytes_unprocessed,
+ sizeof(session->input_buf) - session->bytes_unprocessed);
+
+ if (r <= 0) {
+ if (r == -1) {
+ if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+ return;
+ }
+ msg_debug_fuzzy_storage("read error on TCP connection from %s: %s",
+ rspamd_inet_address_to_string(session->common.addr),
+ strerror(errno));
+ }
+ else {
+ msg_debug_fuzzy_storage("TCP connection from %s closed by peer",
+ rspamd_inet_address_to_string(session->common.addr));
+ }
+
+ REF_RELEASE(session);
+ return;
+ }
+
+ session->bytes_unprocessed += r;
+ session->common.timestamp = ev_now(session->common.ctx->event_loop);
+
+ /* Reset timeout */
+ ev_timer_again(EV_A_ & session->tm);
+
+ /* Process frames */
+ unsigned int processed_offset = 0;
+
+ while (processed_offset < session->bytes_unprocessed) {
+ uint16_t frame_len;
+
+ /* Check frame state */
+ if ((session->cur_frame_state & 0xC000) == 0x0000) {
+ /* Need to read first byte of length */
+ if (processed_offset < session->bytes_unprocessed) {
+ session->cur_frame_state = 0x8000 | session->input_buf[processed_offset];
+ processed_offset++;
+ }
+ else {
+ break;
+ }
+ }
+
+ if ((session->cur_frame_state & 0xC000) == 0x8000) {
+ /* Need to read second byte of length */
+ if (processed_offset < session->bytes_unprocessed) {
+ uint16_t first_byte = session->cur_frame_state & 0xFF;
+ uint16_t second_byte = session->input_buf[processed_offset];
+ session->cur_frame_state = 0xC000 | ((first_byte << 8) | second_byte);
+ processed_offset++;
+ }
+ else {
+ break;
+ }
+ }
+
+ /* Now we have full length */
+ frame_len = session->cur_frame_state & 0x3FFF;
+
+ if (frame_len > sizeof(struct rspamd_fuzzy_encrypted_shingle_cmd)) {
+ msg_err("invalid frame length %d from %s, closing connection",
+ (int) frame_len,
+ rspamd_inet_address_to_string(session->common.addr));
+ REF_RELEASE(session);
+ return;
+ }
+
+ /* Check if we have complete frame */
+ if (session->bytes_unprocessed - processed_offset >= frame_len) {
+ /* Process this frame using legacy session temporarily */
+ struct fuzzy_session legacy_session;
+
+ memset(&legacy_session, 0, sizeof(legacy_session));
+ legacy_session.worker = session->common.worker;
+ legacy_session.addr = session->common.addr;
+ legacy_session.ctx = session->common.ctx;
+ legacy_session.fd = session->common.fd;
+ legacy_session.timestamp = session->common.timestamp;
+ legacy_session.key = session->common.key;
+ legacy_session.ip_stat = session->common.ip_stat;
+ memcpy(legacy_session.nm, session->common.nm, sizeof(legacy_session.nm));
+
+ if (rspamd_fuzzy_cmd_from_wire(session->input_buf + processed_offset,
+ frame_len, &legacy_session)) {
+ /* Copy parsed data back */
+ session->common.epoch = legacy_session.epoch;
+ session->common.cmd_type = legacy_session.cmd_type;
+ memcpy(&session->common.cmd, &legacy_session.cmd, sizeof(session->common.cmd));
+ session->common.key = legacy_session.key;
+ session->common.extensions = legacy_session.extensions;
+ memcpy(session->common.nm, legacy_session.nm, sizeof(session->common.nm));
+
+ /* Process command - this will need to be adapted for TCP */
+ rspamd_fuzzy_process_command(&legacy_session);
+ }
+ else {
+ session->common.ctx->stat.invalid_requests++;
+ msg_debug_fuzzy_storage("invalid TCP fuzzy command of size %d received from %s",
+ (int) frame_len,
+ rspamd_inet_address_to_string(session->common.addr));
+ }
+
+ processed_offset += frame_len;
+ session->cur_frame_state = 0x0000; /* Reset for next frame */
+ }
+ else {
+ /* Incomplete frame, wait for more data */
+ break;
+ }
+ }
+
+ /* Move unprocessed data to the beginning */
+ if (processed_offset > 0) {
+ if (processed_offset < session->bytes_unprocessed) {
+ memmove(session->input_buf,
+ session->input_buf + processed_offset,
+ session->bytes_unprocessed - processed_offset);
+ session->bytes_unprocessed -= processed_offset;
+ }
+ else {
+ session->bytes_unprocessed = 0;
+ }
+ }
+ }
+
+ if (revents & EV_WRITE) {
+ /* Write pending replies */
+ struct fuzzy_tcp_reply_queue_elt *elt;
+
+ while ((elt = session->replies_queue) != NULL) {
+ if (!rspamd_fuzzy_tcp_write_reply(session, elt)) {
+ /* Cannot write more, wait for next write event */
+ return;
+ }
+ }
+
+ /* All replies sent, disable write event */
+ ev_io_stop(EV_A_ w);
+ ev_io_set(w, w->fd, EV_READ);
+ ev_io_start(EV_A_ w);
+ }
+}
+
+static void
+accept_tcp_socket(EV_P_ ev_io *w, int revents)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ struct fuzzy_tcp_session *session;
+ rspamd_inet_addr_t *addr = NULL;
+ int nfd;
+
+ ctx = (struct rspamd_fuzzy_storage_ctx *) worker->ctx;
+
+ if ((nfd = rspamd_accept_from_socket(w->fd, &addr,
+ rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
+ msg_warn("TCP accept failed: %s", strerror(errno));
+ return;
+ }
+
+ /* Check for EAGAIN */
+ if (nfd == 0) {
+ if (addr) {
+ rspamd_inet_address_free(addr);
+ }
+ return;
+ }
+
+ ev_now_update_if_cheap(ctx->event_loop);
+
+ /* Check ratelimit */
+ if (!rspamd_fuzzy_check_ratelimit(ctx, addr, worker, ev_now(ctx->event_loop))) {
+ msg_info("ratelimiting TCP connection from %s",
+ rspamd_inet_address_to_string(addr));
+ rspamd_inet_address_free(addr);
+ close(nfd);
+ return;
+ }
+
+ /* Check if client is allowed */
+ if (!rspamd_fuzzy_check_client(ctx, addr)) {
+ msg_info("refusing TCP connection from %s (blacklisted)",
+ rspamd_inet_address_to_string(addr));
+ rspamd_inet_address_free(addr);
+ close(nfd);
+ return;
+ }
+
+ /* Set TCP_NODELAY */
+#ifdef TCP_NODELAY
+ {
+ int sopt = 1;
+ if (setsockopt(nfd, IPPROTO_TCP, TCP_NODELAY, &sopt, sizeof(sopt)) == -1) {
+ msg_warn("cannot set TCP_NODELAY for %s: %s",
+ rspamd_inet_address_to_string(addr),
+ strerror(errno));
+ }
+ }
+#endif
+
+ /* Create session */
+ session = g_malloc0(sizeof(*session));
+ REF_INIT_RETAIN(session, fuzzy_tcp_session_destroy);
+
+ session->common.ctx = ctx;
+ session->common.worker = worker;
+ session->common.addr = addr;
+ session->common.fd = nfd;
+ session->common.timestamp = ev_now(ctx->event_loop);
+
+ session->cur_frame_state = 0x0000;
+ session->bytes_unprocessed = 0;
+ session->replies_queue = NULL;
+
+ worker->nconns++;
+
+ msg_debug_fuzzy_storage("accepted TCP connection from %s",
+ rspamd_inet_address_to_string(addr));
+
+ /* Setup I/O watcher */
+ session->common.io.data = session;
+ ev_io_init(&session->common.io, rspamd_fuzzy_tcp_io, nfd, EV_READ);
+ ev_io_start(ctx->event_loop, &session->common.io);
+
+ /* Setup timeout */
+ session->tm.data = session;
+ ev_timer_init(&session->tm, rspamd_fuzzy_tcp_timeout,
+ ctx->tcp_timeout, ctx->tcp_timeout);
+ ev_timer_start(ctx->event_loop, &session->tm);
+}
+
static gboolean
rspamd_fuzzy_storage_periodic_callback(void *ud)
{
ctx->leaky_bucket_burst = NAN;
ctx->leaky_bucket_rate = NAN;
ctx->delay = NAN;
+ ctx->tcp_timeout = DEFAULT_TCP_TIMEOUT;
ctx->default_forbidden_ids = kh_init(fuzzy_key_ids_set);
ctx->weak_ids = kh_init(fuzzy_key_ids_set);
RSPAMD_CL_FLAG_TIME_FLOAT,
"Default delay time for hashes, default: not enabled");
+ rspamd_rcl_register_worker_option(cfg,
+ type,
+ "tcp_timeout",
+ rspamd_rcl_parse_struct_time,
+ ctx,
+ G_STRUCT_OFFSET(struct rspamd_fuzzy_storage_ctx,
+ tcp_timeout),
+ RSPAMD_CL_FLAG_TIME_FLOAT,
+ "TCP connection timeout, default: " G_STRINGIFY(DEFAULT_TCP_TIMEOUT) " seconds");
+
rspamd_rcl_register_worker_option(cfg,
type,
"allow_update",
ev_io_start(ctx->event_loop, &ac_ev->accept_ev);
DL_APPEND(worker->accept_events, ac_ev);
}
+ else if (ls->type == RSPAMD_WORKER_SOCKET_TCP) {
+ ac_ev = g_malloc0(sizeof(*ac_ev));
+ ac_ev->accept_ev.data = worker;
+ ac_ev->event_loop = ctx->event_loop;
+ ev_io_init(&ac_ev->accept_ev, accept_tcp_socket, ls->fd,
+ EV_READ);
+ ev_io_start(ctx->event_loop, &ac_ev->accept_ev);
+ DL_APPEND(worker->accept_events, ac_ev);
+ }
else {
- /* We allow TCP listeners only for a update worker */
+ /* Unknown socket type */
g_assert_not_reached();
}
}