]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] Properly terminate hs_helper during shutdown
authorVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 8 Jan 2026 22:40:38 +0000 (22:40 +0000)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 8 Jan 2026 22:40:38 +0000 (22:40 +0000)
Add RSPAMD_SRV_BUSY command to allow hs_helper to notify main process
when busy with long-running hyperscan compilation. Main skips heartbeat
checks while worker is busy and logs busy reason during shutdown.

Key fixes:
- Prevent notifications being sent after worker receives termination signal
- Propagate ev_break through rspamd_worker_set_busy to properly exit event loop
- Add shutdown monitor timer to log pending workers during termination
- Pass worker pointer to re_cache compile functions for termination checks

src/hs_helper.c
src/libserver/hyperscan_tools.cxx
src/libserver/maps/map.c
src/libserver/re_cache.c
src/libserver/re_cache.h
src/libserver/rspamd_control.c
src/libserver/rspamd_control.h
src/libserver/worker_util.c
src/rspamd.c
src/rspamd.h

index 026de81d5cd9b37deb41f93b1f674255d86bb870..63c5678919cace27c4820bdf5ad54aafc3ee274a 100644 (file)
@@ -317,6 +317,14 @@ rspamd_rs_send_final_notification(struct rspamd_hs_helper_compile_cbdata *cbd)
        struct hs_helper_ctx *ctx = cbd->ctx;
        static struct rspamd_srv_command srv_cmd;
 
+       /* Don't send if worker is terminating */
+       if (worker->state != rspamd_worker_state_running) {
+               msg_info("skipping final notification, worker is terminating");
+               g_free(cbd);
+               ev_timer_stop(ctx->event_loop, &ctx->recompile_timer);
+               return;
+       }
+
        memset(&srv_cmd, 0, sizeof(srv_cmd));
        srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED;
        rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir,
@@ -343,8 +351,17 @@ rspamd_rs_compile_scoped_cb(const char *scope, unsigned int ncompiled, GError *e
        struct hs_helper_ctx *ctx = compile_cbd->ctx;
        static struct rspamd_srv_command srv_cmd;
 
+       /* Don't send notifications if worker is terminating */
+       if (worker->state != rspamd_worker_state_running) {
+               compile_cbd->scopes_remaining--;
+               if (compile_cbd->scopes_remaining == 0) {
+                       g_free(compile_cbd);
+                       ev_timer_stop(ctx->event_loop, &ctx->recompile_timer);
+               }
+               return;
+       }
+
        if (err != NULL) {
-               /* Failed to compile: log and continue */
                msg_err("cannot compile Hyperscan database for scope %s: %e",
                                scope ? scope : "default", err);
        }
@@ -352,7 +369,18 @@ rspamd_rs_compile_scoped_cb(const char *scope, unsigned int ncompiled, GError *e
                if (ncompiled > 0) {
                        compile_cbd->total_compiled += ncompiled;
 
-                       /* Send notification for this specific scope */
+                       /* Re-check state before sending - could have changed during compilation */
+                       if (worker->state != rspamd_worker_state_running) {
+                               msg_info("skipping scope notification for %s, worker is terminating",
+                                                scope ? scope : "default");
+                               compile_cbd->scopes_remaining--;
+                               if (compile_cbd->scopes_remaining == 0) {
+                                       g_free(compile_cbd);
+                                       ev_timer_stop(ctx->event_loop, &ctx->recompile_timer);
+                               }
+                               return;
+                       }
+
                        memset(&srv_cmd, 0, sizeof(srv_cmd));
                        srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED;
                        rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir,
@@ -376,17 +404,14 @@ rspamd_rs_compile_scoped_cb(const char *scope, unsigned int ncompiled, GError *e
 
        compile_cbd->scopes_remaining--;
 
-       /* Check if all scopes are done */
        if (compile_cbd->scopes_remaining == 0) {
                if (compile_cbd->workers_ready) {
-                       /* Workers are ready, send notification immediately */
                        msg_info("compiled %d total regular expressions to the hyperscan tree, "
                                         "send final notification",
                                         compile_cbd->total_compiled);
                        rspamd_rs_send_final_notification(compile_cbd);
                }
                else {
-                       /* Workers not ready yet, notification will be sent when workers_spawned event is received */
                        msg_info("compiled %d total regular expressions to the hyperscan tree, "
                                         "waiting for workers to be ready before sending notification",
                                         compile_cbd->total_compiled);
@@ -409,6 +434,14 @@ rspamd_rs_send_single_notification(struct rspamd_hs_helper_single_compile_cbdata
        struct hs_helper_ctx *ctx;
 
        ctx = (struct hs_helper_ctx *) worker->ctx;
+
+       /* Don't send if worker is terminating */
+       if (worker->state != rspamd_worker_state_running) {
+               msg_info("skipping single notification, worker is terminating");
+               g_free(cbd);
+               return;
+       }
+
        memset(&srv_cmd, 0, sizeof(srv_cmd));
        srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED;
        rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir,
@@ -436,8 +469,13 @@ rspamd_rs_compile_cb(unsigned int ncompiled, GError *err, void *cbd)
 
        ctx = (struct hs_helper_ctx *) worker->ctx;
 
+       /* Don't send notifications if worker is terminating */
+       if (worker->state != rspamd_worker_state_running) {
+               g_free(compile_cbd);
+               return;
+       }
+
        if (err != NULL) {
-               /* Failed to compile: log and go out */
                msg_err("cannot compile Hyperscan database: %e", err);
                g_free(compile_cbd);
                return;
@@ -449,14 +487,12 @@ rspamd_rs_compile_cb(unsigned int ncompiled, GError *err, void *cbd)
        timer_cbd->workers_ready = compile_cbd->workers_ready;
 
        if (timer_cbd->workers_ready) {
-               /* Workers are ready, send notification immediately */
                msg_info("compiled %d regular expressions to the hyperscan tree, "
                                 "send loaded notification",
                                 ncompiled);
                rspamd_rs_send_single_notification(timer_cbd);
        }
        else {
-               /* Workers not ready yet, notification will be sent when workers_spawned event is received */
                msg_info("compiled %d regular expressions to the hyperscan tree, "
                                 "waiting for workers to be ready before sending notification",
                                 ncompiled);
@@ -470,6 +506,10 @@ static gboolean
 rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
                                  gboolean forced)
 {
+       if (worker->state != rspamd_worker_state_running) {
+               return FALSE;
+       }
+
        msg_info("starting hyperscan compilation (forced: %s, workers_ready: %s)",
                         forced ? "yes" : "no", ctx->workers_ready ? "yes" : "no");
 
@@ -498,6 +538,7 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
                rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache,
                                                                                  ctx->hs_dir, ctx->max_time, !forced,
                                                                                  ctx->event_loop,
+                                                                                 worker,
                                                                                  rspamd_rs_compile_cb,
                                                                                  (void *) single_cbd);
                return TRUE;
@@ -530,6 +571,7 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
                rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache,
                                                                                  ctx->hs_dir, ctx->max_time, !forced,
                                                                                  ctx->event_loop,
+                                                                                 worker,
                                                                                  rspamd_rs_compile_cb,
                                                                                  (void *) single_cbd);
                return TRUE;
@@ -556,6 +598,7 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
                        rspamd_re_cache_compile_hyperscan_scoped_single(scope, scope_for_compile,
                                                                                                                        ctx->hs_dir, ctx->max_time, !forced,
                                                                                                                        ctx->event_loop,
+                                                                                                                       worker,
                                                                                                                        rspamd_rs_compile_scoped_cb,
                                                                                                                        compile_cbd);
                }
@@ -621,6 +664,11 @@ rspamd_hs_helper_compile_pending_multipatterns(struct hs_helper_ctx *ctx,
                char fp[PATH_MAX];
                GError *err = NULL;
 
+               if (worker->state != rspamd_worker_state_running) {
+                       msg_info("worker terminating, stopping multipattern compilation");
+                       break;
+               }
+
                npatterns = rspamd_multipattern_get_npatterns(mp);
                msg_info("compiling multipattern '%s' with %ud patterns", entry->name, npatterns);
 
@@ -634,17 +682,33 @@ rspamd_hs_helper_compile_pending_multipatterns(struct hs_helper_ctx *ctx,
                                         fp, entry->name);
                }
                else {
-                       /* Compile and save to cache */
+                       rspamd_worker_set_busy(worker, ctx->event_loop, "compile multipattern");
+
+                       if (worker->state != rspamd_worker_state_running) {
+                               rspamd_worker_set_busy(worker, ctx->event_loop, NULL);
+                               break;
+                       }
+
                        if (!rspamd_multipattern_compile_hs_to_cache(mp, ctx->hs_dir, &err)) {
+                               rspamd_worker_set_busy(worker, ctx->event_loop, NULL);
                                msg_err("failed to compile multipattern '%s': %e", entry->name, err);
                                if (err) {
                                        g_error_free(err);
                                }
                                continue;
                        }
+
+                       rspamd_worker_set_busy(worker, ctx->event_loop, NULL);
+
+                       if (worker->state != rspamd_worker_state_running) {
+                               break;
+                       }
+               }
+
+               if (worker->state != rspamd_worker_state_running) {
+                       break;
                }
 
-               /* Send notification to main process */
                struct rspamd_srv_command srv_cmd;
                memset(&srv_cmd, 0, sizeof(srv_cmd));
                srv_cmd.type = RSPAMD_SRV_MULTIPATTERN_LOADED;
@@ -689,7 +753,7 @@ rspamd_hs_helper_workers_spawned(struct rspamd_main *rspamd_main,
        }
 
        /* If hyperscan compilation has finished but we were waiting for workers, trigger notification now */
-       if (ctx->loaded) {
+       if (ctx->loaded && worker->state == rspamd_worker_state_running) {
                static struct rspamd_srv_command srv_cmd;
 
                memset(&srv_cmd, 0, sizeof(srv_cmd));
@@ -705,7 +769,7 @@ rspamd_hs_helper_workers_spawned(struct rspamd_main *rspamd_main,
                msg_info("sent delayed hyperscan loaded notification after workers spawned");
                ctx->loaded = FALSE; /* Reset to avoid duplicate notifications */
        }
-       else {
+       else if (!ctx->loaded && worker->state == rspamd_worker_state_running) {
                /* Start initial compilation now that workers are ready */
                msg_info("starting initial hyperscan compilation after workers spawned");
                if (!rspamd_rs_compile(ctx, worker, FALSE)) {
@@ -733,6 +797,12 @@ rspamd_hs_helper_timer(EV_P_ ev_timer *w, int revents)
        double tim;
 
        ctx = worker->ctx;
+
+       if (worker->state != rspamd_worker_state_running) {
+               ev_timer_stop(EV_A_ w);
+               return;
+       }
+
        tim = rspamd_time_jitter(ctx->recompile_time, 0);
        w->repeat = tim;
 
index 2dd13da07953886deb221b94554bd7feccf40e88..724c69417d6c90c5f0ae036e2d4556ca672fd714 100644 (file)
@@ -533,7 +533,9 @@ hs_shared_from_unserialized(hs_known_files_cache &hs_cache, raii_mmaped_file &&m
                return tl::make_unexpected(error{is_valid.error(), -1, error_category::IMPORTANT});
        }
 
-       hs_cache.add_cached_file(map.get_file());
+       /* Use rspamd_hyperscan_notice_known() to both add to local cache AND notify main process
+        * This ensures .unser files are known to main and won't be deleted during cleanup */
+       rspamd_hyperscan_notice_known(map.get_file().get_name().data());
        return tl::expected<hs_shared_database, error>{tl::in_place, std::move(map), db};
 }
 
index 6a90aa9b61ebb04e43c125709781ebad6e3c0525..10f42b9199e6f69ee9eae1136812709c026c6250 100644 (file)
@@ -3898,6 +3898,7 @@ void rspamd_map_trigger_hyperscan_compilation(struct rspamd_map *map)
                                                                                                                        1.0,   /* max_time */
                                                                                                                        FALSE, /* silent */
                                                                                                                        worker->ctx ? ((struct rspamd_abstract_worker_ctx *) worker->ctx)->event_loop : NULL,
+                                                                                                                       worker,
                                                                                                                        NULL,  /* callback */
                                                                                                                        NULL); /* cbdata */
                }
index 2aa93e0d350f1d5117ab09148c002ed2dd38f2f8..e5b7caeb67770d80d202407a9805344bb8706814 100644 (file)
@@ -32,6 +32,7 @@
 #ifdef WITH_HYPERSCAN
 #include "hs.h"
 #include "hyperscan_tools.h"
+#include "rspamd_control.h"
 #endif
 
 #include "unix-std.h"
@@ -2141,6 +2142,7 @@ struct rspamd_re_cache_hs_compile_cbdata {
        double max_time;
        gboolean silent;
        unsigned int total;
+       struct rspamd_worker *worker;
 
        void (*cb)(unsigned int ncompiled, GError *err, void *cbd);
 
@@ -2192,6 +2194,16 @@ rspamd_re_cache_compile_timer_cb(EV_P_ ev_timer *w, int revents)
 
        cache = cbdata->cache;
 
+       /* Stop if worker is terminating */
+       if (cbdata->worker && cbdata->worker->state != rspamd_worker_state_running) {
+               ev_timer_stop(EV_A_ w);
+               cbdata->cb(cbdata->total, NULL, cbdata->cbd);
+               g_free(w);
+               g_free(cbdata);
+
+               return;
+       }
+
        if (!g_hash_table_iter_next(&cbdata->it, &k, &v)) {
                /* All done */
                ev_timer_stop(EV_A_ w);
@@ -2361,17 +2373,49 @@ rspamd_re_cache_compile_timer_cb(EV_P_ ev_timer *w, int revents)
        } while (0)
 
        if (n > 0) {
-               /* Create the hs tree */
                hs_errors = NULL;
-               if (hs_compile_ext_multi((const char **) hs_pats,
-                                                                hs_flags,
-                                                                hs_ids,
-                                                                hs_exts,
-                                                                n,
-                                                                HS_MODE_BLOCK,
-                                                                &cache->plt,
-                                                                &test_db,
-                                                                &hs_errors) != HS_SUCCESS) {
+
+               if (cbdata->worker) {
+                       rspamd_worker_set_busy(cbdata->worker, EV_A, "compile hyperscan");
+
+                       if (cbdata->worker->state != rspamd_worker_state_running) {
+                               rspamd_worker_set_busy(cbdata->worker, EV_A, NULL);
+                               CLEANUP_ALLOCATED(false);
+                               ev_timer_stop(EV_A_ w);
+                               cbdata->cb(cbdata->total, NULL, cbdata->cbd);
+                               g_free(w);
+                               g_free(cbdata);
+                               return;
+                       }
+               }
+
+               hs_error_t compile_result = hs_compile_ext_multi((const char **) hs_pats,
+                                                                                                                hs_flags,
+                                                                                                                hs_ids,
+                                                                                                                hs_exts,
+                                                                                                                n,
+                                                                                                                HS_MODE_BLOCK,
+                                                                                                                &cache->plt,
+                                                                                                                &test_db,
+                                                                                                                &hs_errors);
+
+               if (cbdata->worker) {
+                       rspamd_worker_set_busy(cbdata->worker, EV_A, NULL);
+
+                       if (cbdata->worker->state != rspamd_worker_state_running) {
+                               if (test_db) {
+                                       hs_free_database(test_db);
+                               }
+                               CLEANUP_ALLOCATED(false);
+                               ev_timer_stop(EV_A_ w);
+                               cbdata->cb(cbdata->total, NULL, cbdata->cbd);
+                               g_free(w);
+                               g_free(cbdata);
+                               return;
+                       }
+               }
+
+               if (compile_result != HS_SUCCESS) {
                        err = g_error_new(rspamd_re_cache_quark(), EINVAL,
                                                          "cannot create tree of regexp when processing '%s': %s",
                                                          hs_pats[hs_errors->expression], hs_errors->message);
@@ -2516,6 +2560,7 @@ int rspamd_re_cache_compile_hyperscan(struct rspamd_re_cache *cache,
                                                                          double max_time,
                                                                          gboolean silent,
                                                                          struct ev_loop *event_loop,
+                                                                         struct rspamd_worker *worker,
                                                                          void (*cb)(unsigned int ncompiled, GError *err, void *cbd),
                                                                          void *cbd)
 {
@@ -2538,6 +2583,7 @@ int rspamd_re_cache_compile_hyperscan(struct rspamd_re_cache *cache,
        cbdata->max_time = max_time;
        cbdata->silent = silent;
        cbdata->total = 0;
+       cbdata->worker = worker;
        timer = g_malloc0(sizeof(*timer));
        timer->data = (void *) cbdata; /* static */
 
@@ -2555,6 +2601,7 @@ struct rspamd_re_cache_scoped_compile_data {
        unsigned int completed_scopes;
        unsigned int total_compiled;
        GError *first_error;
+       struct rspamd_worker *worker;
 
        void (*final_cb)(unsigned int ncompiled, GError *err, void *cbd);
 
@@ -2596,6 +2643,7 @@ int rspamd_re_cache_compile_hyperscan_scoped(struct rspamd_re_cache *cache_head,
                                                                                         double max_time,
                                                                                         gboolean silent,
                                                                                         struct ev_loop *event_loop,
+                                                                                        struct rspamd_worker *worker,
                                                                                         void (*cb)(unsigned int ncompiled, GError *err, void *cbd),
                                                                                         void *cbd)
 {
@@ -2628,6 +2676,7 @@ int rspamd_re_cache_compile_hyperscan_scoped(struct rspamd_re_cache *cache_head,
        coord_data->completed_scopes = 0;
        coord_data->total_compiled = 0;
        coord_data->first_error = NULL;
+       coord_data->worker = worker;
        coord_data->final_cb = cb;
        coord_data->final_cbd = cbd;
 
@@ -2638,7 +2687,7 @@ int rspamd_re_cache_compile_hyperscan_scoped(struct rspamd_re_cache *cache_head,
        DL_FOREACH(cache_head, cur)
        {
                result = rspamd_re_cache_compile_hyperscan(cur, cache_dir, max_time, silent,
-                                                                                                  event_loop, rspamd_re_cache_compile_scoped_coordination_cb, coord_data);
+                                                                                                  event_loop, worker, rspamd_re_cache_compile_scoped_coordination_cb, coord_data);
                if (result < 0) {
                        /* If we failed to start compilation for this scope, treat it as completed with error */
                        GError *start_error = g_error_new(rspamd_re_cache_quark(), result,
@@ -3432,6 +3481,7 @@ struct rspamd_re_cache_hs_compile_scoped_cbdata {
        double max_time;
        gboolean silent;
        int lock_fd;
+       struct rspamd_worker *worker;
 
        void (*cb)(const char *scope, unsigned int ncompiled, GError *err, void *cbd);
 
@@ -3462,6 +3512,7 @@ int rspamd_re_cache_compile_hyperscan_scoped_single(struct rspamd_re_cache *cach
                                                                                                        double max_time,
                                                                                                        gboolean silent,
                                                                                                        struct ev_loop *event_loop,
+                                                                                                       struct rspamd_worker *worker,
                                                                                                        void (*cb)(const char *scope, unsigned int ncompiled, GError *err,
                                                                                                                           void *cbd),
                                                                                                        void *cbd)
@@ -3489,11 +3540,12 @@ int rspamd_re_cache_compile_hyperscan_scoped_single(struct rspamd_re_cache *cach
        scoped_cbd->max_time = max_time;
        scoped_cbd->silent = silent;
        scoped_cbd->lock_fd = lock_fd;
+       scoped_cbd->worker = worker;
        scoped_cbd->cb = cb;
        scoped_cbd->cbd = cbd;
 
        return rspamd_re_cache_compile_hyperscan(cache, cache_dir, max_time, silent,
-                                                                                        event_loop, rspamd_re_cache_compile_scoped_cb, scoped_cbd);
+                                                                                        event_loop, worker, rspamd_re_cache_compile_scoped_cb, scoped_cbd);
 }
 #else
 /* Non hyperscan version stub */
@@ -3503,6 +3555,7 @@ int rspamd_re_cache_compile_hyperscan_scoped_single(struct rspamd_re_cache *cach
                                                                                                        double max_time,
                                                                                                        gboolean silent,
                                                                                                        struct ev_loop *event_loop,
+                                                                                                       struct rspamd_worker *worker,
                                                                                                        void (*cb)(const char *scope, unsigned int ncompiled, GError *err, void *cbd),
                                                                                                        void *cbd)
 {
index c5c8627d89008cbd9ff7cbd8ee46dc7e2e350559..dc0ac38849585d4d83557af4e3f1c09939a7e0de 100644 (file)
@@ -229,6 +229,7 @@ const char *rspamd_re_cache_type_to_string(enum rspamd_re_type type);
 enum rspamd_re_type rspamd_re_cache_type_from_string(const char *str);
 
 struct ev_loop;
+struct rspamd_worker;
 /**
  * Compile expressions to the hyperscan tree and store in the `cache_dir`
  */
@@ -237,6 +238,7 @@ int rspamd_re_cache_compile_hyperscan(struct rspamd_re_cache *cache,
                                                                          double max_time,
                                                                          gboolean silent,
                                                                          struct ev_loop *event_loop,
+                                                                         struct rspamd_worker *worker,
                                                                          void (*cb)(unsigned int ncompiled, GError *err, void *cbd),
                                                                          void *cbd);
 
@@ -248,6 +250,7 @@ int rspamd_re_cache_compile_hyperscan_scoped(struct rspamd_re_cache *cache_head,
                                                                                         double max_time,
                                                                                         gboolean silent,
                                                                                         struct ev_loop *event_loop,
+                                                                                        struct rspamd_worker *worker,
                                                                                         void (*cb)(unsigned int ncompiled, GError *err, void *cbd),
                                                                                         void *cbd);
 
@@ -283,6 +286,7 @@ int rspamd_re_cache_compile_hyperscan_scoped_single(struct rspamd_re_cache *cach
                                                                                                        double max_time,
                                                                                                        gboolean silent,
                                                                                                        struct ev_loop *event_loop,
+                                                                                                       struct rspamd_worker *worker,
                                                                                                        void (*cb)(const char *scope, unsigned int ncompiled, GError *err, void *cbd),
                                                                                                        void *cbd);
 
index 4ed964d42b05aa95728d82ce67b4e647aa148e22..09dd0f4e6b908424d2f91c3d433503b42ee9ba58 100644 (file)
@@ -1217,6 +1217,22 @@ rspamd_srv_handler(EV_P_ ev_io *w, int revents)
                                worker->hb.last_event = ev_time();
                                rdata->rep.reply.heartbeat.status = 0;
                                break;
+                       case RSPAMD_SRV_BUSY:
+                               worker->hb.is_busy = cmd.cmd.busy.is_busy;
+                               if (cmd.cmd.busy.is_busy) {
+                                       rspamd_strlcpy(worker->hb.busy_reason, cmd.cmd.busy.reason,
+                                                                  sizeof(worker->hb.busy_reason));
+                                       msg_info_main("worker type %s with pid %P marked as busy: %s",
+                                                                 g_quark_to_string(worker->type), worker->pid,
+                                                                 worker->hb.busy_reason);
+                               }
+                               else {
+                                       msg_info_main("worker type %s with pid %P finished: %s",
+                                                                 g_quark_to_string(worker->type), worker->pid,
+                                                                 worker->hb.busy_reason);
+                                       worker->hb.busy_reason[0] = '\0';
+                               }
+                               break;
                        case RSPAMD_SRV_HEALTH:
                                rspamd_fill_health_reply(rspamd_main, &rdata->rep);
                                break;
@@ -1596,7 +1612,65 @@ const char *rspamd_srv_command_to_string(enum rspamd_srv_type cmd)
        case RSPAMD_SRV_MULTIPATTERN_LOADED:
                reply = "multipattern_loaded";
                break;
+       case RSPAMD_SRV_BUSY:
+               reply = "busy";
+               break;
        }
 
        return reply;
 }
+
+struct rspamd_busy_cb_data {
+       gboolean completed;
+};
+
+static void
+rspamd_worker_busy_reply_handler(struct rspamd_worker *worker,
+                                                                struct rspamd_srv_reply *rep,
+                                                                int rep_fd,
+                                                                gpointer ud)
+{
+       struct rspamd_busy_cb_data *cbd = (struct rspamd_busy_cb_data *) ud;
+       cbd->completed = TRUE;
+}
+
+void rspamd_worker_set_busy(struct rspamd_worker *worker,
+                                                       struct ev_loop *event_loop,
+                                                       const char *reason)
+{
+       struct rspamd_srv_command srv_cmd;
+       struct rspamd_busy_cb_data cbd;
+       int max_iterations = 100; /* Safety limit */
+
+       /* Don't send if worker is terminating */
+       if (worker->state != rspamd_worker_state_running) {
+               return;
+       }
+
+       memset(&srv_cmd, 0, sizeof(srv_cmd));
+       srv_cmd.type = RSPAMD_SRV_BUSY;
+       srv_cmd.cmd.busy.is_busy = (reason != NULL);
+       if (reason) {
+               rspamd_strlcpy(srv_cmd.cmd.busy.reason, reason,
+                                          sizeof(srv_cmd.cmd.busy.reason));
+       }
+
+       cbd.completed = FALSE;
+       rspamd_srv_send_command(worker, event_loop, &srv_cmd, -1,
+                                                       rspamd_worker_busy_reply_handler, &cbd);
+
+       /* Run the event loop until the notification is acknowledged
+        * Also stop if worker starts terminating (signal received during wait) */
+       while (!cbd.completed && max_iterations-- > 0 &&
+                  worker->state == rspamd_worker_state_running) {
+               ev_run(event_loop, EVRUN_ONCE);
+       }
+
+       /* If worker is terminating, propagate the break to the outer event loop */
+       if (worker->state != rspamd_worker_state_running) {
+               ev_break(event_loop, EVBREAK_ALL);
+       }
+       else if (!cbd.completed) {
+               msg_warn("busy notification may not have reached main process");
+       }
+}
index d4db9547324e29260fc79bae3ab69be671ca0392..72c896fc9215142ca06144f2156af28637d52bca 100644 (file)
@@ -55,6 +55,7 @@ enum rspamd_srv_type {
        RSPAMD_SRV_FUZZY_BLOCKED,       /* Used to notify main process about a blocked ip */
        RSPAMD_SRV_WORKERS_SPAWNED,     /* Used to notify workers that all workers have been spawned */
        RSPAMD_SRV_MULTIPATTERN_LOADED, /* Multipattern HS compiled and ready */
+       RSPAMD_SRV_BUSY,                /* Worker is busy with long-running operation, suspend heartbeat */
 };
 
 enum rspamd_log_pipe_type {
@@ -241,6 +242,11 @@ struct rspamd_srv_command {
                        char name[64];
                        char cache_dir[CONTROL_PATHLEN];
                } mp_loaded;
+               /* Sent when worker starts/finishes long-running operation */
+               struct {
+                       gboolean is_busy;
+                       char reason[32]; /* Short reason like "compile hyperscan" */
+               } busy;
        } cmd;
 };
 
@@ -366,6 +372,17 @@ const char *rspamd_srv_command_to_string(enum rspamd_srv_type cmd);
  */
 void rspamd_pending_control_free(gpointer p);
 
+/**
+ * Notify main process that worker is busy with long-running operation
+ * Main process will skip heartbeat checks while worker is busy
+ * @param worker worker instance
+ * @param event_loop worker event loop
+ * @param reason short reason string (e.g., "compile hyperscan"), NULL to clear
+ */
+void rspamd_worker_set_busy(struct rspamd_worker *worker,
+                                                       struct ev_loop *event_loop,
+                                                       const char *reason);
+
 G_END_DECLS
 
 #endif
index 698b542b29aafbaf17214b31f270dbe1f719c898..3923d0938e18a0c156192d6f740ae2708ffa65ff 100644 (file)
@@ -904,6 +904,12 @@ rspamd_main_heartbeat_cb(EV_P_ ev_timer *w, int revents)
        time_from_last -= wrk->hb.last_event;
        rspamd_main = wrk->srv;
 
+       if (wrk->hb.is_busy || rspamd_main->wanna_die) {
+               /* Worker is doing long-running operation or we're shutting down,
+                * skip heartbeat check */
+               return;
+       }
+
        if (wrk->hb.last_event > 0 &&
                time_from_last > 0 &&
                time_from_last >= rspamd_main->cfg->heartbeat_interval * 2) {
@@ -1909,18 +1915,8 @@ rspamd_worker_hyperscan_ready(struct rspamd_main *rspamd_main,
        memset(&rep, 0, sizeof(rep));
        rep.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
 
-       /*
-        * Check if we received an FD for shared memory hyperscan database.
-        * FD-based loading is used when hs_helper sends a pre-deserialized
-        * database via SCM_RIGHTS. This allows workers to mmap the database
-        * directly without disk I/O.
-        */
+       /* FD-based loading infrastructure - close unused FD for now */
        if (attached_fd >= 0 && cmd->cmd.hs_loaded.fd_size > 0) {
-               msg_info("received hyperscan fd %d with size %z (scope: %s) - "
-                                "FD-based loading infrastructure ready, using file-based for now",
-                                attached_fd, cmd->cmd.hs_loaded.fd_size,
-                                cmd->cmd.hs_loaded.scope[0] != '\0' ? cmd->cmd.hs_loaded.scope : "all");
-               /* Close the FD since we're not using it yet */
                close(attached_fd);
                attached_fd = -1;
        }
@@ -1953,7 +1949,6 @@ rspamd_worker_hyperscan_ready(struct rspamd_main *rspamd_main,
                                strerror(errno));
        }
 
-       /* Close any remaining FD we didn't use */
        if (attached_fd >= 0) {
                close(attached_fd);
        }
index a4eb2e0f2c438a655b28e1ec4a7d7616e9ce01c5..e0a0702d76bd767b9a8e6c7b80b5f95df2d67f8c 100644 (file)
@@ -819,8 +819,16 @@ rspamd_worker_wait(struct rspamd_worker *w)
                                }
                        }
                        else {
-                               msg_warn_main("terminate worker %s(%P) with SIGKILL",
-                                                         g_quark_to_string(w->type), w->pid);
+                               if (w->hb.is_busy && w->hb.busy_reason[0]) {
+                                       msg_warn_main("terminate worker %s(%P) with SIGKILL; "
+                                                                 "worker was busy: %s",
+                                                                 g_quark_to_string(w->type), w->pid,
+                                                                 w->hb.busy_reason);
+                               }
+                               else {
+                                       msg_warn_main("terminate worker %s(%P) with SIGKILL",
+                                                                 g_quark_to_string(w->type), w->pid);
+                               }
                        }
                }
                else {
@@ -830,9 +838,18 @@ rspamd_worker_wait(struct rspamd_worker *w)
                                return;
                        }
                        else {
-                               msg_err_main("data corruption warning: terminating "
-                                                        "special worker %s(%P) with SIGKILL",
-                                                        g_quark_to_string(w->type), w->pid);
+                               if (w->hb.is_busy && w->hb.busy_reason[0]) {
+                                       msg_err_main("data corruption warning: terminating "
+                                                                "special worker %s(%P) with SIGKILL; "
+                                                                "worker was busy: %s",
+                                                                g_quark_to_string(w->type), w->pid,
+                                                                w->hb.busy_reason);
+                               }
+                               else {
+                                       msg_err_main("data corruption warning: terminating "
+                                                                "special worker %s(%P) with SIGKILL",
+                                                                g_quark_to_string(w->type), w->pid);
+                               }
                        }
                }
        }
@@ -1031,6 +1048,39 @@ start_srv_ev(gpointer key, gpointer value, gpointer ud)
        rspamd_attach_worker(rspamd_main, cur);
 }
 
+static void
+rspamd_log_pending_worker(gpointer key, gpointer value, gpointer ud)
+{
+       struct rspamd_worker *w = (struct rspamd_worker *) value;
+       struct rspamd_main *rspamd_main = w->srv;
+
+       if (w->hb.is_busy && w->hb.busy_reason[0]) {
+               msg_info_main("  - %s(%P): busy with %s",
+                                         g_quark_to_string(w->type), w->pid,
+                                         w->hb.busy_reason);
+       }
+       else {
+               msg_info_main("  - %s(%P): shutting down",
+                                         g_quark_to_string(w->type), w->pid);
+       }
+}
+
+/* Soft monitoring timer - logs shutdown status periodically */
+static void
+rspamd_shutdown_monitor_handler(EV_P_ ev_timer *w, int revents)
+{
+       struct rspamd_main *rspamd_main = (struct rspamd_main *) w->data;
+       unsigned int nworkers = g_hash_table_size(rspamd_main->workers);
+
+       if (nworkers > 0) {
+               msg_info_main("shutdown: waiting for %d worker(s):", nworkers);
+               g_hash_table_foreach(rspamd_main->workers, rspamd_log_pending_worker, NULL);
+       }
+       else {
+               ev_timer_stop(EV_A_ w);
+       }
+}
+
 static void
 rspamd_final_timer_handler(EV_P_ ev_timer *w, int revents)
 {
@@ -1038,6 +1088,12 @@ rspamd_final_timer_handler(EV_P_ ev_timer *w, int revents)
 
        term_attempts--;
 
+       /* Log pending workers when we're about to force kill them */
+       if (term_attempts == 0 && g_hash_table_size(rspamd_main->workers) > 0) {
+               msg_warn_main("shutdown timeout reached, %d worker(s) still running - sending SIGKILL",
+                                         (int) g_hash_table_size(rspamd_main->workers));
+       }
+
        g_hash_table_foreach(rspamd_main->workers, hash_worker_wait_callback,
                                                 NULL);
 
@@ -1047,11 +1103,15 @@ rspamd_final_timer_handler(EV_P_ ev_timer *w, int revents)
 }
 
 /* Signal handlers */
+#define SHUTDOWN_MONITOR_INITIAL 1.0
+#define SHUTDOWN_MONITOR_REPEAT 10.0
+
 static void
 rspamd_term_handler(struct ev_loop *loop, ev_signal *w, int revents)
 {
        struct rspamd_main *rspamd_main = (struct rspamd_main *) w->data;
        static ev_timer ev_finale;
+       static ev_timer ev_monitor;
        ev_tstamp shutdown_ts;
 
        if (!rspamd_main->wanna_die) {
@@ -1087,6 +1147,11 @@ rspamd_term_handler(struct ev_loop *loop, ev_signal *w, int revents)
                ev_timer_init(&ev_finale, rspamd_final_timer_handler,
                                          TERMINATION_INTERVAL, TERMINATION_INTERVAL);
                ev_timer_start(rspamd_main->event_loop, &ev_finale);
+
+               ev_monitor.data = rspamd_main;
+               ev_timer_init(&ev_monitor, rspamd_shutdown_monitor_handler,
+                                         SHUTDOWN_MONITOR_INITIAL, SHUTDOWN_MONITOR_REPEAT);
+               ev_timer_start(rspamd_main->event_loop, &ev_monitor);
        }
 }
 
index 7aeccae98305183e4554467344655a302b6bfee5..bec7835982e20ac23233e7d871ec402e8e37bcbe 100644 (file)
@@ -86,6 +86,8 @@ struct rspamd_worker_heartbeat {
        ev_timer heartbeat_ev; /**< used by main for checking heartbeats and by workers to send heartbeats */
        ev_tstamp last_event;  /**< last heartbeat received timestamp */
        int64_t nbeats;        /**< positive for beats received, negative for beats missed */
+       gboolean is_busy;      /**< worker is doing long-running operation, skip heartbeat checks */
+       char busy_reason[32];  /**< reason for being busy (for logging) */
 };
 
 enum rspamd_worker_state {