* So the length is always cur_frame & 0x3fff
*/
uint16_t cur_frame_state;
- unsigned int bytes_unprocessed;
+ uint16_t bytes_unprocessed;
+
+ /* Common with UDP session */
+ enum rspamd_fuzzy_epoch epoch;
+ enum fuzzy_cmd_type cmd_type;
+ struct rspamd_fuzzy_shingle_cmd cmd;
+ struct fuzzy_key *key;
+ struct rspamd_fuzzy_cmd_extension *extensions;
+ struct fuzzy_key_stat *ip_stat;
+ unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
+
+ ref_entry_t ref;
struct fuzzy_tcp_reply *replies_queue;
unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH];
enum fuzzy_cmd_type cmd_type;
int fd;
ev_tstamp timestamp;
- struct ev_io io;
- ref_entry_t ref;
+
struct fuzzy_key *key;
struct rspamd_fuzzy_cmd_extension *extensions;
unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
+
+ struct ev_io io;
+ ref_entry_t ref;
};
struct fuzzy_peer_request {
}
static gboolean
-rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session)
+rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx,
+ rspamd_inet_addr_t *addr,
+ struct rspamd_worker *worker,
+ ev_tstamp timestamp)
{
rspamd_inet_addr_t *masked;
struct rspamd_leaky_bucket_elt *elt;
- if (!session->addr) {
+ if (!addr) {
return TRUE;
}
- if (session->ctx->ratelimit_whitelist != NULL) {
- if (rspamd_match_radix_map_addr(session->ctx->ratelimit_whitelist,
- session->addr) != NULL) {
+ if (ctx->ratelimit_whitelist != NULL) {
+ if (rspamd_match_radix_map_addr(ctx->ratelimit_whitelist,
+ addr) != NULL) {
return TRUE;
}
}
}
*/
- masked = rspamd_inet_address_copy(session->addr, NULL);
+ masked = rspamd_inet_address_copy(addr, NULL);
if (rspamd_inet_address_get_af(masked) == AF_INET) {
rspamd_inet_address_apply_mask(masked,
- MIN(session->ctx->leaky_bucket_mask, 32));
+ MIN(ctx->leaky_bucket_mask, 32));
}
else {
/* Must be at least /64 */
rspamd_inet_address_apply_mask(masked,
- MIN(MAX(session->ctx->leaky_bucket_mask * 4, 64), 128));
+ MIN(MAX(ctx->leaky_bucket_mask * 4, 64), 128));
}
- elt = rspamd_lru_hash_lookup(session->ctx->ratelimit_buckets, masked,
- (time_t) session->timestamp);
+ elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, masked,
+ (time_t) timestamp);
if (elt) {
gboolean ratelimited = FALSE, new_ratelimit = FALSE;
/* There is an issue with the previous logic: the TTL is updated each time
* we see that new bucket. Hence, we need to check the `last` and act accordingly
*/
- if (elt->last < session->timestamp && session->timestamp - elt->last >= session->ctx->leaky_bucket_ttl) {
+ if (elt->last < timestamp && timestamp - elt->last >= ctx->leaky_bucket_ttl) {
/*
* We reset bucket to it's 90% capacity to allow some requests
* This should cope with the issue when we block an IP network for some burst and never unblock it
*/
- elt->cur = session->ctx->leaky_bucket_burst * 0.9;
- elt->last = session->timestamp;
+ elt->cur = ctx->leaky_bucket_burst * 0.9;
+ elt->last = timestamp;
}
else {
ratelimited = TRUE;
}
else {
/* Update bucket: leak some elements */
- if (elt->last < session->timestamp) {
- elt->cur -= session->ctx->leaky_bucket_rate * (session->timestamp - elt->last);
- elt->last = session->timestamp;
+ if (elt->last < timestamp) {
+ elt->cur -= ctx->leaky_bucket_rate * (timestamp - elt->last);
+ elt->last = timestamp;
if (elt->cur < 0) {
elt->cur = 0;
}
}
else {
- elt->last = session->timestamp;
+ elt->last = timestamp;
}
/* Check the bucket */
- if (elt->cur >= session->ctx->leaky_bucket_burst) {
+ if (elt->cur >= ctx->leaky_bucket_burst) {
msg_info("ratelimiting %s (%s), %.1f max elts",
- rspamd_inet_address_to_string(session->addr),
+ rspamd_inet_address_to_string(addr),
rspamd_inet_address_to_string(masked),
- session->ctx->leaky_bucket_burst);
+ ctx->leaky_bucket_burst);
elt->cur = NAN;
new_ratelimit = TRUE;
ratelimited = TRUE;
}
if (ratelimited) {
- rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit");
+ rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
}
if (new_ratelimit) {
if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
msg_debug("propagating blocked address to other workers");
- rspamd_srv_send_command(session->worker, session->ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+ rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
}
else {
- msg_err("bad address length: %d, expected to be %d", (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
+ msg_err("bad address length: %d, expected to be %d",
+ (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
}
}
}
elt = g_malloc(sizeof(*elt));
elt->addr = masked; /* transfer ownership */
elt->cur = 1;
- elt->last = session->timestamp;
+ elt->last = timestamp;
- rspamd_lru_hash_insert(session->ctx->ratelimit_buckets,
+ rspamd_lru_hash_insert(ctx->ratelimit_buckets,
masked,
elt,
- session->timestamp,
- session->ctx->leaky_bucket_ttl);
+ timestamp,
+ ctx->leaky_bucket_ttl);
}
return TRUE;
/* result timestamp */
lua_pushinteger(L, result->ts);
/* TODO: add additional data maybe (encryption, pubkey, etc) */
- rspamd_fuzzy_extensions_tolua(L, session);
+ rspamd_fuzzy_extensions_tolua(L, session->extensions);
if ((ret = lua_pcall(L, 9, LUA_MULTRET, err_idx)) != 0) {
msg_err("call to lua_post_handler lua "
if (session->ctx->ratelimit_buckets) {
if (session->ctx->ratelimit_log_only) {
- (void) rspamd_fuzzy_check_ratelimit(session); /* Check but ignore */
+ (void) rspamd_fuzzy_check_ratelimit(session->ctx,
+ session->addr,
+ session->worker,
+ session->timestamp); /* Check but ignore */
}
else {
- can_continue = rspamd_fuzzy_check_ratelimit(session);
+ can_continue = rspamd_fuzzy_check_ratelimit(session->ctx,
+ session->addr,
+ session->worker,
+ session->timestamp);
}
}
rspamd_inet_address_free(tcp_session->addr);
}
+ tcp_session->worker->nconns--;
+
+ if (tcp_session->ip_stat) {
+ REF_RELEASE(tcp_session->ip_stat);
+ }
+
if (tcp_session->ctx->event_loop) {
ev_timer_stop(tcp_session->ctx->event_loop, &tcp_session->tm);
ev_io_stop(tcp_session->ctx->event_loop, &tcp_session->io);
g_free(tcp_session);
}
+/**
+ * Decode and process a single fuzzy command frame received over TCP.
+ *
+ * The frame is decoded with rspamd_fuzzy_cmd_from_wire() into the session
+ * (key, nm, cmd, epoch, cmd_type, extensions) and is then handled by command:
+ *  - FUZZY_CHECK is ratelimited (if buckets are configured) and passed to the
+ *    backend;
+ *  - FUZZY_STAT and FUZZY_PING only fill in the local reply structure;
+ *  - any other command is treated as an update that is either appended to
+ *    the pending updates or forwarded to the peer descriptor.
+ *
+ * Writing replies back to the client is not implemented yet (see the TODO
+ * marks), so `result` and `send_flags` are prepared but not sent anywhere.
+ *
+ * @param tcp_session session that received the frame
+ * @param buf frame payload
+ * @param buflen payload length in bytes
+ * @return false if the frame cannot be decoded, has an unknown command type,
+ * the Lua pre handler fails or a check request is ratelimited; true otherwise
+ */
+static bool
+rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned char *buf, size_t buflen)
+{
+    gboolean is_shingle = FALSE, __attribute__((unused)) encrypted = FALSE;
+    struct rspamd_fuzzy_reply result;
+    struct fuzzy_peer_cmd up_cmd;
+    struct fuzzy_peer_request *up_req;
+    struct fuzzy_key_stat *ip_stat = NULL;
+    char hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1];
+    rspamd_inet_addr_t *naddr;
+    gpointer ptr;
+    int send_flags = 0;
+
+    /*
+     * NOTE(review): key/extensions stored in the session are overwritten on
+     * every frame; verify that whatever the previous frame left there does
+     * not need to be released first.
+     */
+    if (!rspamd_fuzzy_cmd_from_wire(tcp_session->ctx, tcp_session->addr, buf,
+                                    buflen,
+                                    &tcp_session->key,
+                                    tcp_session->nm,
+                                    &tcp_session->cmd,
+                                    &tcp_session->epoch,
+                                    &tcp_session->cmd_type,
+                                    &tcp_session->extensions)) {
+        /* Discard input */
+        tcp_session->ctx->stat.invalid_requests++;
+        msg_debug("invalid fuzzy command of size %z received", buflen);
+
+        if (tcp_session->addr) {
+            /* Count invalid requests per source address */
+            uint64_t *nerrors = rspamd_lru_hash_lookup(tcp_session->ctx->errors_ips,
+                                                       tcp_session->addr, -1);
+
+            if (nerrors == NULL) {
+                nerrors = g_malloc(sizeof(*nerrors));
+                *nerrors = 1;
+                rspamd_lru_hash_insert(tcp_session->ctx->errors_ips,
+                                       rspamd_inet_address_copy(tcp_session->addr, NULL),
+                                       nerrors, -1, -1);
+            }
+            else {
+                *nerrors = *nerrors + 1;
+            }
+        }
+
+        return false;
+    }
+
+    struct rspamd_fuzzy_cmd *cmd = &tcp_session->cmd.basic;
+    size_t up_len = 0;
+
+    /* Derive the payload size and the reply flags from the decoded type */
+    switch (tcp_session->cmd_type) {
+    case CMD_NORMAL:
+        up_len = sizeof(tcp_session->cmd.basic);
+        break;
+    case CMD_SHINGLE:
+        up_len = sizeof(tcp_session->cmd);
+        is_shingle = TRUE;
+        send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
+        break;
+    case CMD_ENCRYPTED_NORMAL:
+        up_len = sizeof(tcp_session->cmd.basic);
+        encrypted = TRUE;
+        send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
+        break;
+    case CMD_ENCRYPTED_SHINGLE:
+        up_len = sizeof(tcp_session->cmd);
+        encrypted = TRUE;
+        is_shingle = TRUE;
+        send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE | RSPAMD_FUZZY_REPLY_ENCRYPTED;
+        break;
+    default:
+        msg_err("invalid command type: %d", tcp_session->cmd_type);
+        return false;
+    }
+
+    memset(&result, 0, sizeof(result));
+    memcpy(result.digest, cmd->digest, sizeof(result.digest));
+    result.v1.flag = cmd->flag;
+    result.v1.tag = cmd->tag;
+
+    if (tcp_session->ctx->lua_pre_handler_cbref != -1) {
+        /* Start lua pre handler */
+        lua_State *L = tcp_session->ctx->cfg->lua_state;
+        int err_idx, ret;
+
+        lua_pushcfunction(L, &rspamd_lua_traceback);
+        err_idx = lua_gettop(L);
+        /* Preallocate stack (small opt) */
+        lua_checkstack(L, err_idx + 5);
+        /* function */
+        lua_rawgeti(L, LUA_REGISTRYINDEX, tcp_session->ctx->lua_pre_handler_cbref);
+        /* client IP */
+        rspamd_lua_ip_push(L, tcp_session->addr);
+        /* client command */
+        lua_pushinteger(L, cmd->cmd);
+        /* command value (push as rspamd_text) */
+        (void) lua_new_text(L, cmd->digest, sizeof(cmd->digest), FALSE);
+        /* is shingle */
+        lua_pushboolean(L, is_shingle);
+        /* TODO: add additional data maybe (encryption, pubkey, etc) */
+        rspamd_fuzzy_extensions_tolua(L, tcp_session->extensions);
+
+        if ((ret = lua_pcall(L, 5, LUA_MULTRET, err_idx)) != 0) {
+            msg_err("call to lua_pre_handler lua "
+                    "script failed (%d): %s",
+                    ret, lua_tostring(L, -1));
+            /*
+             * Drop the traceback function and the error message, otherwise
+             * they stay on the Lua stack after every failed call.
+             */
+            lua_settop(L, 0);
+
+            return false;
+        }
+        else {
+            /* Return values order:
+             * the first reply will be on err_idx + 1
+             * if it is true, then we need to read the former ones:
+             * 2-nd will be reply code
+             * 3-rd will be probability (or 0.0 if missing)
+             */
+            ret = lua_toboolean(L, err_idx + 1);
+
+            if (ret) {
+                /* Artificial reply */
+                result.v1.value = lua_tointeger(L, err_idx + 2);
+
+                if (lua_isnumber(L, err_idx + 3)) {
+                    result.v1.prob = lua_tonumber(L, err_idx + 3);
+                }
+                else {
+                    result.v1.prob = 0.0f;
+                }
+
+                lua_settop(L, 0);
+                /* TODO: write reply */
+
+                return true;
+            }
+        }
+
+        lua_settop(L, 0);
+    }
+
+
+    if (G_UNLIKELY(cmd == NULL || up_len == 0)) {
+        result.v1.value = 500;
+        result.v1.prob = 0.0f;
+        /* TODO: write reply */
+
+        return true;
+    }
+
+    if (tcp_session->ctx->encrypted_only && !encrypted) {
+        /* Do not accept unencrypted commands */
+        result.v1.value = 403;
+        result.v1.prob = 0.0f;
+        /* TODO: write reply */
+
+        return true;
+    }
+
+    if (tcp_session->key && tcp_session->addr) {
+        ip_stat = rspamd_lru_hash_lookup(tcp_session->key->stat->last_ips,
+                                         tcp_session->addr, -1);
+
+        if (ip_stat == NULL) {
+            naddr = rspamd_inet_address_copy(tcp_session->addr, NULL);
+            ip_stat = g_malloc0(sizeof(*ip_stat));
+            REF_INIT_RETAIN(ip_stat, fuzzy_key_stat_dtor);
+            rspamd_lru_hash_insert(tcp_session->key->stat->last_ips,
+                                   naddr, ip_stat, -1, 0);
+        }
+
+        /*
+         * The session keeps exactly one reference to its current ip_stat.
+         * Retain the new element first and only then drop the reference
+         * taken by a previous frame, so the element is never released
+         * prematurely when both are the same object.
+         */
+        REF_RETAIN(ip_stat);
+
+        if (tcp_session->ip_stat) {
+            REF_RELEASE(tcp_session->ip_stat);
+        }
+
+        tcp_session->ip_stat = ip_stat;
+    }
+
+    if (cmd->cmd == FUZZY_CHECK) {
+        bool can_continue = true;
+
+        if (tcp_session->ctx->ratelimit_buckets) {
+            if (tcp_session->ctx->ratelimit_log_only) {
+                (void) rspamd_fuzzy_check_ratelimit(tcp_session->ctx, tcp_session->addr,
+                                                    tcp_session->worker,
+                                                    ev_now(tcp_session->ctx->event_loop)); /* Check but ignore */
+            }
+            else {
+                can_continue = rspamd_fuzzy_check_ratelimit(tcp_session->ctx, tcp_session->addr,
+                                                            tcp_session->worker,
+                                                            ev_now(tcp_session->ctx->event_loop));
+            }
+        }
+
+        if (can_continue) {
+            /* Hold the session for the duration of the backend request */
+            REF_RETAIN(tcp_session);
+            /*
+             * TODO: use a different callback
+             * NOTE(review): the callback receives the TCP session as its
+             * user data; confirm it does not expect another session type.
+             */
+            rspamd_fuzzy_backend_check(tcp_session->ctx->backend, cmd,
+                                       rspamd_fuzzy_check_callback, tcp_session);
+        }
+        else {
+            result.v1.value = 403;
+            result.v1.prob = 0.0f;
+            result.v1.flag = 0;
+            /* TODO: write reply */
+
+            return false;
+        }
+    }
+    else if (cmd->cmd == FUZZY_STAT) {
+        /* Store approximation (if needed) */
+        result.v1.prob = tcp_session->ctx->stat.fuzzy_hashes;
+        /* Store high qword in value and low qword in flag */
+        result.v1.value = (int32_t) ((uint64_t) tcp_session->ctx->stat.fuzzy_hashes >> 32);
+        result.v1.flag = (uint32_t) (tcp_session->ctx->stat.fuzzy_hashes & G_MAXUINT32);
+        /* TODO: write reply */
+    }
+    else if (cmd->cmd == FUZZY_PING) {
+        result.v1.prob = 1.0f;
+        result.v1.value = cmd->value;
+        /* TODO: write reply */
+    }
+    else {
+        if (rspamd_fuzzy_check_write(tcp_session->ctx, tcp_session->addr, tcp_session->key)) {
+            /* Check whitelist */
+            if (tcp_session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
+                rspamd_encode_hex_buf(cmd->digest, sizeof(cmd->digest),
+                                      hexbuf, sizeof(hexbuf) - 1);
+                hexbuf[sizeof(hexbuf) - 1] = '\0';
+
+                if (rspamd_match_hash_map(tcp_session->ctx->skip_hashes,
+                                          hexbuf, sizeof(hexbuf) - 1)) {
+                    result.v1.value = 401;
+                    result.v1.prob = 0.0f;
+
+                    goto reply;
+                }
+            }
+
+            if (tcp_session->ctx->weak_ids &&
+                kh_get(fuzzy_key_ids_set, tcp_session->ctx->weak_ids, cmd->flag) != kh_end(tcp_session->ctx->weak_ids)) {
+                /* Flag command as weak */
+                cmd->version |= RSPAMD_FUZZY_FLAG_WEAK;
+            }
+
+            if (tcp_session->worker->index == 0 || tcp_session->ctx->peer_fd == -1) {
+                /* Just add to the queue */
+                up_cmd.is_shingle = is_shingle;
+                ptr = is_shingle ? (gpointer) &up_cmd.cmd.shingle : (gpointer) &up_cmd.cmd.normal;
+                memcpy(ptr, cmd, up_len);
+                g_array_append_val(tcp_session->ctx->updates_pending, up_cmd);
+            }
+            else {
+                /* We need to send request to the peer */
+                up_req = g_malloc0(sizeof(*up_req));
+                up_req->cmd.is_shingle = is_shingle;
+                ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal;
+                memcpy(ptr, cmd, up_len);
+
+                if (!fuzzy_peer_try_send(tcp_session->ctx->peer_fd, up_req)) {
+                    /* Could not send now: retry once the peer fd is writable */
+                    up_req->io_ev.data = up_req;
+                    ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
+                               tcp_session->ctx->peer_fd, EV_WRITE);
+                    ev_io_start(tcp_session->ctx->event_loop, &up_req->io_ev);
+                }
+                else {
+                    g_free(up_req);
+                }
+            }
+
+            result.v1.value = 0;
+            result.v1.prob = 1.0f;
+        }
+        else {
+            result.v1.value = 403;
+            result.v1.prob = 0.0f;
+        }
+    reply:
+        /*
+         * TODO: write reply
+         * The empty statement is required: before C23 a label must be
+         * followed by a statement and cannot directly precede `}`.
+         */
+        ;
+    }
+
+    return true;
+}
+
static bool
fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_read)
{
rspamd_inet_address_to_string(tcp_session->addr),
strerror(errno));
- tcp_session_dtor(tcp_session);
+ REF_RELEASE(tcp_session);
}
else if (r == 0) {
/* Got EOF */
msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: EOF",
rspamd_inet_address_to_string(tcp_session->addr));
- tcp_session_dtor(tcp_session);
+ REF_RELEASE(tcp_session);
}
else {
if (!fuzzy_tcp_process_input(tcp_session, r)) {
- tcp_session_dtor(tcp_session);
+ REF_RELEASE(tcp_session);
}
else {
if (tcp_session->replies_queue != NULL) {
rspamd_inet_address_to_string(tcp_session->addr),
strerror(errno));
- tcp_session_dtor(tcp_session);
+ REF_RELEASE(tcp_session);
}
else if (r == 0) {
/* Fake EV_WRITE? */
msg_debug_fuzzy_storage("timed out TCP connection from %s", rspamd_inet_address_to_string(tcp_session->addr));
- tcp_session_dtor(tcp_session);
+ REF_RELEASE(tcp_session);
}
static void
tcp_session->addr = addr;
tcp_session->ctx = ctx;
tcp_session->fd = nfd;
+ REF_INIT_RETAIN(tcp_session, tcp_session_dtor);
ev_io_init(&tcp_session->io, tcp_fuzzy_socket_io, nfd, EV_READ);
ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, ctx->tcp_timeout);
tcp_session->tm.data = tcp_session;