]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Allow fuzzy workers to exchange blocked information
authorVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 1 Jul 2023 12:32:22 +0000 (13:32 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 1 Jul 2023 12:32:22 +0000 (13:32 +0100)
src/fuzzy_storage.c
src/libserver/hyperscan_tools.cxx
src/libserver/rspamd_control.c
src/libserver/rspamd_control.h

index 33f9d40e8dac5e54d0bc11b4b6fbdd7c79851e38..31a2c46e63c72def9400ea09524c411d7701957a 100644 (file)
@@ -273,7 +273,7 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
                (time_t)session->timestamp);
 
        if (elt) {
-               gboolean ratelimited = FALSE;
+               gboolean ratelimited = FALSE, new_ratelimit = FALSE;
 
                if (isnan (elt->cur)) {
                        /* Ratelimit exceeded, preserve it for the whole ttl */
@@ -301,18 +301,40 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
                                                rspamd_inet_address_to_string (masked),
                                                session->ctx->leaky_bucket_burst);
                                elt->cur = NAN;
+                               new_ratelimit = TRUE;
                        }
                        else {
                                elt->cur ++; /* Allow one more request */
                        }
                }
 
-               rspamd_inet_address_free (masked);
-
                if (ratelimited) {
                        rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit");
                }
 
+               if (new_ratelimit) {
+                       struct rspamd_srv_command srv_cmd;
+
+                       srv_cmd.type = RSPAMD_SRV_FUZZY_BLOCKED;
+                       srv_cmd.cmd.fuzzy_blocked.af = rspamd_inet_address_get_af(masked);
+
+                       if (srv_cmd.cmd.fuzzy_blocked.af == AF_INET || srv_cmd.cmd.fuzzy_blocked.af == AF_INET6) {
+                               socklen_t slen;
+                               struct sockaddr *sa = rspamd_inet_address_get_sa(masked, &slen);
+
+                               if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
+                                       memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
+                                       msg_debug("propagating blocked address to other workers");
+                                       rspamd_srv_send_command(session->worker, session->ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+                               }
+                               else {
+                                       msg_err("bad address length: %d, expected to be %d", (int)slen, (int)sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
+                               }
+                       }
+               }
+
+               rspamd_inet_address_free (masked);
+
                return !ratelimited;
        }
        else {
@@ -1906,6 +1928,83 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
        return TRUE;
 }
 
+static gboolean
+rspamd_fuzzy_control_blocked (struct rspamd_main *rspamd_main,
+                                                  struct rspamd_worker *worker, gint fd,
+                                                  gint attached_fd,
+                                                  struct rspamd_control_command *cmd,
+                                                  gpointer ud)
+{
+       struct rspamd_fuzzy_storage_ctx *ctx = (struct rspamd_fuzzy_storage_ctx *)ud;
+       struct rspamd_control_reply rep;
+       struct rspamd_leaky_bucket_elt *elt;
+       ev_tstamp now = ev_now (ctx->event_loop);
+       rspamd_inet_addr_t *addr = NULL;
+
+       rep.type = RSPAMD_CONTROL_FUZZY_BLOCKED;
+       rep.reply.fuzzy_blocked.status = 0;
+
+       if (cmd->cmd.fuzzy_blocked.af == AF_INET) {
+               addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa,
+                               sizeof (struct sockaddr_in));
+       }
+       else if (cmd->cmd.fuzzy_blocked.af == AF_INET6) {
+               addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa,
+                               sizeof (struct sockaddr_in6));
+       }
+       else {
+               msg_err ("invalid address family: %d", cmd->cmd.fuzzy_blocked.af);
+               rep.reply.fuzzy_blocked.status = -1;
+       }
+
+       if (addr) {
+               elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, addr,
+                       (time_t) now);
+
+               if (elt) {
+                       if (isnan (elt->cur)) {
+                               /* Already ratelimited, ignore */
+                       }
+                       else {
+                               elt->last = now;
+                               elt->cur = NAN;
+
+                               msg_info ("propagating ratelimiting %s, %.1f max elts",
+                                       rspamd_inet_address_to_string(addr),
+                                       ctx->leaky_bucket_burst);
+                               rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
+                       }
+
+                       rspamd_inet_address_free(addr);
+
+               }
+               else {
+                       /* New bucket */
+                       elt = g_malloc(sizeof(*elt));
+                       elt->addr = addr; /* transfer ownership */
+                       elt->cur = NAN;
+                       elt->last = now;
+
+                       rspamd_lru_hash_insert(ctx->ratelimit_buckets,
+                               addr,
+                               elt,
+                               (time_t)now,
+                               ctx->leaky_bucket_ttl);
+                       msg_info ("propagating ratelimiting %s, %.1f max elts",
+                               rspamd_inet_address_to_string(addr),
+                               ctx->leaky_bucket_burst);
+                       rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
+               }
+       }
+
+       if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+               msg_err ("cannot write reply to the control socket: %s",
+                       strerror (errno));
+       }
+
+       return TRUE;
+}
+
 static gboolean
 rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
                struct rspamd_worker *worker, gint fd,
@@ -2856,6 +2955,8 @@ start_fuzzy (struct rspamd_worker *worker)
                        rspamd_fuzzy_storage_stat, ctx);
        rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
                        rspamd_fuzzy_storage_sync, ctx);
+       rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_BLOCKED,
+               rspamd_fuzzy_control_blocked, ctx);
 
 
        if (ctx->update_map != NULL) {
index 8159cbd26dc2f803933483c7e3f85a1e963e490e..6fdc72e87cc4daa9a3dbe7f67a5271979b0f9b97 100644 (file)
@@ -609,7 +609,7 @@ rspamd_hyperscan_notice_known(const char *fname)
                                (int)strlen(fname), fname, (int)sizeof(notice_cmd.cmd.hyperscan_cache_file.path));
                }
                else {
-                       notice_cmd.type = RSPAMD_NOTICE_HYPERSCAN_CACHE;
+                       notice_cmd.type = RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE;
                        rspamd_strlcpy(notice_cmd.cmd.hyperscan_cache_file.path, fname, sizeof(notice_cmd.cmd.hyperscan_cache_file.path));
                        rspamd_srv_send_command(rspamd_current_worker,
                                rspamd_current_worker->srv->event_loop, &notice_cmd, -1,
index 82913a19f697d7b6ded12162579f291ecc11dd45..cbafec2700e8505d963f0738f972854ea57ed98e 100644 (file)
@@ -629,6 +629,7 @@ rspamd_control_default_cmd_handler (gint fd,
        case RSPAMD_CONTROL_FUZZY_SYNC:
        case RSPAMD_CONTROL_LOG_PIPE:
        case RSPAMD_CONTROL_CHILD_CHANGE:
+       case RSPAMD_CONTROL_FUZZY_BLOCKED:
                break;
        case RSPAMD_CONTROL_RERESOLVE:
                if (cd->worker->srv->cfg) {
@@ -1020,12 +1021,22 @@ rspamd_srv_handler (EV_P_ ev_io *w, int revents)
                        case RSPAMD_SRV_HEALTH:
                                rspamd_fill_health_reply (srv, &rdata->rep);
                                break;
-                       case RSPAMD_NOTICE_HYPERSCAN_CACHE:
+                       case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
 #ifdef WITH_HYPERSCAN
                                rspamd_hyperscan_notice_known(cmd.cmd.hyperscan_cache_file.path);
 #endif
                                rdata->rep.reply.hyperscan_cache_file.unused = 0;
                                break;
+                       case RSPAMD_SRV_FUZZY_BLOCKED:
+                               /* Broadcast command to all workers */
+                               memset (&wcmd, 0, sizeof (wcmd));
+                               wcmd.type = RSPAMD_CONTROL_FUZZY_BLOCKED;
+                               /* Ensure that memcpy is safe */
+                               G_STATIC_ASSERT(sizeof(wcmd.cmd.fuzzy_blocked) == sizeof(cmd.cmd.fuzzy_blocked));
+                               memcpy(&wcmd.cmd.fuzzy_blocked, &cmd.cmd.fuzzy_blocked, sizeof(wcmd.cmd.fuzzy_blocked));
+                               rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
+                                       rspamd_control_ignore_io_handler, NULL, worker->pid);
+                               break;
                        default:
                                msg_err ("unknown command type: %d", cmd.type);
                                break;
@@ -1354,9 +1365,12 @@ const gchar *rspamd_srv_command_to_string (enum rspamd_srv_type cmd)
        case RSPAMD_SRV_HEALTH:
                reply = "health";
                break;
-       case RSPAMD_NOTICE_HYPERSCAN_CACHE:
+       case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
                reply = "notice_hyperscan_cache";
                break;
+       case RSPAMD_SRV_FUZZY_BLOCKED:
+               reply = "fuzzy_blocked";
+               break;
        }
 
        return reply;
index 049c9b80c37f10d4a94cf5b7ba043c2ba9786ad0..dd661c1456337a2afda9fae04a6258f87e025197 100644 (file)
@@ -36,6 +36,7 @@ enum rspamd_control_type {
        RSPAMD_CONTROL_FUZZY_SYNC,
        RSPAMD_CONTROL_MONITORED_CHANGE,
        RSPAMD_CONTROL_CHILD_CHANGE,
+       RSPAMD_CONTROL_FUZZY_BLOCKED,
        RSPAMD_CONTROL_MAX
 };
 
@@ -47,7 +48,8 @@ enum rspamd_srv_type {
        RSPAMD_SRV_ON_FORK,
        RSPAMD_SRV_HEARTBEAT,
        RSPAMD_SRV_HEALTH,
-       RSPAMD_NOTICE_HYPERSCAN_CACHE,
+       RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE,
+       RSPAMD_SRV_FUZZY_BLOCKED, /* Used to notify main process about a blocked ip */
 };
 
 enum rspamd_log_pipe_type {
@@ -96,6 +98,14 @@ struct rspamd_control_command {
                        pid_t pid;
                        guint additional;
                } child_change;
+               struct {
+                       union {
+                               struct sockaddr sa;
+                               struct sockaddr_in s4;
+                               struct sockaddr_in6 s6;
+                       } addr;
+                       sa_family_t af;
+               } fuzzy_blocked;
        } cmd;
 };
 
@@ -134,6 +144,9 @@ struct rspamd_control_reply {
                struct {
                        guint status;
                } fuzzy_sync;
+               struct {
+                       guint status;
+               } fuzzy_blocked;
        } reply;
 };
 
@@ -179,6 +192,15 @@ struct rspamd_srv_command {
                struct {
                        char path[CONTROL_PATHLEN];
                } hyperscan_cache_file;
+               /* Send when one worker has blocked some IP address */
+               struct {
+                       union {
+                               struct sockaddr sa;
+                               struct sockaddr_in s4;
+                               struct sockaddr_in6 s6;
+                       } addr;
+                       sa_family_t af;
+               } fuzzy_blocked;
        } cmd;
 };
 
@@ -213,6 +235,9 @@ struct rspamd_srv_reply {
                struct {
                        int unused;
                } hyperscan_cache_file;
+               struct {
+                       int unused;
+               } fuzzy_blocked;
        } reply;
 };