]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Project] Add some basic tcp session handling
authorVsevolod Stakhov <vsevolod@rspamd.com>
Mon, 29 Apr 2024 18:23:27 +0000 (19:23 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 25 Jun 2024 13:27:55 +0000 (14:27 +0100)
src/fuzzy_storage.c

index 52baaae2472e15bc9b6bec811033f9cb2f6469a0..ce8b205a7fd73eeac46597528afee2671ce6be50 100644 (file)
@@ -253,7 +253,18 @@ struct fuzzy_tcp_session {
         * So the length is always cur_frame & 0x3fff
         */
        uint16_t cur_frame_state;
-       unsigned int bytes_unprocessed;
+       uint16_t bytes_unprocessed;
+
+       /* Common with UDP session */
+       enum rspamd_fuzzy_epoch epoch;
+       enum fuzzy_cmd_type cmd_type;
+       struct rspamd_fuzzy_shingle_cmd cmd;
+       struct fuzzy_key *key;
+       struct rspamd_fuzzy_cmd_extension *extensions;
+       struct fuzzy_key_stat *ip_stat;
+       unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
+
+       ref_entry_t ref;
 
        struct fuzzy_tcp_reply *replies_queue;
        unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH];
@@ -272,11 +283,13 @@ struct fuzzy_session {
        enum fuzzy_cmd_type cmd_type;
        int fd;
        ev_tstamp timestamp;
-       struct ev_io io;
-       ref_entry_t ref;
+
        struct fuzzy_key *key;
        struct rspamd_fuzzy_cmd_extension *extensions;
        unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
+
+       struct ev_io io;
+       ref_entry_t ref;
 };
 
 struct fuzzy_peer_request {
@@ -447,18 +460,21 @@ ucl_keymap_dtor_cb(struct map_cb_data *data)
 }
 
 static gboolean
-rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session)
+rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx,
+                                                        rspamd_inet_addr_t *addr,
+                                                        struct rspamd_worker *worker,
+                                                        ev_tstamp timestamp)
 {
        rspamd_inet_addr_t *masked;
        struct rspamd_leaky_bucket_elt *elt;
 
-       if (!session->addr) {
+       if (!addr) {
                return TRUE;
        }
 
-       if (session->ctx->ratelimit_whitelist != NULL) {
-               if (rspamd_match_radix_map_addr(session->ctx->ratelimit_whitelist,
-                                                                               session->addr) != NULL) {
+       if (ctx->ratelimit_whitelist != NULL) {
+               if (rspamd_match_radix_map_addr(ctx->ratelimit_whitelist,
+                                                                               addr) != NULL) {
                        return TRUE;
                }
        }
@@ -469,20 +485,20 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session)
        }
        */
 
-       masked = rspamd_inet_address_copy(session->addr, NULL);
+       masked = rspamd_inet_address_copy(addr, NULL);
 
        if (rspamd_inet_address_get_af(masked) == AF_INET) {
                rspamd_inet_address_apply_mask(masked,
-                                                                          MIN(session->ctx->leaky_bucket_mask, 32));
+                                                                          MIN(ctx->leaky_bucket_mask, 32));
        }
        else {
                /* Must be at least /64 */
                rspamd_inet_address_apply_mask(masked,
-                                                                          MIN(MAX(session->ctx->leaky_bucket_mask * 4, 64), 128));
+                                                                          MIN(MAX(ctx->leaky_bucket_mask * 4, 64), 128));
        }
 
-       elt = rspamd_lru_hash_lookup(session->ctx->ratelimit_buckets, masked,
-                                                                (time_t) session->timestamp);
+       elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, masked,
+                                                                (time_t) timestamp);
 
        if (elt) {
                gboolean ratelimited = FALSE, new_ratelimit = FALSE;
@@ -491,13 +507,13 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session)
                        /* There is an issue with the previous logic: the TTL is updated each time
                         * we see that new bucket. Hence, we need to check the `last` and act accordingly
                         */
-                       if (elt->last < session->timestamp && session->timestamp - elt->last >= session->ctx->leaky_bucket_ttl) {
+                       if (elt->last < timestamp && timestamp - elt->last >= ctx->leaky_bucket_ttl) {
                                /*
                                 * We reset bucket to it's 90% capacity to allow some requests
                                 * This should cope with the issue when we block an IP network for some burst and never unblock it
                                 */
-                               elt->cur = session->ctx->leaky_bucket_burst * 0.9;
-                               elt->last = session->timestamp;
+                               elt->cur = ctx->leaky_bucket_burst * 0.9;
+                               elt->last = timestamp;
                        }
                        else {
                                ratelimited = TRUE;
@@ -505,25 +521,25 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session)
                }
                else {
                        /* Update bucket: leak some elements */
-                       if (elt->last < session->timestamp) {
-                               elt->cur -= session->ctx->leaky_bucket_rate * (session->timestamp - elt->last);
-                               elt->last = session->timestamp;
+                       if (elt->last < timestamp) {
+                               elt->cur -= ctx->leaky_bucket_rate * (timestamp - elt->last);
+                               elt->last = timestamp;
 
                                if (elt->cur < 0) {
                                        elt->cur = 0;
                                }
                        }
                        else {
-                               elt->last = session->timestamp;
+                               elt->last = timestamp;
                        }
 
                        /* Check the bucket */
-                       if (elt->cur >= session->ctx->leaky_bucket_burst) {
+                       if (elt->cur >= ctx->leaky_bucket_burst) {
 
                                msg_info("ratelimiting %s (%s), %.1f max elts",
-                                                rspamd_inet_address_to_string(session->addr),
+                                                rspamd_inet_address_to_string(addr),
                                                 rspamd_inet_address_to_string(masked),
-                                                session->ctx->leaky_bucket_burst);
+                                                ctx->leaky_bucket_burst);
                                elt->cur = NAN;
                                new_ratelimit = TRUE;
                                ratelimited = TRUE;
@@ -534,7 +550,7 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session)
                }
 
                if (ratelimited) {
-                       rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit");
+                       rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
                }
 
                if (new_ratelimit) {
@@ -550,10 +566,11 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session)
                                if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
                                        memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
                                        msg_debug("propagating blocked address to other workers");
-                                       rspamd_srv_send_command(session->worker, session->ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+                                       rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
                                }
                                else {
-                                       msg_err("bad address length: %d, expected to be %d", (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
+                                       msg_err("bad address length: %d, expected to be %d",
+                                                       (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
                                }
                        }
                }
@@ -567,13 +584,13 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session)
                elt = g_malloc(sizeof(*elt));
                elt->addr = masked; /* transfer ownership */
                elt->cur = 1;
-               elt->last = session->timestamp;
+               elt->last = timestamp;
 
-               rspamd_lru_hash_insert(session->ctx->ratelimit_buckets,
+               rspamd_lru_hash_insert(ctx->ratelimit_buckets,
                                                           masked,
                                                           elt,
-                                                          session->timestamp,
-                                                          session->ctx->leaky_bucket_ttl);
+                                                          timestamp,
+                                                          ctx->leaky_bucket_ttl);
        }
 
        return TRUE;
@@ -1285,7 +1302,7 @@ rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply *result, void *ud)
                /* result timestamp */
                lua_pushinteger(L, result->ts);
                /* TODO: add additional data maybe (encryption, pubkey, etc) */
-               rspamd_fuzzy_extensions_tolua(L, session);
+               rspamd_fuzzy_extensions_tolua(L, session->extensions);
 
                if ((ret = lua_pcall(L, 9, LUA_MULTRET, err_idx)) != 0) {
                        msg_err("call to lua_post_handler lua "
@@ -1536,10 +1553,16 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_session *session)
 
                if (session->ctx->ratelimit_buckets) {
                        if (session->ctx->ratelimit_log_only) {
-                               (void) rspamd_fuzzy_check_ratelimit(session); /* Check but ignore */
+                               (void) rspamd_fuzzy_check_ratelimit(session->ctx,
+                                                                                                       session->addr,
+                                                                                                       session->worker,
+                                                                                                       session->timestamp); /* Check but ignore */
                        }
                        else {
-                               can_continue = rspamd_fuzzy_check_ratelimit(session);
+                               can_continue = rspamd_fuzzy_check_ratelimit(session->ctx,
+                                                                                                                       session->addr,
+                                                                                                                       session->worker,
+                                                                                                                       session->timestamp);
                        }
                }
 
@@ -2038,6 +2061,12 @@ tcp_session_dtor(struct fuzzy_tcp_session *tcp_session)
                rspamd_inet_address_free(tcp_session->addr);
        }
 
+       tcp_session->worker->nconns--;
+
+       if (tcp_session->ip_stat) {
+               REF_RELEASE(tcp_session->ip_stat);
+       }
+
        if (tcp_session->ctx->event_loop) {
                ev_timer_stop(tcp_session->ctx->event_loop, &tcp_session->tm);
                ev_io_stop(tcp_session->ctx->event_loop, &tcp_session->io);
@@ -2052,6 +2081,282 @@ tcp_session_dtor(struct fuzzy_tcp_session *tcp_session)
        g_free(tcp_session);
 }
 
+static bool
+rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned char *buf, size_t buflen)
+{
+       gboolean is_shingle = FALSE, __attribute__((unused)) encrypted = FALSE;
+       struct rspamd_fuzzy_reply result;
+       struct fuzzy_peer_cmd up_cmd;
+       struct fuzzy_peer_request *up_req;
+       struct fuzzy_key_stat *ip_stat = NULL;
+       char hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1];
+       rspamd_inet_addr_t *naddr;
+       gpointer ptr;
+       int send_flags = 0;
+
+       if (!rspamd_fuzzy_cmd_from_wire(tcp_session->ctx, tcp_session->addr, buf,
+                                                                       buflen,
+                                                                       &tcp_session->key,
+                                                                       tcp_session->nm,
+                                                                       &tcp_session->cmd,
+                                                                       &tcp_session->epoch,
+                                                                       &tcp_session->cmd_type,
+                                                                       &tcp_session->extensions)) {
+               /* Discard input */
+               tcp_session->ctx->stat.invalid_requests++;
+               msg_debug("invalid fuzzy command of size %z received", buflen);
+
+               if (tcp_session->addr) {
+                       uint64_t *nerrors = rspamd_lru_hash_lookup(tcp_session->ctx->errors_ips,
+                                                                                                          tcp_session->addr, -1);
+
+                       if (nerrors == NULL) {
+                               nerrors = g_malloc(sizeof(*nerrors));
+                               *nerrors = 1;
+                               rspamd_lru_hash_insert(tcp_session->ctx->errors_ips,
+                                                                          rspamd_inet_address_copy(tcp_session->addr, NULL),
+                                                                          nerrors, -1, -1);
+                       }
+                       else {
+                               *nerrors = *nerrors + 1;
+                       }
+               }
+
+               return false;
+       }
+
+       struct rspamd_fuzzy_cmd *cmd = &tcp_session->cmd.basic;
+       size_t up_len = 0;
+
+       switch (tcp_session->cmd_type) {
+       case CMD_NORMAL:
+               up_len = sizeof(tcp_session->cmd.basic);
+               break;
+       case CMD_SHINGLE:
+               up_len = sizeof(tcp_session->cmd);
+               is_shingle = TRUE;
+               send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
+               break;
+       case CMD_ENCRYPTED_NORMAL:
+               up_len = sizeof(tcp_session->cmd.basic);
+               encrypted = TRUE;
+               send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
+               break;
+       case CMD_ENCRYPTED_SHINGLE:
+               up_len = sizeof(tcp_session->cmd);
+               encrypted = TRUE;
+               is_shingle = TRUE;
+               send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE | RSPAMD_FUZZY_REPLY_ENCRYPTED;
+               break;
+       default:
+               msg_err("invalid command type: %d", tcp_session->cmd_type);
+               return false;
+       }
+
+       memset(&result, 0, sizeof(result));
+       memcpy(result.digest, cmd->digest, sizeof(result.digest));
+       result.v1.flag = cmd->flag;
+       result.v1.tag = cmd->tag;
+
+       if (tcp_session->ctx->lua_pre_handler_cbref != -1) {
+               /* Start lua pre handler */
+               lua_State *L = tcp_session->ctx->cfg->lua_state;
+               int err_idx, ret;
+
+               lua_pushcfunction(L, &rspamd_lua_traceback);
+               err_idx = lua_gettop(L);
+               /* Preallocate stack (small opt) */
+               lua_checkstack(L, err_idx + 5);
+               /* function */
+               lua_rawgeti(L, LUA_REGISTRYINDEX, tcp_session->ctx->lua_pre_handler_cbref);
+               /* client IP */
+               rspamd_lua_ip_push(L, tcp_session->addr);
+               /* client command */
+               lua_pushinteger(L, cmd->cmd);
+               /* command value (push as rspamd_text) */
+               (void) lua_new_text(L, cmd->digest, sizeof(cmd->digest), FALSE);
+               /* is shingle */
+               lua_pushboolean(L, is_shingle);
+               /* TODO: add additional data maybe (encryption, pubkey, etc) */
+               rspamd_fuzzy_extensions_tolua(L, tcp_session->extensions);
+
+               if ((ret = lua_pcall(L, 5, LUA_MULTRET, err_idx)) != 0) {
+                       msg_err("call to lua_pre_handler lua "
+                                       "script failed (%d): %s",
+                                       ret, lua_tostring(L, -1));
+
+                       return false;
+               }
+               else {
+                       /* Return values order:
+                        * the first reply will be on err_idx + 1
+                        * if it is true, then we need to read the former ones:
+                        * 2-nd will be reply code
+                        * 3-rd will be probability (or 0.0 if missing)
+                        */
+                       ret = lua_toboolean(L, err_idx + 1);
+
+                       if (ret) {
+                               /* Artificial reply */
+                               result.v1.value = lua_tointeger(L, err_idx + 2);
+
+                               if (lua_isnumber(L, err_idx + 3)) {
+                                       result.v1.prob = lua_tonumber(L, err_idx + 3);
+                               }
+                               else {
+                                       result.v1.prob = 0.0f;
+                               }
+
+                               lua_settop(L, 0);
+                               /* TODO: write reply */
+
+                               return true;
+                       }
+               }
+
+               lua_settop(L, 0);
+       }
+
+
+       if (G_UNLIKELY(cmd == NULL || up_len == 0)) {
+               result.v1.value = 500;
+               result.v1.prob = 0.0f;
+               /* TODO: write reply */
+
+               return true;
+       }
+
+       if (tcp_session->ctx->encrypted_only && !encrypted) {
+               /* Do not accept unencrypted commands */
+               result.v1.value = 403;
+               result.v1.prob = 0.0f;
+               /* TODO: write reply */
+
+               return true;
+       }
+
+       if (tcp_session->key && tcp_session->addr) {
+               ip_stat = rspamd_lru_hash_lookup(tcp_session->key->stat->last_ips,
+                                                                                tcp_session->addr, -1);
+
+               if (ip_stat == NULL) {
+                       naddr = rspamd_inet_address_copy(tcp_session->addr, NULL);
+                       ip_stat = g_malloc0(sizeof(*ip_stat));
+                       REF_INIT_RETAIN(ip_stat, fuzzy_key_stat_dtor);
+                       rspamd_lru_hash_insert(tcp_session->key->stat->last_ips,
+                                                                  naddr, ip_stat, -1, 0);
+               }
+
+               REF_RETAIN(ip_stat);
+               tcp_session->ip_stat = ip_stat;
+       }
+
+       if (cmd->cmd == FUZZY_CHECK) {
+               bool can_continue = true;
+
+               if (tcp_session->ctx->ratelimit_buckets) {
+                       if (tcp_session->ctx->ratelimit_log_only) {
+                               (void) rspamd_fuzzy_check_ratelimit(tcp_session->ctx, tcp_session->addr,
+                                                                                                       tcp_session->worker,
+                                                                                                       ev_now(tcp_session->ctx->event_loop)); /* Check but ignore */
+                       }
+                       else {
+                               can_continue = rspamd_fuzzy_check_ratelimit(tcp_session->ctx, tcp_session->addr,
+                                                                                                                       tcp_session->worker,
+                                                                                                                       ev_now(tcp_session->ctx->event_loop));
+                       }
+               }
+
+               if (can_continue) {
+                       REF_RETAIN(tcp_session);
+                       /* TODO: use a different callback */
+                       rspamd_fuzzy_backend_check(tcp_session->ctx->backend, cmd,
+                                                                          rspamd_fuzzy_check_callback, tcp_session);
+               }
+               else {
+                       result.v1.value = 403;
+                       result.v1.prob = 0.0f;
+                       result.v1.flag = 0;
+                       /* TODO: write reply */
+
+                       return false;
+               }
+       }
+       else if (cmd->cmd == FUZZY_STAT) {
+               /* Store approximation (if needed) */
+               result.v1.prob = tcp_session->ctx->stat.fuzzy_hashes;
+               /* Store high qword in value and low qword in flag */
+               result.v1.value = (int32_t) ((uint64_t) tcp_session->ctx->stat.fuzzy_hashes >> 32);
+               result.v1.flag = (uint32_t) (tcp_session->ctx->stat.fuzzy_hashes & G_MAXUINT32);
+               /* TODO: write reply */
+       }
+       else if (cmd->cmd == FUZZY_PING) {
+               result.v1.prob = 1.0f;
+               result.v1.value = cmd->value;
+               /* TODO: write reply */
+       }
+       else {
+               if (rspamd_fuzzy_check_write(tcp_session->ctx, tcp_session->addr, tcp_session->key)) {
+                       /* Check whitelist */
+                       if (tcp_session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
+                               rspamd_encode_hex_buf(cmd->digest, sizeof(cmd->digest),
+                                                                         hexbuf, sizeof(hexbuf) - 1);
+                               hexbuf[sizeof(hexbuf) - 1] = '\0';
+
+                               if (rspamd_match_hash_map(tcp_session->ctx->skip_hashes,
+                                                                                 hexbuf, sizeof(hexbuf) - 1)) {
+                                       result.v1.value = 401;
+                                       result.v1.prob = 0.0f;
+
+                                       goto reply;
+                               }
+                       }
+
+                       if (tcp_session->ctx->weak_ids &&
+                               kh_get(fuzzy_key_ids_set, tcp_session->ctx->weak_ids, cmd->flag) != kh_end(tcp_session->ctx->weak_ids)) {
+                               /* Flag command as weak */
+                               cmd->version |= RSPAMD_FUZZY_FLAG_WEAK;
+                       }
+
+                       if (tcp_session->worker->index == 0 || tcp_session->ctx->peer_fd == -1) {
+                               /* Just add to the queue */
+                               up_cmd.is_shingle = is_shingle;
+                               ptr = is_shingle ? (gpointer) &up_cmd.cmd.shingle : (gpointer) &up_cmd.cmd.normal;
+                               memcpy(ptr, cmd, up_len);
+                               g_array_append_val(tcp_session->ctx->updates_pending, up_cmd);
+                       }
+                       else {
+                               /* We need to send request to the peer */
+                               up_req = g_malloc0(sizeof(*up_req));
+                               up_req->cmd.is_shingle = is_shingle;
+                               ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal;
+                               memcpy(ptr, cmd, up_len);
+
+                               if (!fuzzy_peer_try_send(tcp_session->ctx->peer_fd, up_req)) {
+                                       up_req->io_ev.data = up_req;
+                                       ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
+                                                          tcp_session->ctx->peer_fd, EV_WRITE);
+                                       ev_io_start(tcp_session->ctx->event_loop, &up_req->io_ev);
+                               }
+                               else {
+                                       g_free(up_req);
+                               }
+                       }
+
+                       result.v1.value = 0;
+                       result.v1.prob = 1.0f;
+               }
+               else {
+                       result.v1.value = 403;
+                       result.v1.prob = 0.0f;
+               }
+       reply:
+               /* TODO: write reply */
+       }
+
+       return true;
+}
+
 static bool
 fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_read)
 {
@@ -2128,18 +2433,18 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
                                                                        rspamd_inet_address_to_string(tcp_session->addr),
                                                                        strerror(errno));
 
-                       tcp_session_dtor(tcp_session);
+                       REF_RELEASE(tcp_session);
                }
                else if (r == 0) {
                        /* Got EOF */
                        msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: EOF",
                                                                        rspamd_inet_address_to_string(tcp_session->addr));
 
-                       tcp_session_dtor(tcp_session);
+                       REF_RELEASE(tcp_session);
                }
                else {
                        if (!fuzzy_tcp_process_input(tcp_session, r)) {
-                               tcp_session_dtor(tcp_session);
+                               REF_RELEASE(tcp_session);
                        }
                        else {
                                if (tcp_session->replies_queue != NULL) {
@@ -2191,7 +2496,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
                                                                                rspamd_inet_address_to_string(tcp_session->addr),
                                                                                strerror(errno));
 
-                               tcp_session_dtor(tcp_session);
+                               REF_RELEASE(tcp_session);
                        }
                        else if (r == 0) {
                                /* Fake EV_WRITE? */
@@ -2238,7 +2543,7 @@ tcp_fuzzy_socket_timeout(EV_P_ ev_timer *w, int revents)
 
        msg_debug_fuzzy_storage("timed out TCP connection from %s", rspamd_inet_address_to_string(tcp_session->addr));
 
-       tcp_session_dtor(tcp_session);
+       REF_RELEASE(tcp_session);
 }
 
 static void
@@ -2272,6 +2577,7 @@ accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents)
        tcp_session->addr = addr;
        tcp_session->ctx = ctx;
        tcp_session->fd = nfd;
+       REF_INIT_RETAIN(tcp_session, tcp_session_dtor);
        ev_io_init(&tcp_session->io, tcp_fuzzy_socket_io, nfd, EV_READ);
        ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, ctx->tcp_timeout);
        tcp_session->tm.data = tcp_session;