From: Vsevolod Stakhov Date: Mon, 29 Apr 2024 18:23:27 +0000 (+0100) Subject: [Project] Add some basic tcp session handling X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=17254fe79f4df3c1892903f2fe04d39b6fbd815a;p=thirdparty%2Frspamd.git [Project] Add some basic tcp session handling --- diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 52baaae247..ce8b205a7f 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -253,7 +253,18 @@ struct fuzzy_tcp_session { * So the length is always cur_frame & 0x3fff */ uint16_t cur_frame_state; - unsigned int bytes_unprocessed; + uint16_t bytes_unprocessed; + + /* Common with UDP session */ + enum rspamd_fuzzy_epoch epoch; + enum fuzzy_cmd_type cmd_type; + struct rspamd_fuzzy_shingle_cmd cmd; + struct fuzzy_key *key; + struct rspamd_fuzzy_cmd_extension *extensions; + struct fuzzy_key_stat *ip_stat; + unsigned char nm[rspamd_cryptobox_MAX_NMBYTES]; + + ref_entry_t ref; struct fuzzy_tcp_reply *replies_queue; unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH]; @@ -272,11 +283,13 @@ struct fuzzy_session { enum fuzzy_cmd_type cmd_type; int fd; ev_tstamp timestamp; - struct ev_io io; - ref_entry_t ref; + struct fuzzy_key *key; struct rspamd_fuzzy_cmd_extension *extensions; unsigned char nm[rspamd_cryptobox_MAX_NMBYTES]; + + struct ev_io io; + ref_entry_t ref; }; struct fuzzy_peer_request { @@ -447,18 +460,21 @@ ucl_keymap_dtor_cb(struct map_cb_data *data) } static gboolean -rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session) +rspamd_fuzzy_check_ratelimit(struct rspamd_fuzzy_storage_ctx *ctx, + rspamd_inet_addr_t *addr, + struct rspamd_worker *worker, + ev_tstamp timestamp) { rspamd_inet_addr_t *masked; struct rspamd_leaky_bucket_elt *elt; - if (!session->addr) { + if (!addr) { return TRUE; } - if (session->ctx->ratelimit_whitelist != NULL) { - if (rspamd_match_radix_map_addr(session->ctx->ratelimit_whitelist, - session->addr) != NULL) { + if (ctx->ratelimit_whitelist != NULL) { + if (rspamd_match_radix_map_addr(ctx->ratelimit_whitelist, + addr) != NULL) { return TRUE; } } @@ -469,20 +485,20 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session) } */ - masked = rspamd_inet_address_copy(session->addr, NULL); + masked = rspamd_inet_address_copy(addr, NULL); if (rspamd_inet_address_get_af(masked) == AF_INET) { rspamd_inet_address_apply_mask(masked, - MIN(session->ctx->leaky_bucket_mask, 32)); + MIN(ctx->leaky_bucket_mask, 32)); } else { /* Must be at least /64 */ rspamd_inet_address_apply_mask(masked, - MIN(MAX(session->ctx->leaky_bucket_mask * 4, 64), 128)); + MIN(MAX(ctx->leaky_bucket_mask * 4, 64), 128)); } - elt = rspamd_lru_hash_lookup(session->ctx->ratelimit_buckets, masked, - (time_t) session->timestamp); + elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, masked, + (time_t) timestamp); if (elt) { gboolean ratelimited = FALSE, new_ratelimit = FALSE; @@ -491,13 +507,13 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session) /* There is an issue with the previous logic: the TTL is updated each time * we see that new bucket. Hence, we need to check the `last` and act accordingly */ - if (elt->last < session->timestamp && session->timestamp - elt->last >= session->ctx->leaky_bucket_ttl) { + if (elt->last < timestamp && timestamp - elt->last >= ctx->leaky_bucket_ttl) { /* * We reset bucket to it's 90% capacity to allow some requests * This should cope with the issue when we block an IP network for some burst and never unblock it */ - elt->cur = session->ctx->leaky_bucket_burst * 0.9; - elt->last = session->timestamp; + elt->cur = ctx->leaky_bucket_burst * 0.9; + elt->last = timestamp; } else { ratelimited = TRUE; @@ -505,25 +521,25 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session) } else { /* Update bucket: leak some elements */ - if (elt->last < session->timestamp) { - elt->cur -= session->ctx->leaky_bucket_rate * (session->timestamp - elt->last); - elt->last = session->timestamp; + if (elt->last < timestamp) { + elt->cur -= ctx->leaky_bucket_rate * (timestamp - elt->last); + elt->last = timestamp; if (elt->cur < 0) { elt->cur = 0; } } else { - elt->last = session->timestamp; + elt->last = timestamp; } /* Check the bucket */ - if (elt->cur >= session->ctx->leaky_bucket_burst) { + if (elt->cur >= ctx->leaky_bucket_burst) { msg_info("ratelimiting %s (%s), %.1f max elts", - rspamd_inet_address_to_string(session->addr), + rspamd_inet_address_to_string(addr), rspamd_inet_address_to_string(masked), - session->ctx->leaky_bucket_burst); + ctx->leaky_bucket_burst); elt->cur = NAN; new_ratelimit = TRUE; ratelimited = TRUE; @@ -534,7 +550,7 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session) } if (ratelimited) { - rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit"); + rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit"); } if (new_ratelimit) { @@ -550,10 +566,11 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session) if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) { memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen); msg_debug("propagating blocked address to other workers"); - rspamd_srv_send_command(session->worker, session->ctx->event_loop, &srv_cmd, -1, NULL, NULL); + rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL); } else { - msg_err("bad address length: %d, expected to be %d", (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr)); + msg_err("bad address length: %d, expected to be %d", + (int) slen, (int) sizeof(srv_cmd.cmd.fuzzy_blocked.addr)); } } } @@ -567,13 +584,13 @@ rspamd_fuzzy_check_ratelimit(struct fuzzy_session *session) elt = g_malloc(sizeof(*elt)); elt->addr = masked; /* transfer ownership */ elt->cur = 1; - elt->last = session->timestamp; + elt->last = timestamp; - rspamd_lru_hash_insert(session->ctx->ratelimit_buckets, + rspamd_lru_hash_insert(ctx->ratelimit_buckets, masked, elt, - session->timestamp, - session->ctx->leaky_bucket_ttl); + timestamp, + ctx->leaky_bucket_ttl); } return TRUE; @@ -1285,7 +1302,7 @@ rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply *result, void *ud) /* result timestamp */ lua_pushinteger(L, result->ts); /* TODO: add additional data maybe (encryption, pubkey, etc) */ - rspamd_fuzzy_extensions_tolua(L, session); + rspamd_fuzzy_extensions_tolua(L, session->extensions); if ((ret = lua_pcall(L, 9, LUA_MULTRET, err_idx)) != 0) { msg_err("call to lua_post_handler lua " @@ -1536,10 +1553,16 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_session *session) if (session->ctx->ratelimit_buckets) { if (session->ctx->ratelimit_log_only) { - (void) rspamd_fuzzy_check_ratelimit(session); /* Check but ignore */ + (void) rspamd_fuzzy_check_ratelimit(session->ctx, + session->addr, + session->worker, + session->timestamp); /* Check but ignore */ } else { - can_continue = rspamd_fuzzy_check_ratelimit(session); + can_continue = rspamd_fuzzy_check_ratelimit(session->ctx, + session->addr, + session->worker, + session->timestamp); } } @@ -2038,6 +2061,12 @@ tcp_session_dtor(struct fuzzy_tcp_session *tcp_session) rspamd_inet_address_free(tcp_session->addr); } + tcp_session->worker->nconns--; + + if (tcp_session->ip_stat) { + REF_RELEASE(tcp_session->ip_stat); + } + if (tcp_session->ctx->event_loop) { ev_timer_stop(tcp_session->ctx->event_loop, &tcp_session->tm); ev_io_stop(tcp_session->ctx->event_loop, &tcp_session->io); @@ -2052,6 +2081,282 @@ tcp_session_dtor(struct fuzzy_tcp_session *tcp_session) g_free(tcp_session); } +static bool +rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned char *buf, size_t buflen) +{ + gboolean is_shingle = FALSE, __attribute__((unused)) encrypted = FALSE; + struct rspamd_fuzzy_reply result; + struct fuzzy_peer_cmd up_cmd; + struct fuzzy_peer_request *up_req; + struct fuzzy_key_stat *ip_stat = NULL; + char hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1]; + rspamd_inet_addr_t *naddr; + gpointer ptr; + int send_flags = 0; + + if (!rspamd_fuzzy_cmd_from_wire(tcp_session->ctx, tcp_session->addr, buf, + buflen, + &tcp_session->key, + tcp_session->nm, + &tcp_session->cmd, + &tcp_session->epoch, + &tcp_session->cmd_type, + &tcp_session->extensions)) { + /* Discard input */ + tcp_session->ctx->stat.invalid_requests++; + msg_debug("invalid fuzzy command of size %z received", buflen); + + if (tcp_session->addr) { + uint64_t *nerrors = rspamd_lru_hash_lookup(tcp_session->ctx->errors_ips, + tcp_session->addr, -1); + + if (nerrors == NULL) { + nerrors = g_malloc(sizeof(*nerrors)); + *nerrors = 1; + rspamd_lru_hash_insert(tcp_session->ctx->errors_ips, + rspamd_inet_address_copy(tcp_session->addr, NULL), + nerrors, -1, -1); + } + else { + *nerrors = *nerrors + 1; + } + } + + return false; + } + + struct rspamd_fuzzy_cmd *cmd = &tcp_session->cmd.basic; + size_t up_len = 0; + + switch (tcp_session->cmd_type) { + case CMD_NORMAL: + up_len = sizeof(tcp_session->cmd.basic); + break; + case CMD_SHINGLE: + up_len = sizeof(tcp_session->cmd); + is_shingle = TRUE; + send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE; + break; + case CMD_ENCRYPTED_NORMAL: + up_len = sizeof(tcp_session->cmd.basic); + encrypted = TRUE; + send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED; + break; + case CMD_ENCRYPTED_SHINGLE: + up_len = sizeof(tcp_session->cmd); + encrypted = TRUE; + is_shingle = TRUE; + send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE | RSPAMD_FUZZY_REPLY_ENCRYPTED; + break; + default: + msg_err("invalid command type: %d", tcp_session->cmd_type); + return false; + } + + memset(&result, 0, sizeof(result)); + memcpy(result.digest, cmd->digest, sizeof(result.digest)); + result.v1.flag = cmd->flag; + result.v1.tag = cmd->tag; + + if (tcp_session->ctx->lua_pre_handler_cbref != -1) { + /* Start lua pre handler */ + lua_State *L = tcp_session->ctx->cfg->lua_state; + int err_idx, ret; + + lua_pushcfunction(L, &rspamd_lua_traceback); + err_idx = lua_gettop(L); + /* Preallocate stack (small opt) */ + lua_checkstack(L, err_idx + 5); + /* function */ + lua_rawgeti(L, LUA_REGISTRYINDEX, tcp_session->ctx->lua_pre_handler_cbref); + /* client IP */ + rspamd_lua_ip_push(L, tcp_session->addr); + /* client command */ + lua_pushinteger(L, cmd->cmd); + /* command value (push as rspamd_text) */ + (void) lua_new_text(L, cmd->digest, sizeof(cmd->digest), FALSE); + /* is shingle */ + lua_pushboolean(L, is_shingle); + /* TODO: add additional data maybe (encryption, pubkey, etc) */ + rspamd_fuzzy_extensions_tolua(L, tcp_session->extensions); + + if ((ret = lua_pcall(L, 5, LUA_MULTRET, err_idx)) != 0) { + msg_err("call to lua_pre_handler lua " + "script failed (%d): %s", + ret, lua_tostring(L, -1)); + + return false; + } + else { + /* Return values order: + * the first reply will be on err_idx + 1 + * if it is true, then we need to read the former ones: + * 2-nd will be reply code + * 3-rd will be probability (or 0.0 if missing) + */ + ret = lua_toboolean(L, err_idx + 1); + + if (ret) { + /* Artificial reply */ + result.v1.value = lua_tointeger(L, err_idx + 2); + + if (lua_isnumber(L, err_idx + 3)) { + result.v1.prob = lua_tonumber(L, err_idx + 3); + } + else { + result.v1.prob = 0.0f; + } + + lua_settop(L, 0); + /* TODO: write reply */ + + return true; + } + } + + lua_settop(L, 0); + } + + + if (G_UNLIKELY(cmd == NULL || up_len == 0)) { + result.v1.value = 500; + result.v1.prob = 0.0f; + /* TODO: write reply */ + + return true; + } + + if (tcp_session->ctx->encrypted_only && !encrypted) { + /* Do not accept unencrypted commands */ + result.v1.value = 403; + result.v1.prob = 0.0f; + /* TODO: write reply */ + + return true; + } + + if (tcp_session->key && tcp_session->addr) { + ip_stat = rspamd_lru_hash_lookup(tcp_session->key->stat->last_ips, + tcp_session->addr, -1); + + if (ip_stat == NULL) { + naddr = rspamd_inet_address_copy(tcp_session->addr, NULL); + ip_stat = g_malloc0(sizeof(*ip_stat)); + REF_INIT_RETAIN(ip_stat, fuzzy_key_stat_dtor); + rspamd_lru_hash_insert(tcp_session->key->stat->last_ips, + naddr, ip_stat, -1, 0); + } + + REF_RETAIN(ip_stat); + tcp_session->ip_stat = ip_stat; + } + + if (cmd->cmd == FUZZY_CHECK) { + bool can_continue = true; + + if (tcp_session->ctx->ratelimit_buckets) { + if (tcp_session->ctx->ratelimit_log_only) { + (void) rspamd_fuzzy_check_ratelimit(tcp_session->ctx, tcp_session->addr, + tcp_session->worker, + ev_now(tcp_session->ctx->event_loop)); /* Check but ignore */ + } + else { + can_continue = rspamd_fuzzy_check_ratelimit(tcp_session->ctx, tcp_session->addr, + tcp_session->worker, + ev_now(tcp_session->ctx->event_loop)); + } + } + + if (can_continue) { + REF_RETAIN(tcp_session); + /* TODO: use a different callback */ + rspamd_fuzzy_backend_check(tcp_session->ctx->backend, cmd, + rspamd_fuzzy_check_callback, tcp_session); + } + else { + result.v1.value = 403; + result.v1.prob = 0.0f; + result.v1.flag = 0; + /* TODO: write reply */ + + return false; + } + } + else if (cmd->cmd == FUZZY_STAT) { + /* Store approximation (if needed) */ + result.v1.prob = tcp_session->ctx->stat.fuzzy_hashes; + /* Store high qword in value and low qword in flag */ + result.v1.value = (int32_t) ((uint64_t) tcp_session->ctx->stat.fuzzy_hashes >> 32); + result.v1.flag = (uint32_t) (tcp_session->ctx->stat.fuzzy_hashes & G_MAXUINT32); + /* TODO: write reply */ + } + else if (cmd->cmd == FUZZY_PING) { + result.v1.prob = 1.0f; + result.v1.value = cmd->value; + /* TODO: write reply */ + } + else { + if (rspamd_fuzzy_check_write(tcp_session->ctx, tcp_session->addr, tcp_session->key)) { + /* Check whitelist */ + if (tcp_session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) { + rspamd_encode_hex_buf(cmd->digest, sizeof(cmd->digest), + hexbuf, sizeof(hexbuf) - 1); + hexbuf[sizeof(hexbuf) - 1] = '\0'; + + if (rspamd_match_hash_map(tcp_session->ctx->skip_hashes, + hexbuf, sizeof(hexbuf) - 1)) { + result.v1.value = 401; + result.v1.prob = 0.0f; + + goto reply; + } + } + + if (tcp_session->ctx->weak_ids && + kh_get(fuzzy_key_ids_set, tcp_session->ctx->weak_ids, cmd->flag) != kh_end(tcp_session->ctx->weak_ids)) { + /* Flag command as weak */ + cmd->version |= RSPAMD_FUZZY_FLAG_WEAK; + } + + if (tcp_session->worker->index == 0 || tcp_session->ctx->peer_fd == -1) { + /* Just add to the queue */ + up_cmd.is_shingle = is_shingle; + ptr = is_shingle ? (gpointer) &up_cmd.cmd.shingle : (gpointer) &up_cmd.cmd.normal; + memcpy(ptr, cmd, up_len); + g_array_append_val(tcp_session->ctx->updates_pending, up_cmd); + } + else { + /* We need to send request to the peer */ + up_req = g_malloc0(sizeof(*up_req)); + up_req->cmd.is_shingle = is_shingle; + ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal; + memcpy(ptr, cmd, up_len); + + if (!fuzzy_peer_try_send(tcp_session->ctx->peer_fd, up_req)) { + up_req->io_ev.data = up_req; + ev_io_init(&up_req->io_ev, fuzzy_peer_send_io, + tcp_session->ctx->peer_fd, EV_WRITE); + ev_io_start(tcp_session->ctx->event_loop, &up_req->io_ev); + } + else { + g_free(up_req); + } + } + + result.v1.value = 0; + result.v1.prob = 1.0f; + } + else { + result.v1.value = 403; + result.v1.prob = 0.0f; + } + reply: + /* TODO: write reply */ + } + + return true; +} + static bool fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_read) { @@ -2128,18 +2433,18 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) rspamd_inet_address_to_string(tcp_session->addr), strerror(errno)); - tcp_session_dtor(tcp_session); + REF_RELEASE(tcp_session); } else if (r == 0) { /* Got EOF */ msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: EOF", rspamd_inet_address_to_string(tcp_session->addr)); - tcp_session_dtor(tcp_session); + REF_RELEASE(tcp_session); } else { if (!fuzzy_tcp_process_input(tcp_session, r)) { - tcp_session_dtor(tcp_session); + REF_RELEASE(tcp_session); } else { if (tcp_session->replies_queue != NULL) { @@ -2191,7 +2496,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) rspamd_inet_address_to_string(tcp_session->addr), strerror(errno)); - tcp_session_dtor(tcp_session); + REF_RELEASE(tcp_session); } else if (r == 0) { /* Fake EV_WRITE? */ @@ -2238,7 +2543,7 @@ tcp_fuzzy_socket_timeout(EV_P_ ev_timer *w, int revents) msg_debug_fuzzy_storage("timed out TCP connection from %s", rspamd_inet_address_to_string(tcp_session->addr)); - tcp_session_dtor(tcp_session); + REF_RELEASE(tcp_session); } static void @@ -2272,6 +2577,7 @@ accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents) tcp_session->addr = addr; tcp_session->ctx = ctx; tcp_session->fd = nfd; + REF_INIT_RETAIN(tcp_session, tcp_session_dtor); ev_io_init(&tcp_session->io, tcp_fuzzy_socket_io, nfd, EV_READ); ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, ctx->tcp_timeout); tcp_session->tm.data = tcp_session;