#define FUZZY_TCP_BUFFER_LENGTH 8192
-struct rspamd_fuzzy_tcp_reply {
- uint16_t size_hdr; /* We have to write this as well */
- struct rspamd_fuzzy_encrypted_reply rep; /* Payload */
+struct rspamd_fuzzy_tcp_frame {
+ uint16_t size_hdr; /* We have to write this as well */
+ struct rspamd_fuzzy_encrypted_reply payload; /* Payload */
};
-struct fuzzy_tcp_reply {
- struct rspamd_fuzzy_tcp_reply rep; /* Serialized reply */
- unsigned int written; /* How many bytes have we already written */
- struct fuzzy_tcp_reply *prev, *next; /* Link */
+struct fuzzy_tcp_reply_queue_elt {
+ struct rspamd_fuzzy_tcp_frame rep; /* Serialized reply */
+ unsigned int written; /* How many bytes have we already written */
+ struct fuzzy_tcp_reply_queue_elt *prev, *next; /* Link */
};
struct fuzzy_common_session {
struct fuzzy_common_session common;
ref_entry_t ref;
- struct fuzzy_tcp_reply *replies_queue;
+ struct fuzzy_tcp_reply_queue_elt *replies_queue;
unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH];
};
static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session);
+static bool rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session, struct fuzzy_tcp_reply_queue_elt *reply);
static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx,
const char *source, gboolean final);
static gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
const char *reason);
static struct fuzzy_key *fuzzy_add_keypair_from_ucl(const ucl_object_t *obj,
khash_t(rspamd_fuzzy_keys_hash) * target);
+static void rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents);
struct fuzzy_keymap_ucl_buf {
rspamd_fstring_t *buf;
}
}
+static bool
+rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session, struct fuzzy_tcp_reply_queue_elt *reply)
+{
+ ssize_t r;
+ gconstpointer data = &reply->rep;
+ size_t len = reply->rep.size_hdr + sizeof(reply->rep.size_hdr);
+
+ r = write(session->common.fd, data, len);
+
+ if (r == -1) {
+ if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+ /* Attach this reply to the list of the replies pending */
+ DL_APPEND(session->replies_queue, reply);
+ /* Grab reference to avoid early destruction */
+ REF_RETAIN(session);
+ ev_io_init(&session->common.io,
+ rspamd_fuzzy_tcp_io, session->common.fd, EV_WRITE | EV_READ);
+ ev_io_start(session->common.ctx->event_loop, &session->common.io);
+ }
+ else {
+ msg_err("error while writing reply: %s", strerror(errno));
+ return false;
+ }
+ }
+ else if ((size_t) r < len) {
+ reply->written = r;
+ DL_APPEND(session->replies_queue, reply);
+ /* Grab reference to avoid early destruction */
+ REF_RETAIN(session);
+ /* Partial write */
+ ev_io_init(&session->common.io,
+ rspamd_fuzzy_tcp_io, session->common.fd, EV_WRITE);
+ ev_io_start(session->common.ctx->event_loop, &session->common.io);
+ }
+ else {
+ /* Full write, no need to preserve queue elt */
+ g_free(reply);
+ }
+
+ return true;
+}
+
static void
rspamd_fuzzy_update_key_stat(gboolean matched,
struct fuzzy_key_stat *key_stat,
rspamd_fuzzy_udp_write_reply(session);
}
+static struct fuzzy_tcp_reply_queue_elt *
+rspamd_fuzzy_make_tcp_reply(struct rspamd_fuzzy_cmd *cmd,
+ struct rspamd_fuzzy_reply *result,
+ struct fuzzy_tcp_session *session,
+ int flags)
+{
+ gsize len;
+ struct fuzzy_tcp_reply_queue_elt *queue_elt;
+
+ if (cmd) {
+ queue_elt = g_malloc0(sizeof(*queue_elt));
+
+ struct rspamd_fuzzy_tcp_frame *reply = &queue_elt->rep;
+
+ result->v1.tag = cmd->tag;
+ memcpy(&reply->payload.rep, result, sizeof(*result));
+
+ if (flags & RSPAMD_FUZZY_REPLY_DELAY) {
+ /* Hash is too fresh, need to delay it */
+ reply->payload.rep.ts = 0;
+ reply->payload.rep.v1.prob = 0.0f;
+ reply->payload.rep.v1.value = 0;
+ }
+ else if (!rspamd_fuzzy_can_reply(&session->common, flags & RSPAMD_FUZZY_REPLY_ENCRYPTED,
+ reply->payload.rep.v1.flag, reply->payload.rep.v1.prob)) {
+ /* Hash is from a forbidden flag */
+ reply->payload.rep.ts = 0;
+ reply->payload.rep.v1.prob = 0.0f;
+ reply->payload.rep.v1.value = 0;
+ reply->payload.rep.v1.flag = 0;
+ }
+ else {
+ /* Update stats before encryption */
+ if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
+ rspamd_fuzzy_update_stats(session->common.ctx,
+ session->common.epoch,
+ reply->payload.rep.v1.prob > 0.5f,
+ flags & RSPAMD_FUZZY_REPLY_SHINGLE,
+ flags & RSPAMD_FUZZY_REPLY_DELAY,
+ session->common.key,
+ session->common.ip_stat,
+ cmd->cmd,
+ &reply->payload.rep,
+ session->common.timestamp);
+ }
+ }
+
+ if (flags & RSPAMD_FUZZY_REPLY_ENCRYPTED) {
+ /* We need also to encrypt reply */
+ ottery_rand_bytes(reply->payload.hdr.nonce,
+ sizeof(reply->payload.hdr.nonce));
+
+ /*
+ * For old replies we need to encrypt just old part, otherwise
+ * decryption would fail due to mac verification mistake
+ */
+
+ len = sizeof(reply->payload);
+ rspamd_cryptobox_encrypt_nm_inplace((unsigned char *) &reply->payload,
+ len,
+ reply->payload.hdr.nonce,
+ session->common.nm,
+ reply->payload.hdr.mac,
+ RSPAMD_CRYPTOBOX_MODE_25519);
+ }
+ else {
+ len = sizeof(reply->payload.rep);
+ }
+
+ reply->size_hdr = len;
+
+ return queue_elt;
+ }
+ else {
+ return NULL;
+ }
+}
+
static gboolean
-fuzzy_peer_try_send(int fd, struct fuzzy_peer_request *up_req)
+rspamd_fuzzy_peer_try_send(int fd, struct fuzzy_peer_request *up_req)
{
gssize r;
}
static void
-fuzzy_peer_send_io(EV_P_ ev_io *w, int revents)
+rspamd_fuzzy_peer_send_io(EV_P_ ev_io *w, int revents)
{
struct fuzzy_peer_request *up_req = (struct fuzzy_peer_request *) w->data;
- if (!fuzzy_peer_try_send(w->fd, up_req)) {
+ if (!rspamd_fuzzy_peer_try_send(w->fd, up_req)) {
msg_err("cannot send update request to the peer: %s", strerror(errno));
}
sizeof(up_req->cmd.cmd.shingle.sgl));
}
- if (!fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) {
+ if (!rspamd_fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) {
up_req->io_ev.data = up_req;
- ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
+ ev_io_init(&up_req->io_ev, rspamd_fuzzy_peer_send_io,
session->ctx->peer_fd, EV_WRITE);
ev_io_start(session->ctx->event_loop, &up_req->io_ev);
}
int send_flags = 0;
struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_fill_reply(result, &session->common, &send_flags);
- /* TODO write reply impl */
+
+ if (cmd) {
+
+ struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, result, session, send_flags);
+
+ if (reply) {
+ if (!rspamd_fuzzy_tcp_write_reply(session, reply)) {
+ /* Enforced deletion */
+ REF_RELEASE(session);
+ }
+ }
+ }
+ else {
+ /* Enforced deletion */
+ REF_RELEASE(session);
+ }
+
+ /* Free our refcount */
REF_RELEASE(session);
}
ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal;
memcpy(ptr, cmd, up_len);
- if (!fuzzy_peer_try_send(session->common.ctx->peer_fd, up_req)) {
+ if (!rspamd_fuzzy_peer_try_send(session->common.ctx->peer_fd, up_req)) {
up_req->io_ev.data = up_req;
- ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
+ ev_io_init(&up_req->io_ev, rspamd_fuzzy_peer_send_io,
session->common.ctx->peer_fd, EV_WRITE);
ev_io_start(session->common.ctx->event_loop, &up_req->io_ev);
}
static void
tcp_session_dtor(struct fuzzy_tcp_session *tcp_session)
{
- struct fuzzy_tcp_reply *rep;
+ struct fuzzy_tcp_reply_queue_elt *rep;
fuzzy_common_session_dtor(&tcp_session->common);
struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_prepare_cmd(&tcp_session->common, &result, &send_flags, &up_len, &final);
if (G_UNLIKELY(cmd == NULL || final)) {
+ struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags);
- /* TODO: write reply */
+ if (reply) {
+ rspamd_fuzzy_tcp_write_reply(tcp_session, reply);
+ }
+
+ REF_RELEASE(tcp_session);
return true;
}
result.v1.value = 403;
result.v1.prob = 0.0f;
result.v1.flag = 0;
- /* TODO: write reply */
+
+ struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags);
+
+ if (reply) {
+ rspamd_fuzzy_tcp_write_reply(tcp_session, reply);
+ }
+
+ REF_RELEASE(tcp_session);
return false;
}
/* Store high qword in value and low qword in flag */
result.v1.value = (int32_t) ((uint64_t) tcp_session->common.ctx->stat.fuzzy_hashes >> 32);
result.v1.flag = (uint32_t) (tcp_session->common.ctx->stat.fuzzy_hashes & G_MAXUINT32);
- /* TODO: write reply */
+
+ struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags);
+
+ if (reply) {
+ if (!rspamd_fuzzy_tcp_write_reply(tcp_session, reply)) {
+ REF_RELEASE(tcp_session);
+ }
+ }
+ else {
+ REF_RELEASE(tcp_session);
+ }
}
else if (cmd->cmd == FUZZY_PING) {
result.v1.prob = 1.0f;
result.v1.value = cmd->value;
- /* TODO: write reply */
+ struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags);
+
+ if (reply) {
+ if (!rspamd_fuzzy_tcp_write_reply(tcp_session, reply)) {
+ REF_RELEASE(tcp_session);
+ }
+ }
+ else {
+ REF_RELEASE(tcp_session);
+ }
}
else {
+ struct fuzzy_tcp_reply_queue_elt *reply;
+
if (rspamd_fuzzy_check_write(tcp_session->common.ctx, tcp_session->common.addr, tcp_session->common.key)) {
/* Check whitelist */
if (tcp_session->common.ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal;
memcpy(ptr, cmd, up_len);
- if (!fuzzy_peer_try_send(tcp_session->common.ctx->peer_fd, up_req)) {
+ if (!rspamd_fuzzy_peer_try_send(tcp_session->common.ctx->peer_fd, up_req)) {
up_req->io_ev.data = up_req;
- ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
+ ev_io_init(&up_req->io_ev, rspamd_fuzzy_peer_send_io,
tcp_session->common.ctx->peer_fd, EV_WRITE);
ev_io_start(tcp_session->common.ctx->event_loop, &up_req->io_ev);
}
result.v1.prob = 0.0f;
}
reply:
- /* TODO: write reply */
+ reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags);
+
+ if (reply) {
+ if (!rspamd_fuzzy_tcp_write_reply(tcp_session, reply)) {
+ REF_RELEASE(tcp_session);
+ }
+ }
+ else {
+ REF_RELEASE(tcp_session);
+ }
}
return true;
}
static bool
-fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_read)
+rspamd_fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_read)
{
if (bytes_read <= 0) {
/* Apparent garbage */
msg_debug_fuzzy_storage("can process next command: %d bytes available, %d bytes required",
(int) buf_avail, bytes_required);
- /* TODO: add this */
+ if (!rspamd_fuzzy_process_tcp_frame(tcp_session, p, bytes_required)) {
+ /* Error processing, give up */
+ return false;
+ }
+
buf_avail -= bytes_required;
p += bytes_required;
tcp_session->cur_frame_state = 0;
}
static void
-tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
+rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents)
{
struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
REF_RELEASE(tcp_session);
}
else {
- if (!fuzzy_tcp_process_input(tcp_session, r)) {
+ if (!rspamd_fuzzy_tcp_process_input(tcp_session, r)) {
REF_RELEASE(tcp_session);
}
else {
else if (revents & EV_WRITE) {
if (tcp_session->replies_queue) {
/* Try to write as many replies, as possible */
- struct fuzzy_tcp_reply *rep, *tmp;
+ struct fuzzy_tcp_reply_queue_elt *rep, *tmp;
int n = 0;
DL_FOREACH(tcp_session->replies_queue, rep)
}
static void
-tcp_fuzzy_socket_timeout(EV_P_ ev_timer *w, int revents)
+rspamd_fuzzy_tcp_timeout(EV_P_ ev_timer *w, int revents)
{
struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
tcp_session->common.worker = worker;
tcp_session->common.timestamp = ev_now(ctx->event_loop);
REF_INIT_RETAIN(tcp_session, tcp_session_dtor);
- ev_io_init(&tcp_session->common.io, tcp_fuzzy_socket_io, nfd, EV_READ);
- ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, ctx->tcp_timeout);
+ ev_io_init(&tcp_session->common.io, rspamd_fuzzy_tcp_io, nfd, EV_READ);
+ ev_timer_init(&tcp_session->tm, rspamd_fuzzy_tcp_timeout, ctx->tcp_timeout, ctx->tcp_timeout);
tcp_session->tm.data = tcp_session;
tcp_session->common.io.data = tcp_session;
ev_timer_start(ctx->event_loop, &tcp_session->tm);