struct hs_helper_ctx *ctx = cbd->ctx;
static struct rspamd_srv_command srv_cmd;
+ /* Don't send if worker is terminating */
+ if (worker->state != rspamd_worker_state_running) {
+ msg_info("skipping final notification, worker is terminating");
+ g_free(cbd);
+ ev_timer_stop(ctx->event_loop, &ctx->recompile_timer);
+ return;
+ }
+
memset(&srv_cmd, 0, sizeof(srv_cmd));
srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED;
rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir,
struct hs_helper_ctx *ctx = compile_cbd->ctx;
static struct rspamd_srv_command srv_cmd;
+ /* Don't send notifications if worker is terminating */
+ if (worker->state != rspamd_worker_state_running) {
+ compile_cbd->scopes_remaining--;
+ if (compile_cbd->scopes_remaining == 0) {
+ g_free(compile_cbd);
+ ev_timer_stop(ctx->event_loop, &ctx->recompile_timer);
+ }
+ return;
+ }
+
if (err != NULL) {
- /* Failed to compile: log and continue */
msg_err("cannot compile Hyperscan database for scope %s: %e",
scope ? scope : "default", err);
}
if (ncompiled > 0) {
compile_cbd->total_compiled += ncompiled;
- /* Send notification for this specific scope */
+ /* Re-check state before sending - could have changed during compilation */
+ if (worker->state != rspamd_worker_state_running) {
+ msg_info("skipping scope notification for %s, worker is terminating",
+ scope ? scope : "default");
+ compile_cbd->scopes_remaining--;
+ if (compile_cbd->scopes_remaining == 0) {
+ g_free(compile_cbd);
+ ev_timer_stop(ctx->event_loop, &ctx->recompile_timer);
+ }
+ return;
+ }
+
memset(&srv_cmd, 0, sizeof(srv_cmd));
srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED;
rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir,
compile_cbd->scopes_remaining--;
- /* Check if all scopes are done */
if (compile_cbd->scopes_remaining == 0) {
if (compile_cbd->workers_ready) {
- /* Workers are ready, send notification immediately */
msg_info("compiled %d total regular expressions to the hyperscan tree, "
"send final notification",
compile_cbd->total_compiled);
rspamd_rs_send_final_notification(compile_cbd);
}
else {
- /* Workers not ready yet, notification will be sent when workers_spawned event is received */
msg_info("compiled %d total regular expressions to the hyperscan tree, "
"waiting for workers to be ready before sending notification",
compile_cbd->total_compiled);
struct hs_helper_ctx *ctx;
ctx = (struct hs_helper_ctx *) worker->ctx;
+
+ /* Don't send if worker is terminating */
+ if (worker->state != rspamd_worker_state_running) {
+ msg_info("skipping single notification, worker is terminating");
+ g_free(cbd);
+ return;
+ }
+
memset(&srv_cmd, 0, sizeof(srv_cmd));
srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED;
rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir,
ctx = (struct hs_helper_ctx *) worker->ctx;
+ /* Don't send notifications if worker is terminating */
+ if (worker->state != rspamd_worker_state_running) {
+ g_free(compile_cbd);
+ return;
+ }
+
if (err != NULL) {
- /* Failed to compile: log and go out */
msg_err("cannot compile Hyperscan database: %e", err);
g_free(compile_cbd);
return;
timer_cbd->workers_ready = compile_cbd->workers_ready;
if (timer_cbd->workers_ready) {
- /* Workers are ready, send notification immediately */
msg_info("compiled %d regular expressions to the hyperscan tree, "
"send loaded notification",
ncompiled);
rspamd_rs_send_single_notification(timer_cbd);
}
else {
- /* Workers not ready yet, notification will be sent when workers_spawned event is received */
msg_info("compiled %d regular expressions to the hyperscan tree, "
"waiting for workers to be ready before sending notification",
ncompiled);
rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
gboolean forced)
{
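+ /* Do not start a new compilation if the worker is shutting down */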
+ if (worker->state != rspamd_worker_state_running) {
+ return FALSE;
+ }
+
msg_info("starting hyperscan compilation (forced: %s, workers_ready: %s)",
forced ? "yes" : "no", ctx->workers_ready ? "yes" : "no");
rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache,
ctx->hs_dir, ctx->max_time, !forced,
ctx->event_loop,
+ worker,
rspamd_rs_compile_cb,
(void *) single_cbd);
return TRUE;
rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache,
ctx->hs_dir, ctx->max_time, !forced,
ctx->event_loop,
+ worker,
rspamd_rs_compile_cb,
(void *) single_cbd);
return TRUE;
rspamd_re_cache_compile_hyperscan_scoped_single(scope, scope_for_compile,
ctx->hs_dir, ctx->max_time, !forced,
ctx->event_loop,
+ worker,
rspamd_rs_compile_scoped_cb,
compile_cbd);
}
char fp[PATH_MAX];
GError *err = NULL;
+ if (worker->state != rspamd_worker_state_running) {
+ msg_info("worker terminating, stopping multipattern compilation");
+ break;
+ }
+
npatterns = rspamd_multipattern_get_npatterns(mp);
msg_info("compiling multipattern '%s' with %ud patterns", entry->name, npatterns);
fp, entry->name);
}
else {
- /* Compile and save to cache */
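+ /* Mark the worker busy: the multipattern compile below blocks the event loop */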
+ rspamd_worker_set_busy(worker, ctx->event_loop, "compile multipattern");
+
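+ /* rspamd_worker_set_busy() spins the event loop, so termination may have been requested meanwhile */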
+ if (worker->state != rspamd_worker_state_running) {
+ rspamd_worker_set_busy(worker, ctx->event_loop, NULL);
+ break;
+ }
+
if (!rspamd_multipattern_compile_hs_to_cache(mp, ctx->hs_dir, &err)) {
+ rspamd_worker_set_busy(worker, ctx->event_loop, NULL);
msg_err("failed to compile multipattern '%s': %e", entry->name, err);
if (err) {
g_error_free(err);
}
continue;
}
+
+ rspamd_worker_set_busy(worker, ctx->event_loop, NULL);
+
+ if (worker->state != rspamd_worker_state_running) {
+ break;
+ }
+ }
+
+ if (worker->state != rspamd_worker_state_running) {
+ break;
}
- /* Send notification to main process */
struct rspamd_srv_command srv_cmd;
memset(&srv_cmd, 0, sizeof(srv_cmd));
srv_cmd.type = RSPAMD_SRV_MULTIPATTERN_LOADED;
}
/* If hyperscan compilation has finished but we were waiting for workers, trigger notification now */
- if (ctx->loaded) {
+ if (ctx->loaded && worker->state == rspamd_worker_state_running) {
static struct rspamd_srv_command srv_cmd;
memset(&srv_cmd, 0, sizeof(srv_cmd));
msg_info("sent delayed hyperscan loaded notification after workers spawned");
ctx->loaded = FALSE; /* Reset to avoid duplicate notifications */
}
- else {
+ else if (!ctx->loaded && worker->state == rspamd_worker_state_running) {
/* Start initial compilation now that workers are ready */
msg_info("starting initial hyperscan compilation after workers spawned");
if (!rspamd_rs_compile(ctx, worker, FALSE)) {
double tim;
ctx = worker->ctx;
+
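+ /* Worker is shutting down: stop the recompile timer instead of rescheduling */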
+ if (worker->state != rspamd_worker_state_running) {
+ ev_timer_stop(EV_A_ w);
+ return;
+ }
+
tim = rspamd_time_jitter(ctx->recompile_time, 0);
w->repeat = tim;
return tl::make_unexpected(error{is_valid.error(), -1, error_category::IMPORTANT});
}
- hs_cache.add_cached_file(map.get_file());
+ /* Use rspamd_hyperscan_notice_known() to both add the file to the local cache
+ * and notify the main process, so that .unser files are known to main and are
+ * not deleted during cleanup */
+ rspamd_hyperscan_notice_known(map.get_file().get_name().data());
return tl::expected<hs_shared_database, error>{tl::in_place, std::move(map), db};
}
1.0, /* max_time */
FALSE, /* silent */
worker->ctx ? ((struct rspamd_abstract_worker_ctx *) worker->ctx)->event_loop : NULL,
+ worker,
NULL, /* callback */
NULL); /* cbdata */
}
#ifdef WITH_HYPERSCAN
#include "hs.h"
#include "hyperscan_tools.h"
+#include "rspamd_control.h"
#endif
#include "unix-std.h"
double max_time;
gboolean silent;
unsigned int total;
+ struct rspamd_worker *worker;
void (*cb)(unsigned int ncompiled, GError *err, void *cbd);
cache = cbdata->cache;
+ /* Stop if worker is terminating */
+ if (cbdata->worker && cbdata->worker->state != rspamd_worker_state_running) {
+ ev_timer_stop(EV_A_ w);
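+ /* Invoke the callback with the partial total so the caller can clean up */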
+ cbdata->cb(cbdata->total, NULL, cbdata->cbd);
+ g_free(w);
+ g_free(cbdata);
+
+ return;
+ }
+
if (!g_hash_table_iter_next(&cbdata->it, &k, &v)) {
/* All done */
ev_timer_stop(EV_A_ w);
} while (0)
if (n > 0) {
- /* Create the hs tree */
hs_errors = NULL;
- if (hs_compile_ext_multi((const char **) hs_pats,
- hs_flags,
- hs_ids,
- hs_exts,
- n,
- HS_MODE_BLOCK,
- &cache->plt,
- &test_db,
- &hs_errors) != HS_SUCCESS) {
+
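+ /* hs_compile_ext_multi() can block for a long time: mark the worker busy and re-check for termination */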
+ if (cbdata->worker) {
+ rspamd_worker_set_busy(cbdata->worker, EV_A, "compile hyperscan");
+
+ if (cbdata->worker->state != rspamd_worker_state_running) {
+ rspamd_worker_set_busy(cbdata->worker, EV_A, NULL);
+ CLEANUP_ALLOCATED(false);
+ ev_timer_stop(EV_A_ w);
+ cbdata->cb(cbdata->total, NULL, cbdata->cbd);
+ g_free(w);
+ g_free(cbdata);
+ return;
+ }
+ }
+
+ hs_error_t compile_result = hs_compile_ext_multi((const char **) hs_pats,
+ hs_flags,
+ hs_ids,
+ hs_exts,
+ n,
+ HS_MODE_BLOCK,
+ &cache->plt,
+ &test_db,
+ &hs_errors);
+
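+ /* Compilation finished: clear the busy flag and bail out if termination was requested while we were blocked */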
+ if (cbdata->worker) {
+ rspamd_worker_set_busy(cbdata->worker, EV_A, NULL);
+
+ if (cbdata->worker->state != rspamd_worker_state_running) {
+ if (test_db) {
+ hs_free_database(test_db);
+ }
+ CLEANUP_ALLOCATED(false);
+ ev_timer_stop(EV_A_ w);
+ cbdata->cb(cbdata->total, NULL, cbdata->cbd);
+ g_free(w);
+ g_free(cbdata);
+ return;
+ }
+ }
+
+ if (compile_result != HS_SUCCESS) {
err = g_error_new(rspamd_re_cache_quark(), EINVAL,
"cannot create tree of regexp when processing '%s': %s",
hs_pats[hs_errors->expression], hs_errors->message);
double max_time,
gboolean silent,
struct ev_loop *event_loop,
+ struct rspamd_worker *worker,
void (*cb)(unsigned int ncompiled, GError *err, void *cbd),
void *cbd)
{
cbdata->max_time = max_time;
cbdata->silent = silent;
cbdata->total = 0;
+ cbdata->worker = worker;
timer = g_malloc0(sizeof(*timer));
timer->data = (void *) cbdata; /* static */
unsigned int completed_scopes;
unsigned int total_compiled;
GError *first_error;
+ struct rspamd_worker *worker;
void (*final_cb)(unsigned int ncompiled, GError *err, void *cbd);
double max_time,
gboolean silent,
struct ev_loop *event_loop,
+ struct rspamd_worker *worker,
void (*cb)(unsigned int ncompiled, GError *err, void *cbd),
void *cbd)
{
coord_data->completed_scopes = 0;
coord_data->total_compiled = 0;
coord_data->first_error = NULL;
+ coord_data->worker = worker;
coord_data->final_cb = cb;
coord_data->final_cbd = cbd;
DL_FOREACH(cache_head, cur)
{
result = rspamd_re_cache_compile_hyperscan(cur, cache_dir, max_time, silent,
- event_loop, rspamd_re_cache_compile_scoped_coordination_cb, coord_data);
+ event_loop, worker, rspamd_re_cache_compile_scoped_coordination_cb, coord_data);
if (result < 0) {
/* If we failed to start compilation for this scope, treat it as completed with error */
GError *start_error = g_error_new(rspamd_re_cache_quark(), result,
double max_time;
gboolean silent;
int lock_fd;
+ struct rspamd_worker *worker;
void (*cb)(const char *scope, unsigned int ncompiled, GError *err, void *cbd);
double max_time,
gboolean silent,
struct ev_loop *event_loop,
+ struct rspamd_worker *worker,
void (*cb)(const char *scope, unsigned int ncompiled, GError *err,
void *cbd),
void *cbd)
scoped_cbd->max_time = max_time;
scoped_cbd->silent = silent;
scoped_cbd->lock_fd = lock_fd;
+ scoped_cbd->worker = worker;
scoped_cbd->cb = cb;
scoped_cbd->cbd = cbd;
return rspamd_re_cache_compile_hyperscan(cache, cache_dir, max_time, silent,
- event_loop, rspamd_re_cache_compile_scoped_cb, scoped_cbd);
+ event_loop, worker, rspamd_re_cache_compile_scoped_cb, scoped_cbd);
}
#else
/* Non hyperscan version stub */
double max_time,
gboolean silent,
struct ev_loop *event_loop,
+ struct rspamd_worker *worker,
void (*cb)(const char *scope, unsigned int ncompiled, GError *err, void *cbd),
void *cbd)
{
enum rspamd_re_type rspamd_re_cache_type_from_string(const char *str);
struct ev_loop;
+struct rspamd_worker;
/**
* Compile expressions to the hyperscan tree and store in the `cache_dir`
*/
double max_time,
gboolean silent,
struct ev_loop *event_loop,
+ struct rspamd_worker *worker,
void (*cb)(unsigned int ncompiled, GError *err, void *cbd),
void *cbd);
double max_time,
gboolean silent,
struct ev_loop *event_loop,
+ struct rspamd_worker *worker,
void (*cb)(unsigned int ncompiled, GError *err, void *cbd),
void *cbd);
double max_time,
gboolean silent,
struct ev_loop *event_loop,
+ struct rspamd_worker *worker,
void (*cb)(const char *scope, unsigned int ncompiled, GError *err, void *cbd),
void *cbd);
worker->hb.last_event = ev_time();
rdata->rep.reply.heartbeat.status = 0;
break;
+ case RSPAMD_SRV_BUSY:
+ worker->hb.is_busy = cmd.cmd.busy.is_busy;
+ if (cmd.cmd.busy.is_busy) {
+ rspamd_strlcpy(worker->hb.busy_reason, cmd.cmd.busy.reason,
+ sizeof(worker->hb.busy_reason));
+ msg_info_main("worker type %s with pid %P marked as busy: %s",
+ g_quark_to_string(worker->type), worker->pid,
+ worker->hb.busy_reason);
+ }
+ else {
+ msg_info_main("worker type %s with pid %P finished: %s",
+ g_quark_to_string(worker->type), worker->pid,
+ worker->hb.busy_reason);
+ worker->hb.busy_reason[0] = '\0';
+ }
+ break;
case RSPAMD_SRV_HEALTH:
rspamd_fill_health_reply(rspamd_main, &rdata->rep);
break;
case RSPAMD_SRV_MULTIPATTERN_LOADED:
reply = "multipattern_loaded";
break;
+ case RSPAMD_SRV_BUSY:
+ reply = "busy";
+ break;
}
return reply;
}
+
+struct rspamd_busy_cb_data {
+ gboolean completed;
+};
+
+static void
+rspamd_worker_busy_reply_handler(struct rspamd_worker *worker,
+ struct rspamd_srv_reply *rep,
+ int rep_fd,
+ gpointer ud)
+{
+ struct rspamd_busy_cb_data *cbd = (struct rspamd_busy_cb_data *) ud;
+ cbd->completed = TRUE;
+}
+
+void rspamd_worker_set_busy(struct rspamd_worker *worker,
+ struct ev_loop *event_loop,
+ const char *reason)
+{
+ struct rspamd_srv_command srv_cmd;
+ struct rspamd_busy_cb_data cbd;
+ int max_iterations = 100; /* Safety limit */
+
+ /* Don't send if worker is terminating */
+ if (worker->state != rspamd_worker_state_running) {
+ return;
+ }
+
+ memset(&srv_cmd, 0, sizeof(srv_cmd));
+ srv_cmd.type = RSPAMD_SRV_BUSY;
+ srv_cmd.cmd.busy.is_busy = (reason != NULL);
+ if (reason) {
+ rspamd_strlcpy(srv_cmd.cmd.busy.reason, reason,
+ sizeof(srv_cmd.cmd.busy.reason));
+ }
+
+ cbd.completed = FALSE;
+ rspamd_srv_send_command(worker, event_loop, &srv_cmd, -1,
+ rspamd_worker_busy_reply_handler, &cbd);
+
+ /* Run the event loop until the notification is acknowledged
+ * Also stop if worker starts terminating (signal received during wait) */
+ while (!cbd.completed && max_iterations-- > 0 &&
+ worker->state == rspamd_worker_state_running) {
+ ev_run(event_loop, EVRUN_ONCE);
+ }
+
+ /* If worker is terminating, propagate the break to the outer event loop */
+ if (worker->state != rspamd_worker_state_running) {
+ ev_break(event_loop, EVBREAK_ALL);
+ }
+ else if (!cbd.completed) {
+ msg_warn("busy notification may not have reached main process");
+ }
+}
RSPAMD_SRV_FUZZY_BLOCKED, /* Used to notify main process about a blocked ip */
RSPAMD_SRV_WORKERS_SPAWNED, /* Used to notify workers that all workers have been spawned */
RSPAMD_SRV_MULTIPATTERN_LOADED, /* Multipattern HS compiled and ready */
+ RSPAMD_SRV_BUSY, /* Worker is busy with a long-running operation; main skips heartbeat checks */
};
enum rspamd_log_pipe_type {
char name[64];
char cache_dir[CONTROL_PATHLEN];
} mp_loaded;
+ /* Sent when a worker starts or finishes a long-running operation */
+ struct {
+ gboolean is_busy;
+ char reason[32]; /* Short reason like "compile hyperscan" */
+ } busy;
} cmd;
};
*/
void rspamd_pending_control_free(gpointer p);
+/**
+ * Notify the main process that the worker is busy with a long-running operation.
+ * The main process will skip heartbeat checks while the worker is busy.
+ * @param worker worker instance
+ * @param event_loop worker event loop
+ * @param reason short reason string (e.g., "compile hyperscan"), NULL to clear
+ */
+void rspamd_worker_set_busy(struct rspamd_worker *worker,
+ struct ev_loop *event_loop,
+ const char *reason);
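+/*
+ * Illustrative usage sketch (not copied from any particular caller): wrap a
+ * long, event-loop-blocking operation between two calls, e.g.
+ *
+ *   rspamd_worker_set_busy(worker, event_loop, "compile hyperscan");
+ *   ... blocking work ...
+ *   rspamd_worker_set_busy(worker, event_loop, NULL);
+ *
+ * The helper spins the event loop while waiting for the acknowledgement, so
+ * callers should re-check worker->state afterwards: a termination signal may
+ * have been delivered in the meantime.
+ */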
+
G_END_DECLS
#endif
time_from_last -= wrk->hb.last_event;
rspamd_main = wrk->srv;
+ if (wrk->hb.is_busy || rspamd_main->wanna_die) {
+ /* Worker is doing long-running operation or we're shutting down,
+ * skip heartbeat check */
+ return;
+ }
+
if (wrk->hb.last_event > 0 &&
time_from_last > 0 &&
time_from_last >= rspamd_main->cfg->heartbeat_interval * 2) {
memset(&rep, 0, sizeof(rep));
rep.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
- /*
- * Check if we received an FD for shared memory hyperscan database.
- * FD-based loading is used when hs_helper sends a pre-deserialized
- * database via SCM_RIGHTS. This allows workers to mmap the database
- * directly without disk I/O.
- */
+ /* FD-based loading is not used yet; close the received FD */
if (attached_fd >= 0 && cmd->cmd.hs_loaded.fd_size > 0) {
- msg_info("received hyperscan fd %d with size %z (scope: %s) - "
- "FD-based loading infrastructure ready, using file-based for now",
- attached_fd, cmd->cmd.hs_loaded.fd_size,
- cmd->cmd.hs_loaded.scope[0] != '\0' ? cmd->cmd.hs_loaded.scope : "all");
- /* Close the FD since we're not using it yet */
close(attached_fd);
attached_fd = -1;
}
strerror(errno));
}
- /* Close any remaining FD we didn't use */
if (attached_fd >= 0) {
close(attached_fd);
}
}
}
else {
- msg_warn_main("terminate worker %s(%P) with SIGKILL",
- g_quark_to_string(w->type), w->pid);
+ if (w->hb.is_busy && w->hb.busy_reason[0]) {
+ msg_warn_main("terminate worker %s(%P) with SIGKILL; "
+ "worker was busy: %s",
+ g_quark_to_string(w->type), w->pid,
+ w->hb.busy_reason);
+ }
+ else {
+ msg_warn_main("terminate worker %s(%P) with SIGKILL",
+ g_quark_to_string(w->type), w->pid);
+ }
}
}
else {
return;
}
else {
- msg_err_main("data corruption warning: terminating "
- "special worker %s(%P) with SIGKILL",
- g_quark_to_string(w->type), w->pid);
+ if (w->hb.is_busy && w->hb.busy_reason[0]) {
+ msg_err_main("data corruption warning: terminating "
+ "special worker %s(%P) with SIGKILL; "
+ "worker was busy: %s",
+ g_quark_to_string(w->type), w->pid,
+ w->hb.busy_reason);
+ }
+ else {
+ msg_err_main("data corruption warning: terminating "
+ "special worker %s(%P) with SIGKILL",
+ g_quark_to_string(w->type), w->pid);
+ }
}
}
}
rspamd_attach_worker(rspamd_main, cur);
}
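+
+/* GHFunc callback: logs one worker that is still alive during shutdown */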
+static void
+rspamd_log_pending_worker(gpointer key, gpointer value, gpointer ud)
+{
+ struct rspamd_worker *w = (struct rspamd_worker *) value;
+ struct rspamd_main *rspamd_main = w->srv;
+
+ if (w->hb.is_busy && w->hb.busy_reason[0]) {
+ msg_info_main(" - %s(%P): busy with %s",
+ g_quark_to_string(w->type), w->pid,
+ w->hb.busy_reason);
+ }
+ else {
+ msg_info_main(" - %s(%P): shutting down",
+ g_quark_to_string(w->type), w->pid);
+ }
+}
+
+/* Soft shutdown monitor: periodically logs which workers we are still waiting for */
+static void
+rspamd_shutdown_monitor_handler(EV_P_ ev_timer *w, int revents)
+{
+ struct rspamd_main *rspamd_main = (struct rspamd_main *) w->data;
+ unsigned int nworkers = g_hash_table_size(rspamd_main->workers);
+
+ if (nworkers > 0) {
+ msg_info_main("shutdown: waiting for %d worker(s):", nworkers);
+ g_hash_table_foreach(rspamd_main->workers, rspamd_log_pending_worker, NULL);
+ }
+ else {
+ ev_timer_stop(EV_A_ w);
+ }
+}
+
static void
rspamd_final_timer_handler(EV_P_ ev_timer *w, int revents)
{
term_attempts--;
+ /* Warn once when the shutdown timeout is reached and remaining workers are force-killed */
+ if (term_attempts == 0 && g_hash_table_size(rspamd_main->workers) > 0) {
+ msg_warn_main("shutdown timeout reached, %d worker(s) still running - sending SIGKILL",
+ (int) g_hash_table_size(rspamd_main->workers));
+ }
+
g_hash_table_foreach(rspamd_main->workers, hash_worker_wait_callback,
NULL);
}
/* Signal handlers */
+#define SHUTDOWN_MONITOR_INITIAL 1.0
+#define SHUTDOWN_MONITOR_REPEAT 10.0
+
static void
rspamd_term_handler(struct ev_loop *loop, ev_signal *w, int revents)
{
struct rspamd_main *rspamd_main = (struct rspamd_main *) w->data;
static ev_timer ev_finale;
+ static ev_timer ev_monitor;
ev_tstamp shutdown_ts;
if (!rspamd_main->wanna_die) {
ev_timer_init(&ev_finale, rspamd_final_timer_handler,
TERMINATION_INTERVAL, TERMINATION_INTERVAL);
ev_timer_start(rspamd_main->event_loop, &ev_finale);
+
+ ev_monitor.data = rspamd_main;
+ ev_timer_init(&ev_monitor, rspamd_shutdown_monitor_handler,
+ SHUTDOWN_MONITOR_INITIAL, SHUTDOWN_MONITOR_REPEAT);
+ ev_timer_start(rspamd_main->event_loop, &ev_monitor);
}
}
ev_timer heartbeat_ev; /**< used by main for checking heartbeats and by workers to send heartbeats */
ev_tstamp last_event; /**< last heartbeat received timestamp */
int64_t nbeats; /**< positive for beats received, negative for beats missed */
+ gboolean is_busy; /**< worker is doing long-running operation, skip heartbeat checks */
+ char busy_reason[32]; /**< reason for being busy (for logging) */
};
enum rspamd_worker_state {