struct fuzzy_tcp_reply *prev, *next; /* Link */
};
-struct fuzzy_tcp_session {
- struct rspamd_worker *worker;
- rspamd_inet_addr_t *addr;
+struct fuzzy_common_session {
struct rspamd_fuzzy_storage_ctx *ctx;
int fd;
struct ev_io io;
+ ev_tstamp timestamp;
+ struct rspamd_worker *worker;
+ rspamd_inet_addr_t *addr;
+
+ enum rspamd_fuzzy_epoch epoch;
+ enum fuzzy_cmd_type cmd_type;
+ struct rspamd_fuzzy_shingle_cmd cmd;
+ struct fuzzy_key *key;
+ struct rspamd_fuzzy_cmd_extension *extensions;
+ struct fuzzy_key_stat *ip_stat;
+ unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
+};
+
+struct fuzzy_tcp_session {
struct ev_timer tm;
/*
uint16_t bytes_unprocessed;
/* Common with UDP session */
- enum rspamd_fuzzy_epoch epoch;
- enum fuzzy_cmd_type cmd_type;
- struct rspamd_fuzzy_shingle_cmd cmd;
- struct fuzzy_key *key;
- struct rspamd_fuzzy_cmd_extension *extensions;
- struct fuzzy_key_stat *ip_stat;
- unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
-
+ struct fuzzy_common_session common;
ref_entry_t ref;
struct fuzzy_tcp_reply *replies_queue;
unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH];
};
-struct fuzzy_session {
- struct rspamd_worker *worker;
- rspamd_inet_addr_t *addr;
- struct rspamd_fuzzy_storage_ctx *ctx;
-
- struct rspamd_fuzzy_shingle_cmd cmd; /* Can handle both shingles and non-shingles */
+struct fuzzy_udp_session {
+ /* Common fields with TCP session */
+ struct fuzzy_common_session common;
struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */
- struct fuzzy_key_stat *ip_stat;
-
- enum rspamd_fuzzy_epoch epoch;
- enum fuzzy_cmd_type cmd_type;
- int fd;
- ev_tstamp timestamp;
-
- struct fuzzy_key *key;
- struct rspamd_fuzzy_cmd_extension *extensions;
- unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
-
- struct ev_io io;
ref_entry_t ref;
};
};
-static void rspamd_fuzzy_write_reply(struct fuzzy_session *session);
+static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session);
static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx,
const char *source, gboolean final);
static gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
}
static void
-rspamd_fuzzy_reply_io(EV_P_ ev_io *w, int revents)
+rspamd_fuzzy_udp_reply_io(EV_P_ ev_io *w, int revents)
{
- struct fuzzy_session *session = (struct fuzzy_session *) w->data;
+ struct fuzzy_udp_session *session = (struct fuzzy_udp_session *) w->data;
ev_io_stop(EV_A_ w);
- rspamd_fuzzy_write_reply(session);
+ rspamd_fuzzy_udp_write_reply(session);
REF_RELEASE(session);
}
static void
-rspamd_fuzzy_write_reply(struct fuzzy_session *session)
+rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session)
{
gssize r;
gsize len;
gconstpointer data;
- if (session->cmd_type == CMD_ENCRYPTED_NORMAL ||
- session->cmd_type == CMD_ENCRYPTED_SHINGLE) {
+ if (session->common.cmd_type == CMD_ENCRYPTED_NORMAL ||
+ session->common.cmd_type == CMD_ENCRYPTED_SHINGLE) {
/* Encrypted reply */
data = &session->reply;
- if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+ if (session->common.epoch > RSPAMD_FUZZY_EPOCH10) {
len = sizeof(session->reply);
}
else {
else {
data = &session->reply.rep;
- if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+ if (session->common.epoch > RSPAMD_FUZZY_EPOCH10) {
len = sizeof(session->reply.rep);
}
else {
}
}
- r = rspamd_inet_address_sendto(session->fd, data, len, 0,
- session->addr);
+ r = rspamd_inet_address_sendto(session->common.fd, data, len, 0,
+ session->common.addr);
if (r == -1) {
if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
/* Grab reference to avoid early destruction */
REF_RETAIN(session);
- session->io.data = session;
- ev_io_init(&session->io,
- rspamd_fuzzy_reply_io, session->fd, EV_WRITE);
- ev_io_start(session->ctx->event_loop, &session->io);
+ session->common.io.data = session;
+ ev_io_init(&session->common.io,
+ rspamd_fuzzy_udp_reply_io, session->common.fd, EV_WRITE);
+ ev_io_start(session->common.ctx->event_loop, &session->common.io);
}
else {
msg_err("error while writing reply: %s", strerror(errno));
RSPAMD_FUZZY_REPLY_DELAY = 0x1u << 2u,
};
+static bool
+rspamd_fuzzy_can_reply(struct fuzzy_common_session *session, bool encrypted, int flag, float prob)
+{
+ bool default_disabled = false;
+
+ {
+ khiter_t k;
+
+ k = kh_get(fuzzy_key_ids_set, session->ctx->default_forbidden_ids, flag);
+
+ if (k != kh_end(session->ctx->default_forbidden_ids)) {
+ /* Hash is from a forbidden flag by default */
+ default_disabled = true;
+ }
+ }
+
+ if (encrypted) {
+ if (prob > 0 && session->key && session->key->forbidden_ids) {
+ khiter_t k;
+
+ k = kh_get(fuzzy_key_ids_set, session->key->forbidden_ids, flag);
+
+ if (k != kh_end(session->key->forbidden_ids)) {
+ /* Hash is from a forbidden flag for this key */
+ default_disabled = true;
+ }
+ }
+ }
+
+ return !default_disabled;
+}
+
static void
-rspamd_fuzzy_make_reply(struct rspamd_fuzzy_cmd *cmd,
- struct rspamd_fuzzy_reply *result,
- struct fuzzy_session *session,
- int flags)
+rspamd_fuzzy_make_udp_reply(struct rspamd_fuzzy_cmd *cmd,
+ struct rspamd_fuzzy_reply *result,
+ struct fuzzy_udp_session *session,
+ int flags)
{
gsize len;
session->reply.rep.v1.prob = 0.0f;
session->reply.rep.v1.value = 0;
}
-
- bool default_disabled = false;
-
- {
- khiter_t k;
-
- k = kh_get(fuzzy_key_ids_set, session->ctx->default_forbidden_ids, session->reply.rep.v1.flag);
-
- if (k != kh_end(session->ctx->default_forbidden_ids)) {
- /* Hash is from a forbidden flag by default */
- default_disabled = true;
+ else if (!rspamd_fuzzy_can_reply(&session->common, flags & RSPAMD_FUZZY_REPLY_ENCRYPTED,
+ session->reply.rep.v1.flag, session->reply.rep.v1.prob)) {
+ /* Hash is from a forbidden flag */
+ session->reply.rep.ts = 0;
+ session->reply.rep.v1.prob = 0.0f;
+ session->reply.rep.v1.value = 0;
+ session->reply.rep.v1.flag = 0;
+ }
+ else {
+ /* Update stats before encryption */
+ if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
+ rspamd_fuzzy_update_stats(session->common.ctx,
+ session->common.epoch,
+ session->reply.rep.v1.prob > 0.5f,
+ flags & RSPAMD_FUZZY_REPLY_SHINGLE,
+ flags & RSPAMD_FUZZY_REPLY_DELAY,
+ session->common.key,
+ session->common.ip_stat,
+ cmd->cmd,
+ &session->reply.rep,
+ session->common.timestamp);
}
}
if (flags & RSPAMD_FUZZY_REPLY_ENCRYPTED) {
-
- if (session->reply.rep.v1.prob > 0 && session->key && session->key->forbidden_ids) {
- khiter_t k;
-
- k = kh_get(fuzzy_key_ids_set, session->key->forbidden_ids, session->reply.rep.v1.flag);
-
- if (k != kh_end(session->key->forbidden_ids)) {
- /* Hash is from a forbidden flag for this key */
- session->reply.rep.ts = 0;
- session->reply.rep.v1.prob = 0.0f;
- session->reply.rep.v1.value = 0;
- session->reply.rep.v1.flag = 0;
- }
- }
- else if (default_disabled) {
- /* Hash is from a forbidden flag by default */
- session->reply.rep.ts = 0;
- session->reply.rep.v1.prob = 0.0f;
- session->reply.rep.v1.value = 0;
- session->reply.rep.v1.flag = 0;
- }
-
/* We need also to encrypt reply */
ottery_rand_bytes(session->reply.hdr.nonce,
sizeof(session->reply.hdr.nonce));
* decryption would fail due to mac verification mistake
*/
- if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+ if (session->common.epoch > RSPAMD_FUZZY_EPOCH10) {
len = sizeof(session->reply.rep);
}
else {
len = sizeof(session->reply.rep.v1);
}
- /* Update stats before encryption */
- if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
- rspamd_fuzzy_update_stats(session->ctx,
- session->epoch,
- session->reply.rep.v1.prob > 0.5f,
- flags & RSPAMD_FUZZY_REPLY_SHINGLE,
- flags & RSPAMD_FUZZY_REPLY_DELAY,
- session->key,
- session->ip_stat,
- cmd->cmd,
- &session->reply.rep,
- session->timestamp);
- }
-
rspamd_cryptobox_encrypt_nm_inplace((unsigned char *) &session->reply.rep,
len,
session->reply.hdr.nonce,
- session->nm,
+ session->common.nm,
session->reply.hdr.mac,
RSPAMD_CRYPTOBOX_MODE_25519);
}
- else if (default_disabled) {
- /* Hash is from a forbidden flag by default, and there is no encryption override */
- session->reply.rep.ts = 0;
- session->reply.rep.v1.prob = 0.0f;
- session->reply.rep.v1.value = 0;
- session->reply.rep.v1.flag = 0;
- }
- if (!(flags & RSPAMD_FUZZY_REPLY_ENCRYPTED)) {
- if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
- rspamd_fuzzy_update_stats(session->ctx,
- session->epoch,
- session->reply.rep.v1.prob > 0.5f,
- flags & RSPAMD_FUZZY_REPLY_SHINGLE,
- flags & RSPAMD_FUZZY_REPLY_DELAY,
- session->key,
- session->ip_stat,
- cmd->cmd,
- &session->reply.rep,
- session->timestamp);
- }
- }
}
- rspamd_fuzzy_write_reply(session);
+ rspamd_fuzzy_udp_write_reply(session);
}
static gboolean
}
}
-static void
-rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply *result, void *ud)
+static struct rspamd_fuzzy_cmd *
+rspamd_fuzzy_fill_reply(struct rspamd_fuzzy_reply *result, struct fuzzy_common_session *session, int *send_flags)
{
- struct fuzzy_session *session = ud;
- gboolean is_shingle = FALSE, __attribute__((unused)) encrypted = FALSE;
+ bool encrypted = false, is_shingle = false;
struct rspamd_fuzzy_cmd *cmd = NULL;
const struct rspamd_shingle *shingle = NULL;
struct rspamd_shingle sgl_cpy;
- int send_flags = 0;
switch (session->cmd_type) {
case CMD_ENCRYPTED_NORMAL:
- encrypted = TRUE;
- send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
+ encrypted = true;
+ *send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
/* Fallthrough */
case CMD_NORMAL:
cmd = &session->cmd.basic;
break;
case CMD_ENCRYPTED_SHINGLE:
- encrypted = TRUE;
- send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
+ encrypted = true;
+ *send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
/* Fallthrough */
case CMD_SHINGLE:
cmd = &session->cmd.basic;
memcpy(&sgl_cpy, &session->cmd.sgl, sizeof(sgl_cpy));
shingle = &sgl_cpy;
- is_shingle = TRUE;
- send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
+ is_shingle = true;
+ *send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
break;
}
}
lua_settop(L, 0);
- rspamd_fuzzy_make_reply(cmd, result, session, send_flags);
- REF_RELEASE(session);
-
- return;
+ return cmd;
}
}
session->ctx->delay / 2.0);
if (hash_age < jittered_age) {
- send_flags |= RSPAMD_FUZZY_REPLY_DELAY;
+ *send_flags |= RSPAMD_FUZZY_REPLY_DELAY;
}
}
}
}
- rspamd_fuzzy_make_reply(cmd, result, session, send_flags);
+ return cmd;
+}
+
+static void
+rspamd_fuzzy_udp_check_callback(struct rspamd_fuzzy_reply *result, void *ud)
+{
+ struct fuzzy_udp_session *session = (struct fuzzy_udp_session *) ud;
+ int send_flags = 0;
+ struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_fill_reply(result, &session->common, &send_flags);
+ rspamd_fuzzy_make_udp_reply(cmd, result, session, send_flags);
REF_RELEASE(session);
}
static void
-rspamd_fuzzy_process_udp_session(struct fuzzy_session *session)
+rspamd_fuzzy_tcp_check_callback(struct rspamd_fuzzy_reply *result, void *ud)
{
- gboolean is_shingle = FALSE, __attribute__((unused)) encrypted = FALSE;
- struct rspamd_fuzzy_cmd *cmd = NULL;
- struct rspamd_fuzzy_reply result;
- struct fuzzy_peer_cmd up_cmd;
- struct fuzzy_peer_request *up_req;
- struct fuzzy_key_stat *ip_stat = NULL;
- char hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1];
- rspamd_inet_addr_t *naddr;
- gpointer ptr;
- gsize up_len = 0;
+ struct fuzzy_tcp_session *session = (struct fuzzy_tcp_session *) ud;
int send_flags = 0;
+ struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_fill_reply(result, &session->common, &send_flags);
+ /* TODO write reply impl */
+ REF_RELEASE(session);
+}
+
+static struct rspamd_fuzzy_cmd *
+rspamd_fuzzy_prepare_cmd(struct fuzzy_common_session *session,
+ struct rspamd_fuzzy_reply *result,
+ int *send_flags,
+ size_t *up_len,
+ bool *final)
+{
+ struct fuzzy_key_stat *ip_stat = NULL;
+ struct rspamd_fuzzy_cmd *cmd = NULL;
+ bool is_encrypted = false, is_shingle = false;
+
cmd = &session->cmd.basic;
switch (session->cmd_type) {
case CMD_NORMAL:
- up_len = sizeof(session->cmd.basic);
+ *up_len = sizeof(session->cmd.basic);
break;
case CMD_SHINGLE:
- up_len = sizeof(session->cmd);
- is_shingle = TRUE;
- send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
+ *up_len = sizeof(session->cmd);
+ is_shingle = true;
+ *send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
break;
case CMD_ENCRYPTED_NORMAL:
- up_len = sizeof(session->cmd.basic);
- encrypted = TRUE;
- send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
+ *up_len = sizeof(session->cmd.basic);
+ is_encrypted = true;
+ *send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
break;
case CMD_ENCRYPTED_SHINGLE:
- up_len = sizeof(session->cmd);
- encrypted = TRUE;
- is_shingle = TRUE;
- send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE | RSPAMD_FUZZY_REPLY_ENCRYPTED;
+ *up_len = sizeof(session->cmd);
+ is_encrypted = true;
+ is_shingle = true;
+ *send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE | RSPAMD_FUZZY_REPLY_ENCRYPTED;
break;
default:
msg_err("invalid command type: %d", session->cmd_type);
- return;
+ return NULL;
}
- memset(&result, 0, sizeof(result));
- memcpy(result.digest, cmd->digest, sizeof(result.digest));
- result.v1.flag = cmd->flag;
- result.v1.tag = cmd->tag;
+ memset(result, 0, sizeof(*result));
+ memcpy(result->digest, cmd->digest, sizeof(result->digest));
+ result->v1.flag = cmd->flag;
+ result->v1.tag = cmd->tag;
if (session->ctx->lua_pre_handler_cbref != -1) {
/* Start lua pre handler */
if (ret) {
/* Artificial reply */
- result.v1.value = lua_tointeger(L, err_idx + 2);
+ result->v1.value = lua_tointeger(L, err_idx + 2);
if (lua_isnumber(L, err_idx + 3)) {
- result.v1.prob = lua_tonumber(L, err_idx + 3);
+ result->v1.prob = lua_tonumber(L, err_idx + 3);
}
else {
- result.v1.prob = 0.0f;
+ result->v1.prob = 0.0f;
}
lua_settop(L, 0);
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
- return;
+ return cmd;
}
}
if (G_UNLIKELY(cmd == NULL || up_len == 0)) {
- result.v1.value = 500;
- result.v1.prob = 0.0f;
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
- return;
+ result->v1.value = 500;
+ result->v1.prob = 0.0f;
+ *final = true;
+
+ return cmd;
}
- if (session->ctx->encrypted_only && !encrypted) {
+ if (session->ctx->encrypted_only && !is_encrypted) {
/* Do not accept unencrypted commands */
- result.v1.value = 403;
- result.v1.prob = 0.0f;
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
- return;
+ result->v1.value = 403;
+ result->v1.prob = 0.0f;
+ *final = true;
+
+ return cmd;
}
if (session->key && session->addr) {
session->addr, -1);
if (ip_stat == NULL) {
- naddr = rspamd_inet_address_copy(session->addr, NULL);
+ rspamd_inet_addr_t *naddr = rspamd_inet_address_copy(session->addr, NULL);
ip_stat = g_malloc0(sizeof(*ip_stat));
REF_INIT_RETAIN(ip_stat, fuzzy_key_stat_dtor);
rspamd_lru_hash_insert(session->key->stat->last_ips,
session->ip_stat = ip_stat;
}
+ /* Unset final flag */
+ *final = false;
+
+ return cmd;
+}
+
+static void
+rspamd_fuzzy_process_udp_session(struct fuzzy_udp_session *session)
+{
+ gboolean is_shingle = FALSE, __attribute__((unused)) encrypted = FALSE;
+ char hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1];
+ rspamd_inet_addr_t *naddr;
+ gpointer ptr;
+ int send_flags = 0;
+ bool final = false;
+ size_t up_len;
+
+ struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_prepare_cmd(&session->common, &session->reply.rep,
+ &send_flags, &up_len, &final);
+
+
+ if (final) {
+ rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags);
+ REF_RELEASE(session);
+ return;
+ }
+
if (cmd->cmd == FUZZY_CHECK) {
bool can_continue = true;
-
- if (session->ctx->ratelimit_buckets) {
- if (session->ctx->ratelimit_log_only) {
- (void) rspamd_fuzzy_check_ratelimit(session->ctx,
- session->addr,
- session->worker,
- session->timestamp); /* Check but ignore */
+ if (session->common.ctx->ratelimit_buckets) {
+ if (session->common.ctx->ratelimit_log_only) {
+ (void) rspamd_fuzzy_check_ratelimit(session->common.ctx,
+ session->common.addr,
+ session->common.worker,
+ session->common.timestamp); /* Check but ignore */
}
else {
- can_continue = rspamd_fuzzy_check_ratelimit(session->ctx,
- session->addr,
- session->worker,
- session->timestamp);
+ can_continue = rspamd_fuzzy_check_ratelimit(session->common.ctx,
+ session->common.addr,
+ session->common.worker,
+ session->common.timestamp);
}
}
if (can_continue) {
REF_RETAIN(session);
- rspamd_fuzzy_backend_check(session->ctx->backend, cmd,
- rspamd_fuzzy_check_callback, session);
+ rspamd_fuzzy_backend_check(session->common.ctx->backend, cmd,
+ rspamd_fuzzy_udp_check_callback, session);
}
else {
- result.v1.value = 403;
- result.v1.prob = 0.0f;
- result.v1.flag = 0;
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ session->reply.rep.v1.value = 403;
+ session->reply.rep.v1.prob = 0.0f;
+ session->reply.rep.v1.flag = 0;
+ rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags);
}
}
else if (cmd->cmd == FUZZY_STAT) {
/* Store approximation (if needed) */
- result.v1.prob = session->ctx->stat.fuzzy_hashes;
+ session->reply.rep.v1.prob = session->common.ctx->stat.fuzzy_hashes;
/* Store high qword in value and low qword in flag */
- result.v1.value = (int32_t) ((uint64_t) session->ctx->stat.fuzzy_hashes >> 32);
- result.v1.flag = (uint32_t) (session->ctx->stat.fuzzy_hashes & G_MAXUINT32);
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ session->reply.rep.v1.value = (int32_t) ((uint64_t) session->common.ctx->stat.fuzzy_hashes >> 32);
+ session->reply.rep.v1.flag = (uint32_t) (session->common.ctx->stat.fuzzy_hashes & G_MAXUINT32);
+ rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags);
}
else if (cmd->cmd == FUZZY_PING) {
- result.v1.prob = 1.0f;
- result.v1.value = cmd->value;
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ session->reply.rep.v1.prob = 1.0f;
+ session->reply.rep.v1.value = cmd->value;
+ rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags);
}
else {
- if (rspamd_fuzzy_check_write(session->ctx, session->addr, session->key)) {
+ if (rspamd_fuzzy_check_write(session->common.ctx, session->common.addr, session->common.key)) {
/* Check whitelist */
- if (session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
+ if (session->common.ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
rspamd_encode_hex_buf(cmd->digest, sizeof(cmd->digest),
hexbuf, sizeof(hexbuf) - 1);
hexbuf[sizeof(hexbuf) - 1] = '\0';
- if (rspamd_match_hash_map(session->ctx->skip_hashes,
+ if (rspamd_match_hash_map(session->common.ctx->skip_hashes,
hexbuf, sizeof(hexbuf) - 1)) {
- result.v1.value = 401;
- result.v1.prob = 0.0f;
+ session->reply.rep.v1.value = 401;
+ session->reply.rep.v1.prob = 0.0f;
goto reply;
}
}
- if (session->ctx->weak_ids && kh_get(fuzzy_key_ids_set, session->ctx->weak_ids, cmd->flag) != kh_end(session->ctx->weak_ids)) {
+ if (session->common.ctx->weak_ids &&
+ kh_get(fuzzy_key_ids_set, session->common.ctx->weak_ids, cmd->flag) != kh_end(session->common.ctx->weak_ids)) {
/* Flag command as weak */
cmd->version |= RSPAMD_FUZZY_FLAG_WEAK;
}
- if (session->worker->index == 0 || session->ctx->peer_fd == -1) {
+ struct fuzzy_peer_cmd up_cmd;
+ struct fuzzy_peer_request *up_req;
+
+ /* Decide if we can process add request by this worker */
+ if (session->common.worker->index == 0 || session->common.ctx->peer_fd == -1) {
/* Just add to the queue */
up_cmd.is_shingle = is_shingle;
ptr = is_shingle ? (gpointer) &up_cmd.cmd.shingle : (gpointer) &up_cmd.cmd.normal;
memcpy(ptr, cmd, up_len);
- g_array_append_val(session->ctx->updates_pending, up_cmd);
+ g_array_append_val(session->common.ctx->updates_pending, up_cmd);
}
else {
/* We need to send request to the peer */
ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal;
memcpy(ptr, cmd, up_len);
- if (!fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) {
+ if (!fuzzy_peer_try_send(session->common.ctx->peer_fd, up_req)) {
up_req->io_ev.data = up_req;
ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
- session->ctx->peer_fd, EV_WRITE);
- ev_io_start(session->ctx->event_loop, &up_req->io_ev);
+ session->common.ctx->peer_fd, EV_WRITE);
+ ev_io_start(session->common.ctx->event_loop, &up_req->io_ev);
}
else {
g_free(up_req);
}
}
- result.v1.value = 0;
- result.v1.prob = 1.0f;
+ session->reply.rep.v1.value = 0;
+ session->reply.rep.v1.prob = 1.0f;
}
else {
- result.v1.value = 403;
- result.v1.prob = 0.0f;
+ session->reply.rep.v1.value = 403;
+ session->reply.rep.v1.prob = 0.0f;
}
reply:
- rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+ rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags);
}
}
return TRUE;
}
-
static void
-fuzzy_session_destroy(gpointer d)
+fuzzy_common_session_dtor(struct fuzzy_common_session *session)
{
- struct fuzzy_session *session = d;
-
rspamd_inet_address_free(session->addr);
rspamd_explicit_memzero(session->nm, sizeof(session->nm));
session->worker->nconns--;
if (session->key) {
REF_RELEASE(session->key);
}
+}
+
+static void
+fuzzy_udp_session_dtor(gpointer d)
+{
+ struct fuzzy_udp_session *session = d;
+ fuzzy_common_session_dtor(&session->common);
g_free(session);
}
tcp_session_dtor(struct fuzzy_tcp_session *tcp_session)
{
struct fuzzy_tcp_reply *rep;
- if (tcp_session->addr) {
- rspamd_inet_address_free(tcp_session->addr);
- }
-
- tcp_session->worker->nconns--;
- if (tcp_session->ip_stat) {
- REF_RELEASE(tcp_session->ip_stat);
- }
+ fuzzy_common_session_dtor(&tcp_session->common);
- if (tcp_session->ctx->event_loop) {
- ev_timer_stop(tcp_session->ctx->event_loop, &tcp_session->tm);
- ev_io_stop(tcp_session->ctx->event_loop, &tcp_session->io);
+ if (tcp_session->common.ctx->event_loop) {
+ ev_timer_stop(tcp_session->common.ctx->event_loop, &tcp_session->tm);
+ ev_io_stop(tcp_session->common.ctx->event_loop, &tcp_session->common.io);
}
DL_FOREACH(tcp_session->replies_queue, rep)
g_free(rep);
}
- close(tcp_session->fd);
+ /* For TCP session we also close a socket as it is owned by a session unlike UDP socket that is shared */
+ close(tcp_session->common.fd);
g_free(tcp_session);
}
gpointer ptr;
int send_flags = 0;
- if (!rspamd_fuzzy_cmd_from_wire(tcp_session->ctx, tcp_session->addr, buf,
+ if (!rspamd_fuzzy_cmd_from_wire(tcp_session->common.ctx, tcp_session->common.addr, buf,
buflen,
- &tcp_session->key,
- tcp_session->nm,
- &tcp_session->cmd,
- &tcp_session->epoch,
- &tcp_session->cmd_type,
- &tcp_session->extensions)) {
+ &tcp_session->common.key,
+ tcp_session->common.nm,
+ &tcp_session->common.cmd,
+ &tcp_session->common.epoch,
+ &tcp_session->common.cmd_type,
+ &tcp_session->common.extensions)) {
/* Discard input */
- tcp_session->ctx->stat.invalid_requests++;
+ tcp_session->common.ctx->stat.invalid_requests++;
msg_debug("invalid fuzzy command of size %z received", buflen);
- if (tcp_session->addr) {
- uint64_t *nerrors = rspamd_lru_hash_lookup(tcp_session->ctx->errors_ips,
- tcp_session->addr, -1);
+ if (tcp_session->common.addr) {
+ uint64_t *nerrors = rspamd_lru_hash_lookup(tcp_session->common.ctx->errors_ips,
+ tcp_session->common.addr, -1);
if (nerrors == NULL) {
nerrors = g_malloc(sizeof(*nerrors));
*nerrors = 1;
- rspamd_lru_hash_insert(tcp_session->ctx->errors_ips,
- rspamd_inet_address_copy(tcp_session->addr, NULL),
+ rspamd_lru_hash_insert(tcp_session->common.ctx->errors_ips,
+ rspamd_inet_address_copy(tcp_session->common.addr, NULL),
nerrors, -1, -1);
}
else {
return false;
}
- struct rspamd_fuzzy_cmd *cmd = &tcp_session->cmd.basic;
+ bool final = false;
size_t up_len = 0;
+ struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_prepare_cmd(&tcp_session->common, &result, &send_flags, &up_len, &final);
- switch (tcp_session->cmd_type) {
- case CMD_NORMAL:
- up_len = sizeof(tcp_session->cmd.basic);
- break;
- case CMD_SHINGLE:
- up_len = sizeof(tcp_session->cmd);
- is_shingle = TRUE;
- send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
- break;
- case CMD_ENCRYPTED_NORMAL:
- up_len = sizeof(tcp_session->cmd.basic);
- encrypted = TRUE;
- send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
- break;
- case CMD_ENCRYPTED_SHINGLE:
- up_len = sizeof(tcp_session->cmd);
- encrypted = TRUE;
- is_shingle = TRUE;
- send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE | RSPAMD_FUZZY_REPLY_ENCRYPTED;
- break;
- default:
- msg_err("invalid command type: %d", tcp_session->cmd_type);
- return false;
- }
-
- memset(&result, 0, sizeof(result));
- memcpy(result.digest, cmd->digest, sizeof(result.digest));
- result.v1.flag = cmd->flag;
- result.v1.tag = cmd->tag;
+ if (G_UNLIKELY(cmd == NULL || final)) {
- if (tcp_session->ctx->lua_pre_handler_cbref != -1) {
- /* Start lua pre handler */
- lua_State *L = tcp_session->ctx->cfg->lua_state;
- int err_idx, ret;
-
- lua_pushcfunction(L, &rspamd_lua_traceback);
- err_idx = lua_gettop(L);
- /* Preallocate stack (small opt) */
- lua_checkstack(L, err_idx + 5);
- /* function */
- lua_rawgeti(L, LUA_REGISTRYINDEX, tcp_session->ctx->lua_pre_handler_cbref);
- /* client IP */
- rspamd_lua_ip_push(L, tcp_session->addr);
- /* client command */
- lua_pushinteger(L, cmd->cmd);
- /* command value (push as rspamd_text) */
- (void) lua_new_text(L, cmd->digest, sizeof(cmd->digest), FALSE);
- /* is shingle */
- lua_pushboolean(L, is_shingle);
- /* TODO: add additional data maybe (encryption, pubkey, etc) */
- rspamd_fuzzy_extensions_tolua(L, tcp_session->extensions);
-
- if ((ret = lua_pcall(L, 5, LUA_MULTRET, err_idx)) != 0) {
- msg_err("call to lua_pre_handler lua "
- "script failed (%d): %s",
- ret, lua_tostring(L, -1));
-
- return false;
- }
- else {
- /* Return values order:
- * the first reply will be on err_idx + 1
- * if it is true, then we need to read the former ones:
- * 2-nd will be reply code
- * 3-rd will be probability (or 0.0 if missing)
- */
- ret = lua_toboolean(L, err_idx + 1);
-
- if (ret) {
- /* Artificial reply */
- result.v1.value = lua_tointeger(L, err_idx + 2);
-
- if (lua_isnumber(L, err_idx + 3)) {
- result.v1.prob = lua_tonumber(L, err_idx + 3);
- }
- else {
- result.v1.prob = 0.0f;
- }
-
- lua_settop(L, 0);
- /* TODO: write reply */
-
- return true;
- }
- }
-
- lua_settop(L, 0);
- }
-
-
- if (G_UNLIKELY(cmd == NULL || up_len == 0)) {
- result.v1.value = 500;
- result.v1.prob = 0.0f;
- /* TODO: write reply */
-
- return true;
- }
-
- if (tcp_session->ctx->encrypted_only && !encrypted) {
- /* Do not accept unencrypted commands */
- result.v1.value = 403;
- result.v1.prob = 0.0f;
/* TODO: write reply */
return true;
}
- if (tcp_session->key && tcp_session->addr) {
- ip_stat = rspamd_lru_hash_lookup(tcp_session->key->stat->last_ips,
- tcp_session->addr, -1);
+ if (tcp_session->common.key && tcp_session->common.addr) {
+ ip_stat = rspamd_lru_hash_lookup(tcp_session->common.key->stat->last_ips,
+ tcp_session->common.addr, -1);
if (ip_stat == NULL) {
- naddr = rspamd_inet_address_copy(tcp_session->addr, NULL);
+ naddr = rspamd_inet_address_copy(tcp_session->common.addr, NULL);
ip_stat = g_malloc0(sizeof(*ip_stat));
REF_INIT_RETAIN(ip_stat, fuzzy_key_stat_dtor);
- rspamd_lru_hash_insert(tcp_session->key->stat->last_ips,
+ rspamd_lru_hash_insert(tcp_session->common.key->stat->last_ips,
naddr, ip_stat, -1, 0);
}
REF_RETAIN(ip_stat);
- tcp_session->ip_stat = ip_stat;
+ tcp_session->common.ip_stat = ip_stat;
}
if (cmd->cmd == FUZZY_CHECK) {
bool can_continue = true;
- if (tcp_session->ctx->ratelimit_buckets) {
- if (tcp_session->ctx->ratelimit_log_only) {
- (void) rspamd_fuzzy_check_ratelimit(tcp_session->ctx, tcp_session->addr,
- tcp_session->worker,
- ev_now(tcp_session->ctx->event_loop)); /* Check but ignore */
+ if (tcp_session->common.ctx->ratelimit_buckets) {
+ if (tcp_session->common.ctx->ratelimit_log_only) {
+ (void) rspamd_fuzzy_check_ratelimit(tcp_session->common.ctx, tcp_session->common.addr,
+ tcp_session->common.worker,
+ ev_now(tcp_session->common.ctx->event_loop)); /* Check but ignore */
}
else {
- can_continue = rspamd_fuzzy_check_ratelimit(tcp_session->ctx, tcp_session->addr,
- tcp_session->worker,
- ev_now(tcp_session->ctx->event_loop));
+ can_continue = rspamd_fuzzy_check_ratelimit(tcp_session->common.ctx, tcp_session->common.addr,
+ tcp_session->common.worker,
+ ev_now(tcp_session->common.ctx->event_loop));
}
}
if (can_continue) {
REF_RETAIN(tcp_session);
/* TODO: use a different callback */
- rspamd_fuzzy_backend_check(tcp_session->ctx->backend, cmd,
- rspamd_fuzzy_check_callback, tcp_session);
+ rspamd_fuzzy_backend_check(tcp_session->common.ctx->backend, cmd,
+ rspamd_fuzzy_tcp_check_callback, tcp_session);
}
else {
result.v1.value = 403;
}
else if (cmd->cmd == FUZZY_STAT) {
/* Store approximation (if needed) */
- result.v1.prob = tcp_session->ctx->stat.fuzzy_hashes;
+ result.v1.prob = tcp_session->common.ctx->stat.fuzzy_hashes;
/* Store high qword in value and low qword in flag */
- result.v1.value = (int32_t) ((uint64_t) tcp_session->ctx->stat.fuzzy_hashes >> 32);
- result.v1.flag = (uint32_t) (tcp_session->ctx->stat.fuzzy_hashes & G_MAXUINT32);
+ result.v1.value = (int32_t) ((uint64_t) tcp_session->common.ctx->stat.fuzzy_hashes >> 32);
+ result.v1.flag = (uint32_t) (tcp_session->common.ctx->stat.fuzzy_hashes & G_MAXUINT32);
/* TODO: write reply */
}
else if (cmd->cmd == FUZZY_PING) {
/* TODO: write reply */
}
else {
- if (rspamd_fuzzy_check_write(tcp_session->ctx, tcp_session->addr, tcp_session->key)) {
+ if (rspamd_fuzzy_check_write(tcp_session->common.ctx, tcp_session->common.addr, tcp_session->common.key)) {
/* Check whitelist */
- if (tcp_session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
+ if (tcp_session->common.ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
rspamd_encode_hex_buf(cmd->digest, sizeof(cmd->digest),
hexbuf, sizeof(hexbuf) - 1);
hexbuf[sizeof(hexbuf) - 1] = '\0';
- if (rspamd_match_hash_map(tcp_session->ctx->skip_hashes,
+ if (rspamd_match_hash_map(tcp_session->common.ctx->skip_hashes,
hexbuf, sizeof(hexbuf) - 1)) {
result.v1.value = 401;
result.v1.prob = 0.0f;
}
}
- if (tcp_session->ctx->weak_ids &&
- kh_get(fuzzy_key_ids_set, tcp_session->ctx->weak_ids, cmd->flag) != kh_end(tcp_session->ctx->weak_ids)) {
+ if (tcp_session->common.ctx->weak_ids &&
+ kh_get(fuzzy_key_ids_set, tcp_session->common.ctx->weak_ids, cmd->flag) != kh_end(tcp_session->common.ctx->weak_ids)) {
/* Flag command as weak */
cmd->version |= RSPAMD_FUZZY_FLAG_WEAK;
}
- if (tcp_session->worker->index == 0 || tcp_session->ctx->peer_fd == -1) {
+ if (tcp_session->common.worker->index == 0 || tcp_session->common.ctx->peer_fd == -1) {
/* Just add to the queue */
up_cmd.is_shingle = is_shingle;
ptr = is_shingle ? (gpointer) &up_cmd.cmd.shingle : (gpointer) &up_cmd.cmd.normal;
memcpy(ptr, cmd, up_len);
- g_array_append_val(tcp_session->ctx->updates_pending, up_cmd);
+ g_array_append_val(tcp_session->common.ctx->updates_pending, up_cmd);
}
else {
/* We need to send request to the peer */
ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal;
memcpy(ptr, cmd, up_len);
- if (!fuzzy_peer_try_send(tcp_session->ctx->peer_fd, up_req)) {
+ if (!fuzzy_peer_try_send(tcp_session->common.ctx->peer_fd, up_req)) {
up_req->io_ev.data = up_req;
ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
- tcp_session->ctx->peer_fd, EV_WRITE);
- ev_io_start(tcp_session->ctx->event_loop, &up_req->io_ev);
+ tcp_session->common.ctx->peer_fd, EV_WRITE);
+ ev_io_start(tcp_session->common.ctx->event_loop, &up_req->io_ev);
}
else {
g_free(up_req);
{
struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
- msg_debug_fuzzy_storage("got io for %s: %d", rspamd_inet_address_to_string(tcp_session->addr), revents);
+ msg_debug_fuzzy_storage("got io for %s: %d", rspamd_inet_address_to_string(tcp_session->common.addr), revents);
if (revents & EV_READ) {
ssize_t r;
- r = read(tcp_session->fd, tcp_session->input_buf + tcp_session->bytes_unprocessed,
+ r = read(tcp_session->common.fd, tcp_session->input_buf + tcp_session->bytes_unprocessed,
sizeof(tcp_session->input_buf) - tcp_session->bytes_unprocessed);
if (r == -1) {
/* Cannot read anything */
msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: %s",
- rspamd_inet_address_to_string(tcp_session->addr),
+ rspamd_inet_address_to_string(tcp_session->common.addr),
strerror(errno));
REF_RELEASE(tcp_session);
else if (r == 0) {
/* Got EOF */
msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: EOF",
- rspamd_inet_address_to_string(tcp_session->addr));
+ rspamd_inet_address_to_string(tcp_session->common.addr));
REF_RELEASE(tcp_session);
}
else {
if (tcp_session->replies_queue != NULL) {
/* No more replies */
- ev_io_set(w, tcp_session->fd, EV_READ);
+ ev_io_set(w, tcp_session->common.fd, EV_READ);
}
else {
/* Wait for another write readiness */
- ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ);
+ ev_io_set(w, tcp_session->common.fd, EV_WRITE | EV_READ);
}
ev_io_start(loop, w);
}
/* Try to write everything */
- ssize_t r = writev(tcp_session->fd, iov, n);
+ ssize_t r = writev(tcp_session->common.fd, iov, n);
if (n > 32) {
g_free(iov);
if (r == -1) {
/* Cannot write anything */
msg_debug_fuzzy_storage("failed TCP connection from %s; cannot write: %s",
- rspamd_inet_address_to_string(tcp_session->addr),
+ rspamd_inet_address_to_string(tcp_session->common.addr),
strerror(errno));
REF_RELEASE(tcp_session);
if (tcp_session->replies_queue != NULL) {
/* No more replies */
- ev_io_set(w, tcp_session->fd, EV_READ);
+ ev_io_set(w, tcp_session->common.fd, EV_READ);
}
else {
/* Wait for another write readiness */
- ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ);
+ ev_io_set(w, tcp_session->common.fd, EV_WRITE | EV_READ);
}
ev_io_start(loop, w);
{
struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
- msg_debug_fuzzy_storage("timed out TCP connection from %s", rspamd_inet_address_to_string(tcp_session->addr));
+ msg_debug_fuzzy_storage("timed out TCP connection from %s", rspamd_inet_address_to_string(tcp_session->common.addr));
REF_RELEASE(tcp_session);
}
}
tcp_session = g_malloc0(sizeof(*tcp_session));
- tcp_session->addr = addr;
- tcp_session->ctx = ctx;
- tcp_session->fd = nfd;
+ tcp_session->common.addr = addr;
+ tcp_session->common.ctx = ctx;
+ tcp_session->common.fd = nfd;
+ tcp_session->common.worker = worker;
+ tcp_session->common.timestamp = ev_now(ctx->event_loop);
REF_INIT_RETAIN(tcp_session, tcp_session_dtor);
- ev_io_init(&tcp_session->io, tcp_fuzzy_socket_io, nfd, EV_READ);
+ ev_io_init(&tcp_session->common.io, tcp_fuzzy_socket_io, nfd, EV_READ);
ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, ctx->tcp_timeout);
tcp_session->tm.data = tcp_session;
- tcp_session->io.data = tcp_session;
+ tcp_session->common.io.data = tcp_session;
ev_timer_start(ctx->event_loop, &tcp_session->tm);
- ev_io_start(ctx->event_loop, &tcp_session->io);
+ ev_io_start(ctx->event_loop, &tcp_session->common.io);
msg_debug_fuzzy_storage("accepted TCP connection from %s", rspamd_inet_address_to_string(addr));
}
{
struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
struct rspamd_fuzzy_storage_ctx *ctx;
- struct fuzzy_session *session;
+ struct fuzzy_udp_session *session;
gssize r, msg_len;
uint64_t *nerrors;
struct iovec iovs[MSGVEC_LEN];
}
session = g_malloc0(sizeof(*session));
- REF_INIT_RETAIN(session, fuzzy_session_destroy);
- session->worker = worker;
- session->fd = w->fd;
- session->ctx = ctx;
- session->timestamp = ev_now(ctx->event_loop);
- session->addr = client_addr;
+ REF_INIT_RETAIN(session, fuzzy_udp_session_dtor);
+ session->common.worker = worker;
+ session->common.fd = w->fd;
+ session->common.ctx = ctx;
+ session->common.timestamp = ev_now(ctx->event_loop);
+ session->common.addr = client_addr;
worker->nconns++;
/* Each message can have its length in case of recvmmsg */
if (rspamd_fuzzy_cmd_from_wire(ctx, client_addr, iovs[i].iov_base,
msg_len,
- &session->key,
- session->nm,
- &session->cmd,
- &session->epoch,
- &session->cmd_type,
- &session->extensions)) {
+ &session->common.key,
+ session->common.nm,
+ &session->common.cmd,
+ &session->common.epoch,
+ &session->common.cmd_type,
+ &session->common.extensions)) {
/* Check shingles count sanity */
rspamd_fuzzy_process_udp_session(session);
}
ctx->stat.invalid_requests++;
msg_debug("invalid fuzzy command of size %z received", r);
- if (session->addr) {
- nerrors = rspamd_lru_hash_lookup(session->ctx->errors_ips,
- session->addr, -1);
+ if (session->common.addr) {
+ nerrors = rspamd_lru_hash_lookup(session->common.ctx->errors_ips,
+ session->common.addr, -1);
if (nerrors == NULL) {
nerrors = g_malloc(sizeof(*nerrors));
*nerrors = 1;
- rspamd_lru_hash_insert(session->ctx->errors_ips,
- rspamd_inet_address_copy(session->addr, NULL),
+ rspamd_lru_hash_insert(session->common.ctx->errors_ips,
+ rspamd_inet_address_copy(session->common.addr, NULL),
nerrors, -1, -1);
}
else {