-/*-
- * Copyright 2016 Vsevolod Stakhov
+/*
+ * Copyright 2025 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
unsigned int total_compiled;
unsigned int scopes_remaining;
gboolean forced;
+ gboolean workers_ready;
};
static void
-rspamd_rs_delayed_scoped_cb(EV_P_ ev_timer *w, int revents)
+rspamd_rs_send_final_notification(struct rspamd_hs_helper_compile_cbdata *cbd)
{
- struct rspamd_hs_helper_compile_cbdata *cbd = (struct rspamd_hs_helper_compile_cbdata *) w->data;
struct rspamd_worker *worker = cbd->worker;
struct hs_helper_ctx *ctx = cbd->ctx;
static struct rspamd_srv_command srv_cmd;
rspamd_srv_send_command(worker,
ctx->event_loop, &srv_cmd, -1, NULL, NULL);
- ev_timer_stop(EV_A_ w);
- g_free(w);
- g_free(cbd);
- ev_timer_again(EV_A_ & ctx->recompile_timer);
+ msg_info("sent final hyperscan loaded notification (%d total expressions compiled)",
+ cbd->total_compiled);
+
+ g_free(cbd);
+ ev_timer_stop(ctx->event_loop, &ctx->recompile_timer);
}
static void
/* Check if all scopes are done */
if (compile_cbd->scopes_remaining == 0) {
- ev_timer *tm;
- ev_tstamp when = 0.0;
-
- /*
- * Do not send notification unless all other workers are started
- * XXX: now we just sleep for 1 seconds to ensure that
- */
- if (!ctx->loaded) {
- when = 1.0; /* Postpone */
- ctx->loaded = TRUE;
+ if (compile_cbd->workers_ready) {
+ /* Workers are ready, send notification immediately */
msg_info("compiled %d total regular expressions to the hyperscan tree, "
- "postpone final notification for %.0f seconds to avoid races",
- compile_cbd->total_compiled,
- when);
+ "send final notification",
+ compile_cbd->total_compiled);
+ rspamd_rs_send_final_notification(compile_cbd);
}
else {
+ /* Workers not ready yet, notification will be sent when workers_spawned event is received */
msg_info("compiled %d total regular expressions to the hyperscan tree, "
- "send final notification",
+ "waiting for workers to be ready before sending notification",
compile_cbd->total_compiled);
+ ctx->loaded = TRUE;
}
-
- tm = g_malloc0(sizeof(*tm));
- tm->data = (void *) compile_cbd;
- ev_timer_init(tm, rspamd_rs_delayed_scoped_cb, when, 0);
- ev_timer_start(ctx->event_loop, tm);
}
}
struct rspamd_hs_helper_single_compile_cbdata {
struct rspamd_worker *worker;
gboolean forced;
+ gboolean workers_ready;
};
static void
-rspamd_rs_delayed_cb(EV_P_ ev_timer *w, int revents)
+rspamd_rs_send_single_notification(struct rspamd_hs_helper_single_compile_cbdata *cbd)
{
- struct rspamd_hs_helper_single_compile_cbdata *cbd =
- (struct rspamd_hs_helper_single_compile_cbdata *) w->data;
struct rspamd_worker *worker = cbd->worker;
static struct rspamd_srv_command srv_cmd;
struct hs_helper_ctx *ctx;
rspamd_srv_send_command(worker,
ctx->event_loop, &srv_cmd, -1, NULL, NULL);
- ev_timer_stop(EV_A_ w);
- g_free(w);
- g_free(cbd);
- ev_timer_again(EV_A_ & ctx->recompile_timer);
+ msg_info("sent hyperscan loaded notification");
+
+ g_free(cbd);
+ ev_timer_again(ctx->event_loop, &ctx->recompile_timer);
}
static void
struct rspamd_hs_helper_single_compile_cbdata *compile_cbd =
(struct rspamd_hs_helper_single_compile_cbdata *) cbd;
struct rspamd_worker *worker = compile_cbd->worker;
- ev_timer *tm;
- ev_tstamp when = 0.0;
struct hs_helper_ctx *ctx;
struct rspamd_hs_helper_single_compile_cbdata *timer_cbd;
return;
}
- /*
- * Do not send notification unless all other workers are started
- * XXX: now we just sleep for 1 seconds to ensure that
- */
- if (!ctx->loaded) {
- when = 1.0; /* Postpone */
- ctx->loaded = TRUE;
+ timer_cbd = g_malloc0(sizeof(*timer_cbd));
+ timer_cbd->worker = worker;
+ timer_cbd->forced = (ncompiled > 0) ? TRUE : compile_cbd->forced;
+ timer_cbd->workers_ready = compile_cbd->workers_ready;
+
+ if (timer_cbd->workers_ready) {
+ /* Workers are ready, send notification immediately */
msg_info("compiled %d regular expressions to the hyperscan tree, "
- "postpone loaded notification for %.0f seconds to avoid races",
- ncompiled,
- when);
+ "send loaded notification",
+ ncompiled);
+ rspamd_rs_send_single_notification(timer_cbd);
}
else {
+ /* Workers not ready yet, notification will be sent when workers_spawned event is received */
msg_info("compiled %d regular expressions to the hyperscan tree, "
- "send loaded notification",
+ "waiting for workers to be ready before sending notification",
ncompiled);
+ ctx->loaded = TRUE;
}
- timer_cbd = g_malloc0(sizeof(*timer_cbd));
- timer_cbd->worker = worker;
- timer_cbd->forced = (ncompiled > 0) ? TRUE : compile_cbd->forced;
-
- tm = g_malloc0(sizeof(*tm));
- tm->data = (void *) timer_cbd;
- ev_timer_init(tm, rspamd_rs_delayed_cb, when, 0);
- ev_timer_start(ctx->event_loop, tm);
-
g_free(compile_cbd);
}
g_malloc0(sizeof(*single_cbd));
single_cbd->worker = worker;
single_cbd->forced = forced;
+ single_cbd->workers_ready = ctx->loaded;
rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache,
ctx->hs_dir, ctx->max_time, !forced,
g_malloc0(sizeof(*single_cbd));
single_cbd->worker = worker;
single_cbd->forced = forced;
+ single_cbd->workers_ready = ctx->loaded;
rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache,
ctx->hs_dir, ctx->max_time, !forced,
compile_cbd->total_compiled = 0;
compile_cbd->scopes_remaining = names_count;
compile_cbd->forced = forced;
+ compile_cbd->workers_ready = ctx->loaded;
/* Compile each scope */
for (unsigned int i = 0; i < names_count; i++) {
/* Check if we're done */
if (compile_cbd->scopes_remaining == 0) {
/* No scopes to compile, send final notification immediately */
- ev_timer *tm = g_malloc0(sizeof(*tm));
- tm->data = (void *) compile_cbd;
- ev_timer_init(tm, rspamd_rs_delayed_scoped_cb, 0.0, 0);
- ev_timer_start(ctx->event_loop, tm);
+ if (compile_cbd->workers_ready) {
+ rspamd_rs_send_final_notification(compile_cbd);
+ }
+ else {
+ ctx->loaded = TRUE;
+ }
}
}
}
/* Stop recompile */
ev_timer_stop(ctx->event_loop, &ctx->recompile_timer);
+ ctx->loaded = FALSE; /* Reset flag for forced recompile */
rspamd_rs_compile(ctx, worker, TRUE);
return TRUE;
}
+static gboolean
+rspamd_hs_helper_workers_spawned(struct rspamd_main *rspamd_main,
+ struct rspamd_worker *worker, int fd,
+ int attached_fd,
+ struct rspamd_control_command *cmd,
+ gpointer ud)
+{
+ struct rspamd_control_reply rep;
+ struct hs_helper_ctx *ctx = ud;
+
+ msg_info("received workers_spawned notification (%d workers); hyperscan ready: %s",
+ cmd->cmd.workers_spawned.workers_count,
+ ctx->loaded ? "yes" : "no");
+
+ memset(&rep, 0, sizeof(rep));
+ rep.type = RSPAMD_CONTROL_WORKERS_SPAWNED;
+ rep.reply.workers_spawned.status = 0;
+
+ /* Write reply */
+ if (write(fd, &rep, sizeof(rep)) != sizeof(rep)) {
+ msg_err("cannot write reply to the control socket: %s",
+ strerror(errno));
+ }
+
+ /* If hyperscan compilation has finished but we were waiting for workers, trigger notification now */
+ if (ctx->loaded) {
+ static struct rspamd_srv_command srv_cmd;
+
+ memset(&srv_cmd, 0, sizeof(srv_cmd));
+ srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED;
+ rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir,
+ sizeof(srv_cmd.cmd.hs_loaded.cache_dir));
+ srv_cmd.cmd.hs_loaded.forced = FALSE;
+ srv_cmd.cmd.hs_loaded.scope[0] = '\0'; /* NULL scope means all scopes */
+
+ rspamd_srv_send_command(worker,
+ ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+
+ msg_info("sent delayed hyperscan loaded notification after workers spawned");
+ ctx->loaded = FALSE; /* Reset to avoid duplicate notifications */
+ }
+
+ if (attached_fd != -1) {
+ close(attached_fd);
+ }
+
+ return TRUE;
+}
+
static void
rspamd_hs_helper_timer(EV_P_ ev_timer *w, int revents)
{
"hs_helper",
NULL);
- if (!rspamd_rs_compile(ctx, worker, FALSE)) {
- /* Tell main not to respawn more workers */
- exit(EXIT_SUCCESS);
- }
-
rspamd_control_worker_add_cmd_handler(worker, RSPAMD_CONTROL_RECOMPILE,
rspamd_hs_helper_reload, ctx);
+ rspamd_control_worker_add_cmd_handler(worker, RSPAMD_CONTROL_WORKERS_SPAWNED,
+ rspamd_hs_helper_workers_spawned, ctx);
ctx->recompile_timer.data = worker;
tim = rspamd_time_jitter(ctx->recompile_time, 0);