]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] Refactor srv_pipe to use queue-based architecture with ID dispatch
authorVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 13 Jan 2026 10:20:58 +0000 (10:20 +0000)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 13 Jan 2026 10:20:58 +0000 (10:20 +0000)
Replace per-request ev_io watchers with a single watcher using khash
for ID-based reply matching. This fixes potential deadlocks when multiple
commands are queued rapidly (e.g., during hyperscan compilation).

Changes:
- Add rspamd_srv_pipe_ctx with single watcher, send queue, and ID hash
- Make srv_pipe non-blocking on both ends with proper EAGAIN handling
- Add EAGAIN handling to main process write path
- Remove cache_dir from hs_loaded commands (available from config)

src/hs_helper.c
src/libserver/rspamd_control.c
src/libserver/rspamd_control.h
src/libserver/worker_util.c
src/lua/lua_worker.c

index 9b8821332f4bdb314c22eaf4dbe4ad769b2e3db6..f2eb772ffe9c81ed2eccdaefa2634ef96ec391f6 100644 (file)
@@ -493,8 +493,6 @@ rspamd_rs_send_final_notification(struct rspamd_hs_helper_compile_cbdata *cbd)
 
        memset(&srv_cmd, 0, sizeof(srv_cmd));
        srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED;
-       rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir,
-                                  sizeof(srv_cmd.cmd.hs_loaded.cache_dir));
        srv_cmd.cmd.hs_loaded.forced = cbd->forced;
        srv_cmd.cmd.hs_loaded.scope[0] = '\0'; /* NULL scope means all scopes */
 
@@ -549,8 +547,6 @@ rspamd_rs_compile_scoped_cb(const char *scope, unsigned int ncompiled, GError *e
 
                        memset(&srv_cmd, 0, sizeof(srv_cmd));
                        srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED;
-                       rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir,
-                                                  sizeof(srv_cmd.cmd.hs_loaded.cache_dir));
                        srv_cmd.cmd.hs_loaded.forced = compile_cbd->forced;
                        if (scope) {
                                rspamd_strlcpy(srv_cmd.cmd.hs_loaded.scope, scope,
@@ -610,8 +606,6 @@ rspamd_rs_send_single_notification(struct rspamd_hs_helper_single_compile_cbdata
 
        memset(&srv_cmd, 0, sizeof(srv_cmd));
        srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED;
-       rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir,
-                                  sizeof(srv_cmd.cmd.hs_loaded.cache_dir));
        srv_cmd.cmd.hs_loaded.forced = cbd->forced;
        srv_cmd.cmd.hs_loaded.scope[0] = '\0'; /* NULL scope means all scopes */
 
@@ -1172,8 +1166,6 @@ rspamd_hs_helper_workers_spawned(struct rspamd_main *rspamd_main,
 
                memset(&srv_cmd, 0, sizeof(srv_cmd));
                srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED;
-               rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir,
-                                          sizeof(srv_cmd.cmd.hs_loaded.cache_dir));
                srv_cmd.cmd.hs_loaded.forced = FALSE;
                srv_cmd.cmd.hs_loaded.scope[0] = '\0'; /* NULL scope means all scopes */
 
index 47c797ea096d9c69bb5bb3e7f3fae732759585bf..011f6220ed98f2f74d852f17ec47e7d38db8467a 100644 (file)
@@ -22,6 +22,7 @@
 #include "libutil/libev_helper.h"
 #include "unix-std.h"
 #include "utlist.h"
+#include "khash.h"
 #include "composites/composites.h"
 
 #ifdef HAVE_SYS_RESOURCE_H
@@ -1121,15 +1122,11 @@ rspamd_srv_handler(EV_P_ ev_io *w, int revents)
                                if (cmd.cmd.hs_loaded.scope[0] != '\0') {
                                        /* Scoped loading */
                                        const char *scope = cmd.cmd.hs_loaded.scope;
-                                       msg_info_main("received scoped hyperscan cache loaded from %s for scope: %s",
-                                                                 cmd.cmd.hs_loaded.cache_dir, scope);
+                                       msg_info_main("received scoped hyperscan cache loaded for scope: %s", scope);
 
                                        /* Broadcast scoped command to all workers */
                                        memset(&wcmd, 0, sizeof(wcmd));
                                        wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
-                                       rspamd_strlcpy(wcmd.cmd.hs_loaded.cache_dir,
-                                                                  cmd.cmd.hs_loaded.cache_dir,
-                                                                  sizeof(wcmd.cmd.hs_loaded.cache_dir));
                                        rspamd_strlcpy(wcmd.cmd.hs_loaded.scope,
                                                                   cmd.cmd.hs_loaded.scope,
                                                                   sizeof(wcmd.cmd.hs_loaded.scope));
@@ -1143,15 +1140,11 @@ rspamd_srv_handler(EV_P_ ev_io *w, int revents)
                                        /* After getting this notice, we can clean up old hyperscan files */
                                        rspamd_hyperscan_notice_loaded();
 
-                                       msg_info_main("received hyperscan cache loaded from %s",
-                                                                 cmd.cmd.hs_loaded.cache_dir);
+                                       msg_info_main("received hyperscan cache loaded");
 
                                        /* Broadcast command to all workers */
                                        memset(&wcmd, 0, sizeof(wcmd));
                                        wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
-                                       rspamd_strlcpy(wcmd.cmd.hs_loaded.cache_dir,
-                                                                  cmd.cmd.hs_loaded.cache_dir,
-                                                                  sizeof(wcmd.cmd.hs_loaded.cache_dir));
                                        wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced;
                                        wcmd.cmd.hs_loaded.scope[0] = '\0'; /* Empty scope for legacy */
                                        rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
@@ -1308,6 +1301,12 @@ rspamd_srv_handler(EV_P_ ev_io *w, int revents)
                r = sendmsg(w->fd, &msg, 0);
 
                if (r == -1) {
+                       if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                               /* Socket buffer full, retry on next event loop iteration.
+                                * Keep watcher on EV_WRITE and keep rdata intact.
+                                */
+                               return;
+                       }
                        msg_err_main("cannot write to worker's srv pipe when writing reply: %s; command = %s",
                                                 strerror(errno), rspamd_srv_command_to_string(rdata->rep.type));
                }
@@ -1337,116 +1336,267 @@ void rspamd_srv_start_watching(struct rspamd_main *srv,
        ev_io_start(ev_base, &worker->srv_ev);
 }
 
+/*
+ * Worker-side srv_pipe infrastructure.
+ * Uses a single watcher with ID-keyed hash for proper reply matching.
+ */
+
 struct rspamd_srv_request_data {
-       struct rspamd_worker *worker;
+       uint64_t id;
        struct rspamd_srv_command cmd;
        int attached_fd;
-       struct rspamd_srv_reply rep;
        rspamd_srv_reply_handler handler;
-       ev_io io_ev;
        gpointer ud;
+       gboolean sent;                        /* true if command has been sent */
+       struct rspamd_srv_request_data *next; /* for send queue */
+};
+
+/* Hash table keyed by command id */
+KHASH_INIT(rspamd_srv_requests, uint64_t, struct rspamd_srv_request_data *, 1,
+                  kh_int64_hash_func, kh_int64_hash_equal);
+
+struct rspamd_srv_pipe_ctx {
+       ev_io io_ev;
+       struct rspamd_worker *worker;
+       struct ev_loop *ev_base;
+       khash_t(rspamd_srv_requests) * requests;    /* pending requests by id */
+       struct rspamd_srv_request_data *send_queue; /* queue of requests to send */
 };
 
+static void rspamd_srv_pipe_handler(EV_P_ ev_io *w, int revents);
+
+static void
+rspamd_srv_pipe_update_watcher(struct rspamd_srv_pipe_ctx *ctx)
+{
+       int events = 0;
+
+       /* Need to write if we have unsent requests in the queue */
+       if (ctx->send_queue != NULL) {
+               events |= EV_WRITE;
+       }
+
+       /* Need to read if we have any pending requests awaiting replies */
+       if (kh_size(ctx->requests) > 0) {
+               events |= EV_READ;
+       }
+
+       if (events == 0) {
+               /* No pending work, stop watcher */
+               if (ev_is_active(&ctx->io_ev)) {
+                       ev_io_stop(ctx->ev_base, &ctx->io_ev);
+               }
+       }
+       else {
+               /* Update watcher events if needed */
+               if (!ev_is_active(&ctx->io_ev) || (ctx->io_ev.events & (EV_READ | EV_WRITE)) != events) {
+                       ev_io_stop(ctx->ev_base, &ctx->io_ev);
+                       ev_io_set(&ctx->io_ev, ctx->worker->srv_pipe[1], events);
+                       ev_io_start(ctx->ev_base, &ctx->io_ev);
+               }
+       }
+}
+
 static void
-rspamd_srv_request_handler(EV_P_ ev_io *w, int revents)
+rspamd_srv_pipe_handler(EV_P_ ev_io *w, int revents)
 {
-       struct rspamd_srv_request_data *rd = (struct rspamd_srv_request_data *) w->data;
+       struct rspamd_srv_pipe_ctx *ctx = (struct rspamd_srv_pipe_ctx *) w->data;
+       struct rspamd_srv_request_data *rd;
        struct msghdr msg;
        struct iovec iov;
        unsigned char fdspace[CMSG_SPACE(sizeof(int))];
        struct cmsghdr *cmsg;
+       struct rspamd_srv_reply rep;
        gssize r;
+       khiter_t k;
        int rfd = -1;
 
-       if (revents == EV_WRITE) {
-               /* Send request to server */
-               memset(&msg, 0, sizeof(msg));
+       if (revents & EV_WRITE) {
+               /* Send queued requests */
+               while (ctx->send_queue != NULL) {
+                       rd = ctx->send_queue;
+
+                       memset(&msg, 0, sizeof(msg));
+
+                       /* Attach fd to the message */
+                       if (rd->attached_fd != -1) {
+                               memset(fdspace, 0, sizeof(fdspace));
+                               msg.msg_control = fdspace;
+                               msg.msg_controllen = sizeof(fdspace);
+                               cmsg = CMSG_FIRSTHDR(&msg);
+                               cmsg->cmsg_level = SOL_SOCKET;
+                               cmsg->cmsg_type = SCM_RIGHTS;
+                               cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+                               memcpy(CMSG_DATA(cmsg), &rd->attached_fd, sizeof(int));
+                       }
 
-               /* Attach fd to the message */
-               if (rd->attached_fd != -1) {
-                       memset(fdspace, 0, sizeof(fdspace));
+                       iov.iov_base = &rd->cmd;
+                       iov.iov_len = sizeof(rd->cmd);
+                       msg.msg_iov = &iov;
+                       msg.msg_iovlen = 1;
+
+                       r = sendmsg(w->fd, &msg, 0);
+
+                       if (r == -1) {
+                               if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) {
+                                       /* Socket buffer full, try again later */
+                                       break;
+                               }
+                               msg_err("cannot write to server pipe: %s; command = %s",
+                                               strerror(errno), rspamd_srv_command_to_string(rd->cmd.type));
+                               /* Remove from queue and hash, free */
+                               LL_DELETE(ctx->send_queue, rd);
+                               k = kh_get(rspamd_srv_requests, ctx->requests, rd->id);
+                               if (k != kh_end(ctx->requests)) {
+                                       kh_del(rspamd_srv_requests, ctx->requests, k);
+                               }
+                               g_free(rd);
+                               continue;
+                       }
+                       else if (r != (gssize) sizeof(rd->cmd)) {
+                               msg_err("incomplete write to server pipe: %d != %d; command = %s",
+                                               (int) r, (int) sizeof(rd->cmd),
+                                               rspamd_srv_command_to_string(rd->cmd.type));
+                               LL_DELETE(ctx->send_queue, rd);
+                               k = kh_get(rspamd_srv_requests, ctx->requests, rd->id);
+                               if (k != kh_end(ctx->requests)) {
+                                       kh_del(rspamd_srv_requests, ctx->requests, k);
+                               }
+                               g_free(rd);
+                               continue;
+                       }
+
+                       /* Successfully sent, remove from send queue but keep in hash */
+                       LL_DELETE(ctx->send_queue, rd);
+                       rd->sent = TRUE;
+               }
+       }
+
+       if (revents & EV_READ) {
+               /* Read replies */
+               for (;;) {
+                       iov.iov_base = &rep;
+                       iov.iov_len = sizeof(rep);
+                       memset(&msg, 0, sizeof(msg));
                        msg.msg_control = fdspace;
                        msg.msg_controllen = sizeof(fdspace);
-                       cmsg = CMSG_FIRSTHDR(&msg);
-                       cmsg->cmsg_level = SOL_SOCKET;
-                       cmsg->cmsg_type = SCM_RIGHTS;
-                       cmsg->cmsg_len = CMSG_LEN(sizeof(int));
-                       memcpy(CMSG_DATA(cmsg), &rd->attached_fd, sizeof(int));
-               }
+                       msg.msg_iov = &iov;
+                       msg.msg_iovlen = 1;
 
-               iov.iov_base = &rd->cmd;
-               iov.iov_len = sizeof(rd->cmd);
-               msg.msg_iov = &iov;
-               msg.msg_iovlen = 1;
+                       r = recvmsg(w->fd, &msg, 0);
 
-               r = sendmsg(w->fd, &msg, 0);
+                       if (r == -1) {
+                               if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                                       /* No more data */
+                                       break;
+                               }
+                               msg_err("cannot read from server pipe: %s", strerror(errno));
+                               break;
+                       }
 
-               if (r == -1) {
-                       if (r == ENOBUFS) {
-                               /* On BSD derived systems we can have this error when trying to send
-                                * requests too fast.
-                                * It might be good to retry...
-                                */
-                               msg_info("cannot write to server pipe: %s; command = %s; retrying sending",
-                                                strerror(errno),
-                                                rspamd_srv_command_to_string(rd->cmd.type));
-                               return;
+                       if (r == 0) {
+                               /* Connection closed */
+                               msg_err("server pipe closed unexpectedly");
+                               break;
                        }
-                       msg_err("cannot write to server pipe: %s; command = %s", strerror(errno),
-                                       rspamd_srv_command_to_string(rd->cmd.type));
-                       goto cleanup;
-               }
-               else if (r != sizeof(rd->cmd)) {
-                       msg_err("incomplete write to the server pipe: %d != %d, command = %s",
-                                       (int) r, (int) sizeof(rd->cmd), rspamd_srv_command_to_string(rd->cmd.type));
-                       goto cleanup;
-               }
 
-               ev_io_stop(EV_A_ w);
-               ev_io_set(w, rd->worker->srv_pipe[1], EV_READ);
-               ev_io_start(EV_A_ w);
-       }
-       else {
-               iov.iov_base = &rd->rep;
-               iov.iov_len = sizeof(rd->rep);
-               memset(&msg, 0, sizeof(msg));
-               msg.msg_control = fdspace;
-               msg.msg_controllen = sizeof(fdspace);
-               msg.msg_iov = &iov;
-               msg.msg_iovlen = 1;
+                       if (r != (gssize) sizeof(rep)) {
+                               msg_err("incomplete read from server pipe: %d != %d",
+                                               (int) r, (int) sizeof(rep));
+                               continue;
+                       }
 
-               r = recvmsg(w->fd, &msg, 0);
+                       /* Look up request by ID */
+                       k = kh_get(rspamd_srv_requests, ctx->requests, rep.id);
+                       if (k == kh_end(ctx->requests)) {
+                               msg_warn("received reply for unknown request id %" G_GUINT64_FORMAT,
+                                                rep.id);
+                               continue;
+                       }
 
-               if (r == -1) {
-                       msg_err("cannot read from server pipe: %s; command = %s", strerror(errno),
-                                       rspamd_srv_command_to_string(rd->cmd.type));
-                       goto cleanup;
-               }
+                       rd = kh_val(ctx->requests, k);
+                       kh_del(rspamd_srv_requests, ctx->requests, k);
 
-               if (r != (int) sizeof(rd->rep)) {
-                       msg_err("cannot read from server pipe, invalid length: %d != %d; command = %s",
-                                       (int) r, (int) sizeof(rd->rep), rspamd_srv_command_to_string(rd->cmd.type));
-                       goto cleanup;
-               }
+                       /* Extract attached fd if present */
+                       rfd = -1;
+                       if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
+                               rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
+                       }
 
-               if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
-                       rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
-               }
+                       /* Call handler */
+                       if (rd->handler) {
+                               rd->handler(ctx->worker, &rep, rfd, rd->ud);
+                       }
 
-               /* Reply has been received */
-               if (rd->handler) {
-                       rd->handler(rd->worker, &rd->rep, rfd, rd->ud);
+                       g_free(rd);
                }
+       }
+
+       rspamd_srv_pipe_update_watcher(ctx);
+}
 
-               goto cleanup;
+static struct rspamd_srv_pipe_ctx *
+rspamd_srv_pipe_ctx_create(struct rspamd_worker *worker, struct ev_loop *ev_base)
+{
+       struct rspamd_srv_pipe_ctx *ctx;
+
+       ctx = g_malloc0(sizeof(*ctx));
+       ctx->worker = worker;
+       ctx->ev_base = ev_base;
+       ctx->requests = kh_init(rspamd_srv_requests);
+       ctx->send_queue = NULL;
+
+       ctx->io_ev.data = ctx;
+       ev_io_init(&ctx->io_ev, rspamd_srv_pipe_handler, worker->srv_pipe[1], 0);
+       /* Watcher starts inactive, will be activated when requests are added */
+
+       return ctx;
+}
+
+static void
+rspamd_srv_pipe_ctx_destroy(struct rspamd_srv_pipe_ctx *ctx)
+{
+       struct rspamd_srv_request_data *rd, *tmp;
+       khiter_t k;
+
+       if (ctx == NULL) {
+               return;
        }
 
-       return;
+       ev_io_stop(ctx->ev_base, &ctx->io_ev);
 
+       /* Free send queue */
+       LL_FOREACH_SAFE(ctx->send_queue, rd, tmp)
+       {
+               g_free(rd);
+       }
+
+       /* Free pending requests */
+       for (k = kh_begin(ctx->requests); k != kh_end(ctx->requests); ++k) {
+               if (kh_exist(ctx->requests, k)) {
+                       g_free(kh_val(ctx->requests, k));
+               }
+       }
+       kh_destroy(rspamd_srv_requests, ctx->requests);
 
-cleanup:
-       ev_io_stop(EV_A_ w);
-       g_free(rd);
+       g_free(ctx);
+}
+
+/* Per-worker srv_pipe context (each worker process has its own copy after fork) */
+static struct rspamd_srv_pipe_ctx *srv_pipe_ctx = NULL;
+
+void rspamd_srv_pipe_init(struct rspamd_worker *worker, struct ev_loop *ev_base)
+{
+       if (srv_pipe_ctx != NULL) {
+               /* Already initialized */
+               return;
+       }
+       srv_pipe_ctx = rspamd_srv_pipe_ctx_create(worker, ev_base);
+}
+
+void rspamd_srv_pipe_cleanup(void)
+{
+       rspamd_srv_pipe_ctx_destroy(srv_pipe_ctx);
+       srv_pipe_ctx = NULL;
 }
 
 void rspamd_srv_send_command(struct rspamd_worker *worker,
@@ -1457,24 +1607,41 @@ void rspamd_srv_send_command(struct rspamd_worker *worker,
                                                         gpointer ud)
 {
        struct rspamd_srv_request_data *rd;
+       int ret;
+       khiter_t k;
 
        g_assert(cmd != NULL);
        g_assert(worker != NULL);
 
+       /* Lazy initialization of srv_pipe context */
+       if (srv_pipe_ctx == NULL) {
+               rspamd_srv_pipe_init(worker, ev_base);
+       }
+
        rd = g_malloc0(sizeof(*rd));
        cmd->id = ottery_rand_uint64();
+       rd->id = cmd->id;
        memcpy(&rd->cmd, cmd, sizeof(rd->cmd));
        rd->handler = handler;
        rd->ud = ud;
-       rd->worker = worker;
-       rd->rep.id = cmd->id;
-       rd->rep.type = cmd->type;
        rd->attached_fd = attached_fd;
+       rd->sent = FALSE;
+       rd->next = NULL;
+
+       /* Add to hash for reply matching */
+       k = kh_put(rspamd_srv_requests, srv_pipe_ctx->requests, rd->id, &ret);
+       if (ret < 0) {
+               msg_err("cannot add request to hash table");
+               g_free(rd);
+               return;
+       }
+       kh_val(srv_pipe_ctx->requests, k) = rd;
+
+       /* Add to send queue */
+       LL_APPEND(srv_pipe_ctx->send_queue, rd);
 
-       rd->io_ev.data = rd;
-       ev_io_init(&rd->io_ev, rspamd_srv_request_handler,
-                          rd->worker->srv_pipe[1], EV_WRITE);
-       ev_io_start(ev_base, &rd->io_ev);
+       /* Activate watcher */
+       rspamd_srv_pipe_update_watcher(srv_pipe_ctx);
 }
 
 enum rspamd_control_type
index 6b1835c6097e3d8ef957823b108356df1a8ad1d4..92cf5747d56e8202577feba27f6071c3e90c4127 100644 (file)
@@ -81,7 +81,6 @@ struct rspamd_control_command {
                } recompile;
                struct {
                        gboolean forced;
-                       char cache_dir[CONTROL_PATHLEN];
                        char scope[64]; /* Scope name, NULL means all scopes */
                } hs_loaded;
                struct {
@@ -196,7 +195,6 @@ struct rspamd_srv_command {
                } spair;
                struct {
                        gboolean forced;
-                       char cache_dir[CONTROL_PATHLEN];
                        char scope[64]; /* Scope name, NULL means all scopes */
                } hs_loaded;
                struct {
@@ -380,6 +378,18 @@ void rspamd_worker_set_busy(struct rspamd_worker *worker,
                                                        struct ev_loop *event_loop,
                                                        const char *reason);
 
+/**
+ * Initialize srv_pipe handling for a worker
+ * Called automatically by rspamd_srv_send_command if not initialized
+ */
+void rspamd_srv_pipe_init(struct rspamd_worker *worker, struct ev_loop *ev_base);
+
+/**
+ * Cleanup srv_pipe handling for a worker
+ * Should be called during worker shutdown
+ */
+void rspamd_srv_pipe_cleanup(void);
+
 G_END_DECLS
 
 #endif
index 9aa68a96a6eaebd097c7098d72c6d7c058431363..ef4734a35cc97cf88d171acb4483f22668725d1c 100644 (file)
@@ -1334,9 +1334,14 @@ rspamd_handle_child_fork(struct rspamd_worker *wrk,
         * is blocking.
         */
        rspamd_socket_nonblocking(wrk->control_pipe[1]);
-#if 0
-       rspamd_socket_nonblocking (wrk->srv_pipe[1]);
-#endif
+       /*
+        * srv_pipe must be non-blocking on the worker side to avoid deadlocks
+        * when multiple rspamd_srv_send_command calls create multiple ev_io
+        * watchers on the same fd. With a blocking socket, if multiple watchers
+        * wait for replies and only one reply arrives, the second watcher's
+        * recvmsg() would block forever.
+        */
+       rspamd_socket_nonblocking(wrk->srv_pipe[1]);
        rspamd_main->cfg->cur_worker = wrk;
        /* Execute worker (this function should not return normally!) */
        cf->worker->worker_start_func(wrk);
@@ -1355,19 +1360,14 @@ rspamd_handle_main_fork(struct rspamd_worker *wrk,
        close(wrk->srv_pipe[1]);
 
        /*
-        * There are no reasons why control pipes are blocking: the messages
-        * there are rare and are strictly bounded by command sizes, so if we block
-        * on some pipe, it is ok, as we still poll that for all operations.
-        * It is also impossible to block on writing in normal conditions.
-        * And if the conditions are not normal, e.g. a worker is unresponsive, then
-        * we can safely think that the non-blocking behaviour as it is implemented
-        * currently will not make things better, as it would lead to incomplete
-        * reads/writes that are not handled anyhow and are totally broken from the
-        * beginning.
+        * Both control and srv pipes are non-blocking. This is necessary because
+        * multiple commands can be queued (e.g., during hyperscan compilation with
+        * rspamd_worker_set_busy calls), creating multiple ev_io watchers on the
+        * same fd. With blocking sockets, if multiple watchers wait for replies
+        * and only one arrives, other watchers' recvmsg() would block forever.
+        * EAGAIN is handled properly in the event handlers.
         */
-#if 0
-       rspamd_socket_nonblocking (wrk->srv_pipe[0]);
-#endif
+       rspamd_socket_nonblocking(wrk->srv_pipe[0]);
        rspamd_socket_nonblocking(wrk->control_pipe[0]);
 
        rspamd_srv_start_watching(rspamd_main, wrk, ev_base);
@@ -2053,17 +2053,18 @@ rspamd_worker_hyperscan_ready(struct rspamd_main *rspamd_main,
 {
        struct rspamd_control_reply rep;
        struct rspamd_re_cache *cache = worker->srv->cfg->re_cache;
+       const char *cache_dir = worker->srv->cfg->hs_cache_dir;
 
        memset(&rep, 0, sizeof(rep));
        rep.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
 
-       msg_debug_hyperscan("received hyperscan loaded notification, cache_dir=%s, forced=%d",
-                                               cmd->cmd.hs_loaded.cache_dir, cmd->cmd.hs_loaded.forced);
+       msg_debug_hyperscan("received hyperscan loaded notification, forced=%d",
+                                               cmd->cmd.hs_loaded.forced);
 
        if (rspamd_hs_cache_has_lua_backend()) {
                msg_debug_hyperscan("using async backend-based hyperscan loading");
                rspamd_re_cache_load_hyperscan_scoped_async(cache, worker->srv->event_loop,
-                                                                                                       cmd->cmd.hs_loaded.cache_dir, false);
+                                                                                                       cache_dir, false);
                rep.reply.hs_loaded.status = 0;
        }
        else {
@@ -2072,7 +2073,7 @@ rspamd_worker_hyperscan_ready(struct rspamd_main *rspamd_main,
                        const char *scope = cmd->cmd.hs_loaded.scope;
                        msg_debug_hyperscan("loading hyperscan expressions for scope '%s' after receiving compilation notice", scope);
                        rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan_scoped(
-                               cache, cmd->cmd.hs_loaded.cache_dir, false);
+                               cache, cache_dir, false);
                }
                else {
                        if (rspamd_re_cache_is_hs_loaded(cache) != RSPAMD_HYPERSCAN_LOADED_FULL ||
@@ -2080,7 +2081,7 @@ rspamd_worker_hyperscan_ready(struct rspamd_main *rspamd_main,
                                msg_debug_hyperscan("loading hyperscan expressions after receiving compilation notice: %s",
                                                                        (rspamd_re_cache_is_hs_loaded(cache) != RSPAMD_HYPERSCAN_LOADED_FULL) ? "new db" : "forced update");
                                rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan(
-                                       worker->srv->cfg->re_cache, cmd->cmd.hs_loaded.cache_dir, false);
+                                       worker->srv->cfg->re_cache, cache_dir, false);
                        }
                }
        }
index 358727714d6470a767cd27fdcaef3fbfc0c83e64..7666cf1d952c95869176cfbbbbd910793383e8b1 100644 (file)
@@ -362,7 +362,7 @@ lua_worker_control_handler(struct rspamd_main *rspamd_main,
                lua_setfield(L, -2, "tag");
                break;
        case RSPAMD_CONTROL_HYPERSCAN_LOADED:
-               lua_pushstring(L, cmd->cmd.hs_loaded.cache_dir);
+               lua_pushstring(L, worker->srv->cfg->hs_cache_dir);
                lua_setfield(L, -2, "cache_dir");
                lua_pushboolean(L, cmd->cmd.hs_loaded.forced);
                lua_setfield(L, -2, "forced");