#include "libutil/libev_helper.h"
#include "unix-std.h"
#include "utlist.h"
+#include "khash.h"
#include "composites/composites.h"
#ifdef HAVE_SYS_RESOURCE_H
if (cmd.cmd.hs_loaded.scope[0] != '\0') {
/* Scoped loading */
const char *scope = cmd.cmd.hs_loaded.scope;
- msg_info_main("received scoped hyperscan cache loaded from %s for scope: %s",
- cmd.cmd.hs_loaded.cache_dir, scope);
+ msg_info_main("received scoped hyperscan cache loaded for scope: %s", scope);
/* Broadcast scoped command to all workers */
memset(&wcmd, 0, sizeof(wcmd));
wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
- rspamd_strlcpy(wcmd.cmd.hs_loaded.cache_dir,
- cmd.cmd.hs_loaded.cache_dir,
- sizeof(wcmd.cmd.hs_loaded.cache_dir));
rspamd_strlcpy(wcmd.cmd.hs_loaded.scope,
cmd.cmd.hs_loaded.scope,
sizeof(wcmd.cmd.hs_loaded.scope));
/* After getting this notice, we can clean up old hyperscan files */
rspamd_hyperscan_notice_loaded();
- msg_info_main("received hyperscan cache loaded from %s",
- cmd.cmd.hs_loaded.cache_dir);
+ msg_info_main("received hyperscan cache loaded");
/* Broadcast command to all workers */
memset(&wcmd, 0, sizeof(wcmd));
wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
- rspamd_strlcpy(wcmd.cmd.hs_loaded.cache_dir,
- cmd.cmd.hs_loaded.cache_dir,
- sizeof(wcmd.cmd.hs_loaded.cache_dir));
wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced;
wcmd.cmd.hs_loaded.scope[0] = '\0'; /* Empty scope for legacy */
rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
r = sendmsg(w->fd, &msg, 0);
if (r == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ /* Socket buffer full, retry on next event loop iteration.
+ * Keep watcher on EV_WRITE and keep rdata intact.
+ */
+ return;
+ }
msg_err_main("cannot write to worker's srv pipe when writing reply: %s; command = %s",
strerror(errno), rspamd_srv_command_to_string(rdata->rep.type));
}
ev_io_start(ev_base, &worker->srv_ev);
}
+/*
+ * Worker-side srv_pipe infrastructure.
+ * Uses a single watcher with ID-keyed hash for proper reply matching.
+ */
+
struct rspamd_srv_request_data {
- struct rspamd_worker *worker;
+ uint64_t id;
struct rspamd_srv_command cmd;
int attached_fd;
- struct rspamd_srv_reply rep;
rspamd_srv_reply_handler handler;
- ev_io io_ev;
gpointer ud;
+ gboolean sent; /* true if command has been sent */
+ struct rspamd_srv_request_data *next; /* for send queue */
+};
+
+/* Hash table keyed by command id */
+KHASH_INIT(rspamd_srv_requests, uint64_t, struct rspamd_srv_request_data *, 1,
+ kh_int64_hash_func, kh_int64_hash_equal);
+
+struct rspamd_srv_pipe_ctx {
+ ev_io io_ev;
+ struct rspamd_worker *worker;
+ struct ev_loop *ev_base;
+ khash_t(rspamd_srv_requests) * requests; /* pending requests by id */
+ struct rspamd_srv_request_data *send_queue; /* queue of requests to send */
};
+static void rspamd_srv_pipe_handler(EV_P_ ev_io *w, int revents);
+
+static void
+rspamd_srv_pipe_update_watcher(struct rspamd_srv_pipe_ctx *ctx)
+{
+ int events = 0;
+
+ /* Need to write if we have unsent requests in the queue */
+ if (ctx->send_queue != NULL) {
+ events |= EV_WRITE;
+ }
+
+ /* Need to read if we have any pending requests awaiting replies */
+ if (kh_size(ctx->requests) > 0) {
+ events |= EV_READ;
+ }
+
+ if (events == 0) {
+ /* No pending work, stop watcher */
+ if (ev_is_active(&ctx->io_ev)) {
+ ev_io_stop(ctx->ev_base, &ctx->io_ev);
+ }
+ }
+ else {
+ /* Update watcher events if needed */
+ if (!ev_is_active(&ctx->io_ev) || (ctx->io_ev.events & (EV_READ | EV_WRITE)) != events) {
+ ev_io_stop(ctx->ev_base, &ctx->io_ev);
+ ev_io_set(&ctx->io_ev, ctx->worker->srv_pipe[1], events);
+ ev_io_start(ctx->ev_base, &ctx->io_ev);
+ }
+ }
+}
+
static void
-rspamd_srv_request_handler(EV_P_ ev_io *w, int revents)
+rspamd_srv_pipe_handler(EV_P_ ev_io *w, int revents)
{
- struct rspamd_srv_request_data *rd = (struct rspamd_srv_request_data *) w->data;
+ struct rspamd_srv_pipe_ctx *ctx = (struct rspamd_srv_pipe_ctx *) w->data;
+ struct rspamd_srv_request_data *rd;
struct msghdr msg;
struct iovec iov;
unsigned char fdspace[CMSG_SPACE(sizeof(int))];
struct cmsghdr *cmsg;
+ struct rspamd_srv_reply rep;
gssize r;
+ khiter_t k;
int rfd = -1;
- if (revents == EV_WRITE) {
- /* Send request to server */
- memset(&msg, 0, sizeof(msg));
+ if (revents & EV_WRITE) {
+ /* Send queued requests */
+ while (ctx->send_queue != NULL) {
+ rd = ctx->send_queue;
+
+ memset(&msg, 0, sizeof(msg));
+
+ /* Attach fd to the message */
+ if (rd->attached_fd != -1) {
+ memset(fdspace, 0, sizeof(fdspace));
+ msg.msg_control = fdspace;
+ msg.msg_controllen = sizeof(fdspace);
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+ memcpy(CMSG_DATA(cmsg), &rd->attached_fd, sizeof(int));
+ }
- /* Attach fd to the message */
- if (rd->attached_fd != -1) {
- memset(fdspace, 0, sizeof(fdspace));
+ iov.iov_base = &rd->cmd;
+ iov.iov_len = sizeof(rd->cmd);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ r = sendmsg(w->fd, &msg, 0);
+
+ if (r == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) {
+ /* Socket buffer full, try again later */
+ break;
+ }
+ msg_err("cannot write to server pipe: %s; command = %s",
+ strerror(errno), rspamd_srv_command_to_string(rd->cmd.type));
+ /* Remove from queue and hash, free */
+ LL_DELETE(ctx->send_queue, rd);
+ k = kh_get(rspamd_srv_requests, ctx->requests, rd->id);
+ if (k != kh_end(ctx->requests)) {
+ kh_del(rspamd_srv_requests, ctx->requests, k);
+ }
+ g_free(rd);
+ continue;
+ }
+ else if (r != (gssize) sizeof(rd->cmd)) {
+ msg_err("incomplete write to server pipe: %d != %d; command = %s",
+ (int) r, (int) sizeof(rd->cmd),
+ rspamd_srv_command_to_string(rd->cmd.type));
+ LL_DELETE(ctx->send_queue, rd);
+ k = kh_get(rspamd_srv_requests, ctx->requests, rd->id);
+ if (k != kh_end(ctx->requests)) {
+ kh_del(rspamd_srv_requests, ctx->requests, k);
+ }
+ g_free(rd);
+ continue;
+ }
+
+ /* Successfully sent, remove from send queue but keep in hash */
+ LL_DELETE(ctx->send_queue, rd);
+ rd->sent = TRUE;
+ }
+ }
+
+ if (revents & EV_READ) {
+ /* Read replies */
+ for (;;) {
+ iov.iov_base = &rep;
+ iov.iov_len = sizeof(rep);
+ memset(&msg, 0, sizeof(msg));
msg.msg_control = fdspace;
msg.msg_controllen = sizeof(fdspace);
- cmsg = CMSG_FIRSTHDR(&msg);
- cmsg->cmsg_level = SOL_SOCKET;
- cmsg->cmsg_type = SCM_RIGHTS;
- cmsg->cmsg_len = CMSG_LEN(sizeof(int));
- memcpy(CMSG_DATA(cmsg), &rd->attached_fd, sizeof(int));
- }
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
- iov.iov_base = &rd->cmd;
- iov.iov_len = sizeof(rd->cmd);
- msg.msg_iov = &iov;
- msg.msg_iovlen = 1;
+ r = recvmsg(w->fd, &msg, 0);
- r = sendmsg(w->fd, &msg, 0);
+ if (r == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ /* No more data */
+ break;
+ }
+ msg_err("cannot read from server pipe: %s", strerror(errno));
+ break;
+ }
- if (r == -1) {
- if (r == ENOBUFS) {
- /* On BSD derived systems we can have this error when trying to send
- * requests too fast.
- * It might be good to retry...
- */
- msg_info("cannot write to server pipe: %s; command = %s; retrying sending",
- strerror(errno),
- rspamd_srv_command_to_string(rd->cmd.type));
- return;
+ if (r == 0) {
+ /* Connection closed */
+ msg_err("server pipe closed unexpectedly");
+ break;
}
- msg_err("cannot write to server pipe: %s; command = %s", strerror(errno),
- rspamd_srv_command_to_string(rd->cmd.type));
- goto cleanup;
- }
- else if (r != sizeof(rd->cmd)) {
- msg_err("incomplete write to the server pipe: %d != %d, command = %s",
- (int) r, (int) sizeof(rd->cmd), rspamd_srv_command_to_string(rd->cmd.type));
- goto cleanup;
- }
- ev_io_stop(EV_A_ w);
- ev_io_set(w, rd->worker->srv_pipe[1], EV_READ);
- ev_io_start(EV_A_ w);
- }
- else {
- iov.iov_base = &rd->rep;
- iov.iov_len = sizeof(rd->rep);
- memset(&msg, 0, sizeof(msg));
- msg.msg_control = fdspace;
- msg.msg_controllen = sizeof(fdspace);
- msg.msg_iov = &iov;
- msg.msg_iovlen = 1;
+ if (r != (gssize) sizeof(rep)) {
+ msg_err("incomplete read from server pipe: %d != %d",
+ (int) r, (int) sizeof(rep));
+ continue;
+ }
- r = recvmsg(w->fd, &msg, 0);
+ /* Look up request by ID */
+ k = kh_get(rspamd_srv_requests, ctx->requests, rep.id);
+ if (k == kh_end(ctx->requests)) {
+ msg_warn("received reply for unknown request id %" G_GUINT64_FORMAT,
+ rep.id);
+ continue;
+ }
- if (r == -1) {
- msg_err("cannot read from server pipe: %s; command = %s", strerror(errno),
- rspamd_srv_command_to_string(rd->cmd.type));
- goto cleanup;
- }
+ rd = kh_val(ctx->requests, k);
+ kh_del(rspamd_srv_requests, ctx->requests, k);
- if (r != (int) sizeof(rd->rep)) {
- msg_err("cannot read from server pipe, invalid length: %d != %d; command = %s",
- (int) r, (int) sizeof(rd->rep), rspamd_srv_command_to_string(rd->cmd.type));
- goto cleanup;
- }
+ /* Extract attached fd if present */
+ rfd = -1;
+ if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
+ rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
+ }
- if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
- rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
- }
+ /* Call handler */
+ if (rd->handler) {
+ rd->handler(ctx->worker, &rep, rfd, rd->ud);
+ }
- /* Reply has been received */
- if (rd->handler) {
- rd->handler(rd->worker, &rd->rep, rfd, rd->ud);
+ g_free(rd);
}
+ }
+
+ rspamd_srv_pipe_update_watcher(ctx);
+}
- goto cleanup;
+static struct rspamd_srv_pipe_ctx *
+rspamd_srv_pipe_ctx_create(struct rspamd_worker *worker, struct ev_loop *ev_base)
+{
+ struct rspamd_srv_pipe_ctx *ctx;
+
+ ctx = g_malloc0(sizeof(*ctx));
+ ctx->worker = worker;
+ ctx->ev_base = ev_base;
+ ctx->requests = kh_init(rspamd_srv_requests);
+ ctx->send_queue = NULL;
+
+ ctx->io_ev.data = ctx;
+ ev_io_init(&ctx->io_ev, rspamd_srv_pipe_handler, worker->srv_pipe[1], 0);
+ /* Watcher starts inactive, will be activated when requests are added */
+
+ return ctx;
+}
+
+static void
+rspamd_srv_pipe_ctx_destroy(struct rspamd_srv_pipe_ctx *ctx)
+{
+ struct rspamd_srv_request_data *rd, *tmp;
+ khiter_t k;
+
+ if (ctx == NULL) {
+ return;
}
- return;
+ ev_io_stop(ctx->ev_base, &ctx->io_ev);
+ /* Free send queue */
+ LL_FOREACH_SAFE(ctx->send_queue, rd, tmp)
+ {
+ g_free(rd);
+ }
+
+ /* Free pending requests */
+ for (k = kh_begin(ctx->requests); k != kh_end(ctx->requests); ++k) {
+ if (kh_exist(ctx->requests, k)) {
+ g_free(kh_val(ctx->requests, k));
+ }
+ }
+ kh_destroy(rspamd_srv_requests, ctx->requests);
-cleanup:
- ev_io_stop(EV_A_ w);
- g_free(rd);
+ g_free(ctx);
+}
+
+/* Per-worker srv_pipe context (each worker process has its own copy after fork) */
+static struct rspamd_srv_pipe_ctx *srv_pipe_ctx = NULL;
+
+void rspamd_srv_pipe_init(struct rspamd_worker *worker, struct ev_loop *ev_base)
+{
+ if (srv_pipe_ctx != NULL) {
+ /* Already initialized */
+ return;
+ }
+ srv_pipe_ctx = rspamd_srv_pipe_ctx_create(worker, ev_base);
+}
+
+void rspamd_srv_pipe_cleanup(void)
+{
+ rspamd_srv_pipe_ctx_destroy(srv_pipe_ctx);
+ srv_pipe_ctx = NULL;
}
void rspamd_srv_send_command(struct rspamd_worker *worker,
gpointer ud)
{
struct rspamd_srv_request_data *rd;
+ int ret;
+ khiter_t k;
g_assert(cmd != NULL);
g_assert(worker != NULL);
+ /* Lazy initialization of srv_pipe context */
+ if (srv_pipe_ctx == NULL) {
+ rspamd_srv_pipe_init(worker, ev_base);
+ }
+
rd = g_malloc0(sizeof(*rd));
cmd->id = ottery_rand_uint64();
+ rd->id = cmd->id;
memcpy(&rd->cmd, cmd, sizeof(rd->cmd));
rd->handler = handler;
rd->ud = ud;
- rd->worker = worker;
- rd->rep.id = cmd->id;
- rd->rep.type = cmd->type;
rd->attached_fd = attached_fd;
+ rd->sent = FALSE;
+ rd->next = NULL;
+
+ /* Add to hash for reply matching */
+ k = kh_put(rspamd_srv_requests, srv_pipe_ctx->requests, rd->id, &ret);
+ if (ret < 0) {
+ msg_err("cannot add request to hash table");
+ g_free(rd);
+ return;
+ }
+ kh_val(srv_pipe_ctx->requests, k) = rd;
+
+ /* Add to send queue */
+ LL_APPEND(srv_pipe_ctx->send_queue, rd);
- rd->io_ev.data = rd;
- ev_io_init(&rd->io_ev, rspamd_srv_request_handler,
- rd->worker->srv_pipe[1], EV_WRITE);
- ev_io_start(ev_base, &rd->io_ev);
+ /* Activate watcher */
+ rspamd_srv_pipe_update_watcher(srv_pipe_ctx);
}
enum rspamd_control_type
* is blocking.
*/
rspamd_socket_nonblocking(wrk->control_pipe[1]);
-#if 0
- rspamd_socket_nonblocking (wrk->srv_pipe[1]);
-#endif
+ /*
+ * srv_pipe must be non-blocking on the worker side to avoid deadlocks
+ * when multiple rspamd_srv_send_command calls create multiple ev_io
+ * watchers on the same fd. With a blocking socket, if multiple watchers
+ * wait for replies and only one reply arrives, the second watcher's
+ * recvmsg() would block forever.
+ */
+ rspamd_socket_nonblocking(wrk->srv_pipe[1]);
rspamd_main->cfg->cur_worker = wrk;
/* Execute worker (this function should not return normally!) */
cf->worker->worker_start_func(wrk);
close(wrk->srv_pipe[1]);
/*
- * There are no reasons why control pipes are blocking: the messages
- * there are rare and are strictly bounded by command sizes, so if we block
- * on some pipe, it is ok, as we still poll that for all operations.
- * It is also impossible to block on writing in normal conditions.
- * And if the conditions are not normal, e.g. a worker is unresponsive, then
- * we can safely think that the non-blocking behaviour as it is implemented
- * currently will not make things better, as it would lead to incomplete
- * reads/writes that are not handled anyhow and are totally broken from the
- * beginning.
+ * Both control and srv pipes are non-blocking. This is necessary because
+ * multiple commands can be queued (e.g., during hyperscan compilation with
+ * rspamd_worker_set_busy calls), creating multiple ev_io watchers on the
+ * same fd. With blocking sockets, if multiple watchers wait for replies
+ * and only one arrives, other watchers' recvmsg() would block forever.
+ * EAGAIN is handled properly in the event handlers.
*/
-#if 0
- rspamd_socket_nonblocking (wrk->srv_pipe[0]);
-#endif
+ rspamd_socket_nonblocking(wrk->srv_pipe[0]);
rspamd_socket_nonblocking(wrk->control_pipe[0]);
rspamd_srv_start_watching(rspamd_main, wrk, ev_base);
{
struct rspamd_control_reply rep;
struct rspamd_re_cache *cache = worker->srv->cfg->re_cache;
+ const char *cache_dir = worker->srv->cfg->hs_cache_dir;
memset(&rep, 0, sizeof(rep));
rep.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
- msg_debug_hyperscan("received hyperscan loaded notification, cache_dir=%s, forced=%d",
- cmd->cmd.hs_loaded.cache_dir, cmd->cmd.hs_loaded.forced);
+ msg_debug_hyperscan("received hyperscan loaded notification, forced=%d",
+ cmd->cmd.hs_loaded.forced);
if (rspamd_hs_cache_has_lua_backend()) {
msg_debug_hyperscan("using async backend-based hyperscan loading");
rspamd_re_cache_load_hyperscan_scoped_async(cache, worker->srv->event_loop,
- cmd->cmd.hs_loaded.cache_dir, false);
+ cache_dir, false);
rep.reply.hs_loaded.status = 0;
}
else {
const char *scope = cmd->cmd.hs_loaded.scope;
msg_debug_hyperscan("loading hyperscan expressions for scope '%s' after receiving compilation notice", scope);
rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan_scoped(
- cache, cmd->cmd.hs_loaded.cache_dir, false);
+ cache, cache_dir, false);
}
else {
if (rspamd_re_cache_is_hs_loaded(cache) != RSPAMD_HYPERSCAN_LOADED_FULL ||
msg_debug_hyperscan("loading hyperscan expressions after receiving compilation notice: %s",
(rspamd_re_cache_is_hs_loaded(cache) != RSPAMD_HYPERSCAN_LOADED_FULL) ? "new db" : "forced update");
rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan(
- worker->srv->cfg->re_cache, cmd->cmd.hs_loaded.cache_dir, false);
+ worker->srv->cfg->re_cache, cache_dir, false);
}
}
}