]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Add a signal from main to workers for workers ready state
author Vsevolod Stakhov <vsevolod@rspamd.com>
Tue, 1 Jul 2025 20:27:23 +0000 (21:27 +0100)
committer Vsevolod Stakhov <vsevolod@rspamd.com>
Tue, 1 Jul 2025 20:27:23 +0000 (21:27 +0100)
src/hs_helper.c
src/libserver/rspamd_control.c
src/libserver/rspamd_control.h
src/rspamd.c

index 3bd2040f85840b2f779ebbb054f0ec932c63db95..55dbb53f6f6f1e26bd3596569d7646dd2ef45496 100644 (file)
@@ -1,11 +1,11 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
+/*
+ * Copyright 2025 Vsevolod Stakhov
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -250,12 +250,12 @@ struct rspamd_hs_helper_compile_cbdata {
        unsigned int total_compiled;
        unsigned int scopes_remaining;
        gboolean forced;
+       gboolean workers_ready;
 };
 
 static void
-rspamd_rs_delayed_scoped_cb(EV_P_ ev_timer *w, int revents)
+rspamd_rs_send_final_notification(struct rspamd_hs_helper_compile_cbdata *cbd)
 {
-       struct rspamd_hs_helper_compile_cbdata *cbd = (struct rspamd_hs_helper_compile_cbdata *) w->data;
        struct rspamd_worker *worker = cbd->worker;
        struct hs_helper_ctx *ctx = cbd->ctx;
        static struct rspamd_srv_command srv_cmd;
@@ -269,11 +269,12 @@ rspamd_rs_delayed_scoped_cb(EV_P_ ev_timer *w, int revents)
 
        rspamd_srv_send_command(worker,
                                                        ctx->event_loop, &srv_cmd, -1, NULL, NULL);
-       ev_timer_stop(EV_A_ w);
-       g_free(w);
-       g_free(cbd);
 
-       ev_timer_again(EV_A_ & ctx->recompile_timer);
+       msg_info("sent final hyperscan loaded notification (%d total expressions compiled)",
+                        cbd->total_compiled);
+
+       g_free(cbd);
+       ev_timer_stop(ctx->event_loop, &ctx->recompile_timer);
 }
 
 static void
@@ -320,44 +321,32 @@ rspamd_rs_compile_scoped_cb(const char *scope, unsigned int ncompiled, GError *e
 
        /* Check if all scopes are done */
        if (compile_cbd->scopes_remaining == 0) {
-               ev_timer *tm;
-               ev_tstamp when = 0.0;
-
-               /*
-                * Do not send notification unless all other workers are started
-                * XXX: now we just sleep for 1 seconds to ensure that
-                */
-               if (!ctx->loaded) {
-                       when = 1.0; /* Postpone */
-                       ctx->loaded = TRUE;
+               if (compile_cbd->workers_ready) {
+                       /* Workers are ready, send notification immediately */
                        msg_info("compiled %d total regular expressions to the hyperscan tree, "
-                                        "postpone final notification for %.0f seconds to avoid races",
-                                        compile_cbd->total_compiled,
-                                        when);
+                                        "send final notification",
+                                        compile_cbd->total_compiled);
+                       rspamd_rs_send_final_notification(compile_cbd);
                }
                else {
+                       /* Workers not ready yet, notification will be sent when workers_spawned event is received */
                        msg_info("compiled %d total regular expressions to the hyperscan tree, "
-                                        "send final notification",
+                                        "waiting for workers to be ready before sending notification",
                                         compile_cbd->total_compiled);
+                       ctx->loaded = TRUE;
                }
-
-               tm = g_malloc0(sizeof(*tm));
-               tm->data = (void *) compile_cbd;
-               ev_timer_init(tm, rspamd_rs_delayed_scoped_cb, when, 0);
-               ev_timer_start(ctx->event_loop, tm);
        }
 }
 
 struct rspamd_hs_helper_single_compile_cbdata {
        struct rspamd_worker *worker;
        gboolean forced;
+       gboolean workers_ready;
 };
 
 static void
-rspamd_rs_delayed_cb(EV_P_ ev_timer *w, int revents)
+rspamd_rs_send_single_notification(struct rspamd_hs_helper_single_compile_cbdata *cbd)
 {
-       struct rspamd_hs_helper_single_compile_cbdata *cbd =
-               (struct rspamd_hs_helper_single_compile_cbdata *) w->data;
        struct rspamd_worker *worker = cbd->worker;
        static struct rspamd_srv_command srv_cmd;
        struct hs_helper_ctx *ctx;
@@ -372,11 +361,11 @@ rspamd_rs_delayed_cb(EV_P_ ev_timer *w, int revents)
 
        rspamd_srv_send_command(worker,
                                                        ctx->event_loop, &srv_cmd, -1, NULL, NULL);
-       ev_timer_stop(EV_A_ w);
-       g_free(w);
-       g_free(cbd);
 
-       ev_timer_again(EV_A_ & ctx->recompile_timer);
+       msg_info("sent hyperscan loaded notification");
+
+       g_free(cbd);
+       ev_timer_again(ctx->event_loop, &ctx->recompile_timer);
 }
 
 static void
@@ -385,8 +374,6 @@ rspamd_rs_compile_cb(unsigned int ncompiled, GError *err, void *cbd)
        struct rspamd_hs_helper_single_compile_cbdata *compile_cbd =
                (struct rspamd_hs_helper_single_compile_cbdata *) cbd;
        struct rspamd_worker *worker = compile_cbd->worker;
-       ev_timer *tm;
-       ev_tstamp when = 0.0;
        struct hs_helper_ctx *ctx;
        struct rspamd_hs_helper_single_compile_cbdata *timer_cbd;
 
@@ -399,33 +386,26 @@ rspamd_rs_compile_cb(unsigned int ncompiled, GError *err, void *cbd)
                return;
        }
 
-       /*
-        * Do not send notification unless all other workers are started
-        * XXX: now we just sleep for 1 seconds to ensure that
-        */
-       if (!ctx->loaded) {
-               when = 1.0; /* Postpone */
-               ctx->loaded = TRUE;
+       timer_cbd = g_malloc0(sizeof(*timer_cbd));
+       timer_cbd->worker = worker;
+       timer_cbd->forced = (ncompiled > 0) ? TRUE : compile_cbd->forced;
+       timer_cbd->workers_ready = compile_cbd->workers_ready;
+
+       if (timer_cbd->workers_ready) {
+               /* Workers are ready, send notification immediately */
                msg_info("compiled %d regular expressions to the hyperscan tree, "
-                                "postpone loaded notification for %.0f seconds to avoid races",
-                                ncompiled,
-                                when);
+                                "send loaded notification",
+                                ncompiled);
+               rspamd_rs_send_single_notification(timer_cbd);
        }
        else {
+               /* Workers not ready yet, notification will be sent when workers_spawned event is received */
                msg_info("compiled %d regular expressions to the hyperscan tree, "
-                                "send loaded notification",
+                                "waiting for workers to be ready before sending notification",
                                 ncompiled);
+               ctx->loaded = TRUE;
        }
 
-       timer_cbd = g_malloc0(sizeof(*timer_cbd));
-       timer_cbd->worker = worker;
-       timer_cbd->forced = (ncompiled > 0) ? TRUE : compile_cbd->forced;
-
-       tm = g_malloc0(sizeof(*tm));
-       tm->data = (void *) timer_cbd;
-       ev_timer_init(tm, rspamd_rs_delayed_cb, when, 0);
-       ev_timer_start(ctx->event_loop, tm);
-
        g_free(compile_cbd);
 }
 
@@ -453,6 +433,7 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
                        g_malloc0(sizeof(*single_cbd));
                single_cbd->worker = worker;
                single_cbd->forced = forced;
+               single_cbd->workers_ready = ctx->loaded;
 
                rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache,
                                                                                  ctx->hs_dir, ctx->max_time, !forced,
@@ -472,6 +453,7 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
                        g_malloc0(sizeof(*single_cbd));
                single_cbd->worker = worker;
                single_cbd->forced = forced;
+               single_cbd->workers_ready = ctx->loaded;
 
                rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache,
                                                                                  ctx->hs_dir, ctx->max_time, !forced,
@@ -489,6 +471,7 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
        compile_cbd->total_compiled = 0;
        compile_cbd->scopes_remaining = names_count;
        compile_cbd->forced = forced;
+       compile_cbd->workers_ready = ctx->loaded;
 
        /* Compile each scope */
        for (unsigned int i = 0; i < names_count; i++) {
@@ -510,10 +493,12 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
                        /* Check if we're done */
                        if (compile_cbd->scopes_remaining == 0) {
                                /* No scopes to compile, send final notification immediately */
-                               ev_timer *tm = g_malloc0(sizeof(*tm));
-                               tm->data = (void *) compile_cbd;
-                               ev_timer_init(tm, rspamd_rs_delayed_scoped_cb, 0.0, 0);
-                               ev_timer_start(ctx->event_loop, tm);
+                               if (compile_cbd->workers_ready) {
+                                       rspamd_rs_send_final_notification(compile_cbd);
+                               }
+                               else {
+                                       ctx->loaded = TRUE;
+                               }
                        }
                }
        }
@@ -545,11 +530,61 @@ rspamd_hs_helper_reload(struct rspamd_main *rspamd_main,
 
        /* Stop recompile */
        ev_timer_stop(ctx->event_loop, &ctx->recompile_timer);
+       ctx->loaded = FALSE; /* Reset flag for forced recompile */
        rspamd_rs_compile(ctx, worker, TRUE);
 
        return TRUE;
 }
 
+static gboolean
+rspamd_hs_helper_workers_spawned(struct rspamd_main *rspamd_main,
+                                                                struct rspamd_worker *worker, int fd,
+                                                                int attached_fd,
+                                                                struct rspamd_control_command *cmd,
+                                                                gpointer ud)
+{
+       struct rspamd_control_reply rep;
+       struct hs_helper_ctx *ctx = ud;
+
+       msg_info("received workers_spawned notification (%d workers); hyperscan ready: %s",
+                        cmd->cmd.workers_spawned.workers_count,
+                        ctx->loaded ? "yes" : "no");
+
+       memset(&rep, 0, sizeof(rep));
+       rep.type = RSPAMD_CONTROL_WORKERS_SPAWNED;
+       rep.reply.workers_spawned.status = 0;
+
+       /* Acknowledge the command on fd with a zero status; a failed or short write is only logged */
+       if (write(fd, &rep, sizeof(rep)) != sizeof(rep)) {
+               msg_err("cannot write reply to the control socket: %s",
+                               strerror(errno));
+       }
+
+       /* A set ctx->loaded is treated here as: compilation finished while the notification was still held back for workers; send it now */
+       if (ctx->loaded) {
+               static struct rspamd_srv_command srv_cmd;
+
+               memset(&srv_cmd, 0, sizeof(srv_cmd));
+               srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED;
+               rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir,
+                                          sizeof(srv_cmd.cmd.hs_loaded.cache_dir));
+               srv_cmd.cmd.hs_loaded.forced = FALSE; /* NOTE(review): always FALSE here, the forced flag of the finished compilation is not consulted - confirm this is intended */
+               srv_cmd.cmd.hs_loaded.scope[0] = '\0'; /* Empty scope string (not a NULL pointer) means all scopes */
+
+               rspamd_srv_send_command(worker,
+                                                               ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+
+               msg_info("sent delayed hyperscan loaded notification after workers spawned");
+               ctx->loaded = FALSE; /* Clear the pending state so a repeated workers_spawned event does not resend the notification */
+       }
+
+       if (attached_fd != -1) { /* The attached descriptor is not used by this handler, so just close it */
+               close(attached_fd);
+       }
+
+       return TRUE;
+}
+
 static void
 rspamd_hs_helper_timer(EV_P_ ev_timer *w, int revents)
 {
@@ -583,13 +618,10 @@ start_hs_helper(struct rspamd_worker *worker)
                                                                                        "hs_helper",
                                                                                        NULL);
 
-       if (!rspamd_rs_compile(ctx, worker, FALSE)) {
-               /* Tell main not to respawn more workers */
-               exit(EXIT_SUCCESS);
-       }
-
        rspamd_control_worker_add_cmd_handler(worker, RSPAMD_CONTROL_RECOMPILE,
                                                                                  rspamd_hs_helper_reload, ctx);
+       rspamd_control_worker_add_cmd_handler(worker, RSPAMD_CONTROL_WORKERS_SPAWNED,
+                                                                                 rspamd_hs_helper_workers_spawned, ctx);
 
        ctx->recompile_timer.data = worker;
        tim = rspamd_time_jitter(ctx->recompile_time, 0);
index deab5064b1846c250e8bf0eb8f01796c5991b347..e212f7e91d63f4f59a31aa60f4a5458192a9dcf3 100644 (file)
@@ -724,6 +724,9 @@ rspamd_control_default_cmd_handler(int fd,
        case RSPAMD_CONTROL_CHILD_CHANGE:
        case RSPAMD_CONTROL_FUZZY_BLOCKED:
                break;
+       case RSPAMD_CONTROL_WORKERS_SPAWNED:
+               rep.reply.workers_spawned.status = 0;
+               break;
        case RSPAMD_CONTROL_RERESOLVE:
                if (cd->worker->srv->cfg) {
                        REF_RETAIN(cd->worker->srv->cfg);
@@ -1165,6 +1168,10 @@ rspamd_srv_handler(EV_P_ ev_io *w, int revents)
                                rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
                                                                                         rspamd_control_ignore_io_handler, NULL, worker->pid);
                                break;
+                       case RSPAMD_SRV_WORKERS_SPAWNED:
+                               /* No need to broadcast, this is just a notification from main to specific workers */
+                               rdata->rep.reply.workers_spawned.status = 0;
+                               break;
                        default:
                                msg_err_main("unknown command type: %d", cmd.type);
                                break;
@@ -1418,6 +1425,9 @@ rspamd_control_command_from_string(const char *str)
        else if (g_ascii_strcasecmp(str, "child_change") == 0) {
                ret = RSPAMD_CONTROL_CHILD_CHANGE;
        }
+       else if (g_ascii_strcasecmp(str, "workers_spawned") == 0) {
+               ret = RSPAMD_CONTROL_WORKERS_SPAWNED;
+       }
 
        return ret;
 }
@@ -1458,6 +1468,9 @@ rspamd_control_command_to_string(enum rspamd_control_type cmd)
        case RSPAMD_CONTROL_CHILD_CHANGE:
                reply = "child_change";
                break;
+       case RSPAMD_CONTROL_WORKERS_SPAWNED:
+               reply = "workers_spawned";
+               break;
        default:
                break;
        }
@@ -1497,6 +1510,9 @@ const char *rspamd_srv_command_to_string(enum rspamd_srv_type cmd)
        case RSPAMD_SRV_FUZZY_BLOCKED:
                reply = "fuzzy_blocked";
                break;
+       case RSPAMD_SRV_WORKERS_SPAWNED:
+               reply = "workers_spawned";
+               break;
        }
 
        return reply;
index 92bdec85d5ac8a6f1574f7f084572427e5fc8964..81603cab2b7d85fb5a08c63c6093937338759dea 100644 (file)
@@ -37,6 +37,7 @@ enum rspamd_control_type {
        RSPAMD_CONTROL_MONITORED_CHANGE,
        RSPAMD_CONTROL_CHILD_CHANGE,
        RSPAMD_CONTROL_FUZZY_BLOCKED,
+       RSPAMD_CONTROL_WORKERS_SPAWNED,
        RSPAMD_CONTROL_MAX
 };
 
@@ -49,7 +50,8 @@ enum rspamd_srv_type {
        RSPAMD_SRV_HEARTBEAT,
        RSPAMD_SRV_HEALTH,
        RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE,
-       RSPAMD_SRV_FUZZY_BLOCKED, /* Used to notify main process about a blocked ip */
+       RSPAMD_SRV_FUZZY_BLOCKED,   /* Used to notify main process about a blocked ip */
+       RSPAMD_SRV_WORKERS_SPAWNED, /* Used to notify workers that all workers have been spawned */
 };
 
 enum rspamd_log_pipe_type {
@@ -107,6 +109,9 @@ struct rspamd_control_command {
                        } addr;
                        sa_family_t af;
                } fuzzy_blocked;
+               struct {
+                       unsigned int workers_count;
+               } workers_spawned;
        } cmd;
 };
 
@@ -148,6 +153,9 @@ struct rspamd_control_reply {
                struct {
                        unsigned int status;
                } fuzzy_blocked;
+               struct {
+                       unsigned int status;
+               } workers_spawned;
        } reply;
 };
 
@@ -203,6 +211,10 @@ struct rspamd_srv_command {
                        } addr;
                        sa_family_t af;
                } fuzzy_blocked;
+               /* Sent when all workers have been spawned */
+               struct {
+                       unsigned int workers_count;
+               } workers_spawned;
        } cmd;
 };
 
@@ -240,6 +252,9 @@ struct rspamd_srv_reply {
                struct {
                        int unused;
                } fuzzy_blocked;
+               struct {
+                       int status;
+               } workers_spawned;
        } reply;
 };
 
index dafd9aebebf0512907b45aab14a32e7f37403b1d..ba1ea1fb85ead10e4f95c013000cffdf4288ffe5 100644 (file)
@@ -1155,6 +1155,18 @@ rspamd_hup_handler(struct ev_loop *loop, ev_signal *w, int revents)
                        msg_info_main("spawn workers with a new config");
                        spawn_workers(rspamd_main, rspamd_main->event_loop);
                        msg_info_main("workers spawning has been finished");
+
+                       /* Notify all workers that spawning is complete */
+                       {
+                               struct rspamd_control_command wcmd;
+                               memset(&wcmd, 0, sizeof(wcmd));
+                               wcmd.type = RSPAMD_CONTROL_WORKERS_SPAWNED;
+                               wcmd.cmd.workers_spawned.workers_count = g_hash_table_size(rspamd_main->workers);
+                               rspamd_control_broadcast_srv_cmd(rspamd_main, &wcmd, 0);
+                               msg_info_main("notified workers that spawning is complete after reload (%d workers)",
+                                                         wcmd.cmd.workers_spawned.workers_count);
+                       }
+
                        /* Kill marked */
                        msg_info_main("kill old workers");
                        g_hash_table_foreach(rspamd_main->workers, kill_old_workers, NULL);
@@ -1687,6 +1699,17 @@ int main(int argc, char **argv, char **env)
        spawn_workers(rspamd_main, event_loop);
        rspamd_mempool_unlock_mutex(rspamd_main->start_mtx);
 
+       /* Notify all workers that spawning is complete */
+       {
+               struct rspamd_control_command wcmd;
+               memset(&wcmd, 0, sizeof(wcmd));
+               wcmd.type = RSPAMD_CONTROL_WORKERS_SPAWNED;
+               wcmd.cmd.workers_spawned.workers_count = g_hash_table_size(rspamd_main->workers);
+               rspamd_control_broadcast_srv_cmd(rspamd_main, &wcmd, 0);
+               msg_info_main("notified workers that spawning is complete (%d workers)",
+                                         wcmd.cmd.workers_spawned.workers_count);
+       }
+
        rspamd_main->http_ctx = rspamd_http_context_create(rspamd_main->cfg,
                                                                                                           event_loop, rspamd_main->cfg->ups_ctx);