]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] Refactor control socket to use ID-based request/reply matching
authorVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 13 Jan 2026 21:52:13 +0000 (21:52 +0000)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 13 Jan 2026 21:52:13 +0000 (21:52 +0000)
Replace the serialization-based control command handling with an ID-based
approach using khash, mirroring the existing rspamd_srv_requests pattern.

Key changes:
- Add uint64_t id field to control command/reply structs
- Use khash for O(1) request lookup by ID instead of GHashTable
- Add rspamd_control_reply_handler() for centralized reply processing
- Add rspamd_control_pending_new/destroy/remove_all() API functions
- Add control_ev watcher to worker struct for reply monitoring
- Call rspamd_srv_pipe_cleanup() on worker shutdown to prevent leaks
- Handle ID collisions gracefully (warn and free old entry)

This fixes hash table iterator corruption crashes that occurred when
modifying the hash during iteration, and provides more robust concurrent
command handling.

src/libserver/rspamd_control.c
src/libserver/rspamd_control.h
src/libserver/worker_util.c
src/rspamd.c
src/rspamd.h

index 83f275bb81dd1ba33ed79b4c71200c2879e5ec91..f730aaf9ef10c2d39315c36007e7a812794c9458 100644 (file)
@@ -45,20 +45,20 @@ struct rspamd_control_session;
 
 struct rspamd_control_reply_elt {
        struct rspamd_control_reply reply;
-       struct rspamd_io_ev ev;
-       struct ev_loop *event_loop;
        struct rspamd_worker *worker;
        GQuark wrk_type;
        pid_t wrk_pid;
        rspamd_ev_cb handler;
        gpointer ud;
        int attached_fd;
-       bool sent;
        struct rspamd_control_command cmd;
-       GHashTable *pending_elts;
        struct rspamd_control_reply_elt *prev, *next;
 };
 
+/* Hash table keyed by command id for pending control requests */
+KHASH_INIT(rspamd_control_requests, uint64_t, struct rspamd_control_reply_elt *, 1,
+                  kh_int64_hash_func, kh_int64_hash_equal);
+
 struct rspamd_control_session {
        int fd;
        struct ev_loop *event_loop;
@@ -88,7 +88,6 @@ static const struct rspamd_control_cmd_match {
 };
 
 static void rspamd_control_ignore_io_handler(int fd, short what, void *ud);
-static void rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt);
 
 INIT_LOG_MODULE(control)
 
@@ -155,7 +154,14 @@ rspamd_control_connection_close(struct rspamd_control_session *session)
 
        DL_FOREACH_SAFE(session->replies, elt, telt)
        {
-               rspamd_control_stop_pending(elt);
+               /* Remove from worker's pending hash if still there */
+               khash_t(rspamd_control_requests) *h =
+                       (khash_t(rspamd_control_requests) *) elt->worker->control_events_pending;
+               khiter_t k = kh_get(rspamd_control_requests, h, elt->cmd.id);
+               if (k != kh_end(h)) {
+                       kh_del(rspamd_control_requests, h, k);
+               }
+               g_free(elt);
        }
 
        rspamd_inet_address_free(session->addr);
@@ -326,44 +332,21 @@ rspamd_control_wrk_io(int fd, short what, gpointer ud)
 {
        struct rspamd_control_reply_elt *elt = ud;
        struct rspamd_control_session *session;
-       unsigned char fdspace[CMSG_SPACE(sizeof(int))];
-       struct iovec iov;
-       struct msghdr msg;
-       gssize r;
 
        session = elt->ud;
-       elt->attached_fd = -1;
-
-       if (what == EV_READ) {
-               iov.iov_base = &elt->reply;
-               iov.iov_len = sizeof(elt->reply);
-               memset(&msg, 0, sizeof(msg));
-               msg.msg_control = fdspace;
-               msg.msg_controllen = sizeof(fdspace);
-               msg.msg_iov = &iov;
-               msg.msg_iovlen = 1;
 
-               r = recvmsg(fd, &msg, 0);
-               if (r == -1) {
-                       msg_err("cannot read reply from the worker %P (%s): %s",
-                                       elt->wrk_pid, g_quark_to_string(elt->wrk_type),
-                                       strerror(errno));
-               }
-               else if (r >= (gssize) sizeof(elt->reply)) {
-                       if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
-                               elt->attached_fd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
-                       }
-               }
-       }
-       else {
+       /*
+        * Note: In the new design, the reply is already read by
+        * rspamd_control_reply_handler() and stored in elt->reply.
+        * We just need to process it here.
+        */
+       if (what != EV_READ) {
                /* Timeout waiting */
                msg_warn("timeout waiting reply from %P (%s)",
                                 elt->wrk_pid, g_quark_to_string(elt->wrk_type));
        }
 
        session->replies_remain--;
-       rspamd_ev_watcher_stop(session->event_loop,
-                                                  &elt->ev);
 
        if (session->replies_remain == 0) {
                rspamd_control_write_reply(session);
@@ -388,22 +371,58 @@ rspamd_control_error_handler(struct rspamd_http_connection *conn, GError *err)
        }
 }
 
-void rspamd_pending_control_free(gpointer p)
+static void
+rspamd_pending_control_free(struct rspamd_control_reply_elt *rep_elt)
 {
-       struct rspamd_control_reply_elt *rep_elt = (struct rspamd_control_reply_elt *) p;
-
-       if (rep_elt->sent) {
-               rspamd_ev_watcher_stop(rep_elt->event_loop, &rep_elt->ev);
-       }
-       else if (rep_elt->attached_fd != -1) {
-               /* Only for non-sent requests! */
+       if (rep_elt->attached_fd != -1) {
                close(rep_elt->attached_fd);
        }
 
-       g_hash_table_unref(rep_elt->pending_elts);
        g_free(rep_elt);
 }
 
+void *
+rspamd_control_pending_new(void)
+{
+       return kh_init(rspamd_control_requests);
+}
+
+void rspamd_control_pending_destroy(void *p)
+{
+       khash_t(rspamd_control_requests) *h = (khash_t(rspamd_control_requests) *) p;
+       khiter_t k;
+
+       if (h == NULL) {
+               return;
+       }
+
+       for (k = kh_begin(h); k != kh_end(h); ++k) {
+               if (kh_exist(h, k)) {
+                       rspamd_pending_control_free(kh_val(h, k));
+               }
+       }
+
+       kh_destroy(rspamd_control_requests, h);
+}
+
+void rspamd_control_pending_remove_all(void *p)
+{
+       khash_t(rspamd_control_requests) *h = (khash_t(rspamd_control_requests) *) p;
+       khiter_t k;
+
+       if (h == NULL) {
+               return;
+       }
+
+       for (k = kh_begin(h); k != kh_end(h); ++k) {
+               if (kh_exist(h, k)) {
+                       rspamd_pending_control_free(kh_val(h, k));
+               }
+       }
+
+       kh_clear(rspamd_control_requests, h);
+}
+
 static inline void
 rspamd_control_fill_msghdr(struct rspamd_control_command *cmd,
                                                   int attached_fd, struct msghdr *msg,
@@ -433,76 +452,71 @@ rspamd_control_fill_msghdr(struct rspamd_control_command *cmd,
        msg->msg_iovlen = 1;
 }
 
+/* Handler for control replies on main side - dispatches by ID */
 static void
-rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
+rspamd_control_reply_handler(EV_P_ ev_io *w, int revents)
 {
-       GHashTable *htb;
-       struct rspamd_main *rspamd_main;
-       gsize pending;
-
-       /* It stops event and frees hash */
-       htb = elt->pending_elts;
-
-       pending = g_hash_table_size(htb);
-       msg_debug_control("stop pending for %P(%s), %d events pending", elt->wrk_pid,
-                                         g_quark_to_string(elt->wrk_type),
-                                         (int) pending);
-
-       if (elt->worker->state != rspamd_worker_state_terminating && pending != 0) {
-               /* Invoke another event from the queue */
-               GHashTableIter it;
-               gpointer k, v;
-
-               g_hash_table_iter_init(&it, elt->pending_elts);
-
-               while (g_hash_table_iter_next(&it, &k, &v)) {
-                       struct rspamd_control_reply_elt *cur = v;
-
-                       if (!cur->sent) {
-                               struct msghdr msg;
-                               struct iovec iov;
-
-                               rspamd_main = cur->worker->srv;
-                               /* Provide a control buffer with correct lifetime for sendmsg */
-                               unsigned char fdspace[CMSG_SPACE(sizeof(int))];
-                               rspamd_control_fill_msghdr(&cur->cmd, cur->attached_fd, &msg, &iov, fdspace, sizeof(fdspace));
-                               ssize_t r = sendmsg(cur->worker->control_pipe[0], &msg, 0);
-
-                               if (r == sizeof(cur->cmd)) {
-                                       msg_debug_control("restarting pending event for %P(%s), %d events pending",
-                                                                         cur->wrk_pid,
-                                                                         g_quark_to_string(cur->wrk_type),
-                                                                         (int) pending - 1);
-                                       rspamd_ev_watcher_init(&cur->ev,
-                                                                                  cur->worker->control_pipe[0],
-                                                                                  EV_READ, cur->handler,
-                                                                                  cur);
-                                       rspamd_ev_watcher_start(cur->event_loop,
-                                                                                       &cur->ev, worker_io_timeout);
-                                       cur->sent = true;
-                                       if (cur->attached_fd != -1) {
-                                               /* Since `sendmsg` performs `dup` for us, we need to remove our own descriptor */
-                                               close(cur->attached_fd);
-                                               cur->attached_fd = -1;
-                                       }
+       struct rspamd_worker *wrk = (struct rspamd_worker *) w->data;
+       khash_t(rspamd_control_requests) *h =
+               (khash_t(rspamd_control_requests) *) wrk->control_events_pending;
+       struct rspamd_control_reply rep;
+       struct rspamd_control_reply_elt *elt;
+       struct rspamd_main *rspamd_main = wrk->srv;
+       gssize r;
+       khiter_t k;
 
-                                       break; /* Exit the outer loop as we have invoked something */
-                               }
-                               else {
-                                       msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
-                                                                (int) cur->cmd.type,
-                                                                cur->wrk_pid,
-                                                                g_quark_to_string(cur->wrk_type),
-                                                                cur->worker->control_pipe[0],
-                                                                strerror(errno));
-                                       g_hash_table_remove(elt->pending_elts, cur);
-                               }
+       for (;;) {
+               r = read(w->fd, &rep, sizeof(rep));
+
+               if (r == -1) {
+                       if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                               /* No more data */
+                               break;
                        }
+                       msg_err_main("cannot read control reply from worker %P: %s",
+                                                wrk->pid, strerror(errno));
+                       break;
+               }
+
+               if (r == 0) {
+                       /* Connection closed */
+                       msg_debug_control("control pipe closed for worker %P", wrk->pid);
+                       ev_io_stop(EV_A_ w);
+                       break;
+               }
+
+               if (r != (gssize) sizeof(rep)) {
+                       msg_err_main("incomplete control reply from worker %P: %d != %d",
+                                                wrk->pid, (int) r, (int) sizeof(rep));
+                       continue;
+               }
+
+               /* Look up request by ID */
+               k = kh_get(rspamd_control_requests, h, rep.id);
+               if (k == kh_end(h)) {
+                       msg_warn_main("received control reply for unknown request id %" G_GUINT64_FORMAT
+                                                 " from worker %P",
+                                                 rep.id, wrk->pid);
+                       continue;
+               }
+
+               elt = kh_val(h, k);
+               kh_del(rspamd_control_requests, h, k);
+
+               msg_debug_control("received reply for command %d id %" G_GUINT64_FORMAT " from worker %P(%s)",
+                                                 (int) rep.type, rep.id, wrk->pid, g_quark_to_string(wrk->type));
+
+               /* Copy reply to element and call handler */
+               memcpy(&elt->reply, &rep, sizeof(rep));
+               if (elt->handler) {
+                       elt->handler(w->fd, EV_READ, elt);
                }
        }
 
-       /* Remove from hash and performs the cleanup */
-       g_hash_table_remove(elt->pending_elts, elt);
+       /* Stop watcher if no more pending requests */
+       if (kh_size(h) == 0) {
+               ev_io_stop(EV_A_ w);
+       }
 }
 
 static struct rspamd_control_reply_elt *
@@ -516,8 +530,11 @@ rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
        GHashTableIter it;
        struct rspamd_worker *wrk;
        struct rspamd_control_reply_elt *rep_elt, *res = NULL;
+       khash_t(rspamd_control_requests) * h;
        gpointer k, v;
        gssize r;
+       khiter_t kh;
+       int ret;
 
        g_hash_table_iter_init(&it, rspamd_main->workers);
 
@@ -538,90 +555,72 @@ rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
                        continue;
                }
 
+               h = (khash_t(rspamd_control_requests) *) wrk->control_events_pending;
+
+               /* Generate unique ID for this command */
+               cmd->id = ottery_rand_uint64();
+
                rep_elt = g_malloc0(sizeof(*rep_elt));
                rep_elt->worker = wrk;
                rep_elt->wrk_pid = wrk->pid;
                rep_elt->wrk_type = wrk->type;
-               rep_elt->event_loop = rspamd_main->event_loop;
                rep_elt->ud = ud;
                rep_elt->handler = handler;
-               memcpy(&rep_elt->cmd, cmd, sizeof(*cmd));
-               rep_elt->sent = false;
                rep_elt->attached_fd = -1;
+               memcpy(&rep_elt->cmd, cmd, sizeof(*cmd));
 
-               if (g_hash_table_size(wrk->control_events_pending) == 0) {
-                       /* We can send command */
-                       struct msghdr msg;
-                       struct iovec iov;
-                       unsigned char fdspace[CMSG_SPACE(sizeof(int))];
-
-                       rspamd_control_fill_msghdr(cmd, attached_fd, &msg, &iov, fdspace, sizeof(fdspace));
-                       r = sendmsg(wrk->control_pipe[0], &msg, 0);
-
-                       if (r == sizeof(*cmd)) {
-                               rspamd_ev_watcher_init(&rep_elt->ev,
-                                                                          wrk->control_pipe[0],
-                                                                          EV_READ, handler,
-                                                                          rep_elt);
-                               rspamd_ev_watcher_start(rspamd_main->event_loop,
-                                                                               &rep_elt->ev, worker_io_timeout);
-                               rep_elt->sent = true;
-                               rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
-                               g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
-
-                               DL_APPEND(res, rep_elt);
-                               msg_debug_control("sent command %d to the worker %P(%s), fd: %d",
-                                                                 (int) cmd->type,
-                                                                 wrk->pid,
-                                                                 g_quark_to_string(wrk->type),
-                                                                 wrk->control_pipe[0]);
-                       }
-                       else {
-                               msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
-                                                        (int) cmd->type,
-                                                        wrk->pid,
-                                                        g_quark_to_string(wrk->type),
-                                                        wrk->control_pipe[0],
-                                                        strerror(errno));
+               /* Send command immediately */
+               struct msghdr msg;
+               struct iovec iov;
+               unsigned char fdspace[CMSG_SPACE(sizeof(int))];
+
+               rspamd_control_fill_msghdr(cmd, attached_fd, &msg, &iov, fdspace, sizeof(fdspace));
+               r = sendmsg(wrk->control_pipe[0], &msg, 0);
+
+               if (r == sizeof(*cmd)) {
+                       /* Add to hash for reply matching */
+                       kh = kh_put(rspamd_control_requests, h, cmd->id, &ret);
+                       if (ret < 0) {
+                               msg_err_main("cannot add control request to hash table");
                                g_free(rep_elt);
+                               continue;
                        }
-               }
-               else {
-                       /* We need to wait till the last command is processed, or it will mess up all serialization */
-                       msg_debug_control("pending event for %P(%s), %d events pending",
-                                                         wrk->pid,
-                                                         g_quark_to_string(wrk->type),
-                                                         (int) g_hash_table_size(wrk->control_events_pending));
-                       rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
-                       /*
-                        * Here are dragons:
-                        * If we have a descriptor to send, the callee expects that we follow
-                        * sendmsg semantics that performs `dup` on it. So we need to clone fd and keep it there.
-                        */
-                       if (attached_fd != -1) {
-                               rep_elt->attached_fd = dup(attached_fd);
-
-                               if (rep_elt->attached_fd == -1) {
-                                       /*
-                                        * We have a problem: file descriptors limit is reached, so we cannot really deal with this
-                                        * request
-                                        */
-                                       msg_err_main("cannot duplicate file descriptor to send command to worker %P(%s): %s; failed to send command",
-                                                                wrk->pid,
-                                                                g_quark_to_string(wrk->type),
-                                                                strerror(errno));
-                                       g_hash_table_unref(rep_elt->pending_elts);
-                                       g_free(rep_elt);
-                               }
-                               else {
-                                       g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
-                                       DL_APPEND(res, rep_elt);
+                       if (ret == 0) {
+                               /* Key already exists - ID collision (extremely unlikely with 64-bit random) */
+                               msg_warn_main("control command ID collision for %" G_GUINT64_FORMAT
+                                                         ", previous request will be orphaned",
+                                                         cmd->id);
+                               /* Free the old entry to prevent memory leak */
+                               struct rspamd_control_reply_elt *old_elt = kh_val(h, kh);
+                               if (old_elt) {
+                                       g_free(old_elt);
                                }
                        }
-                       else {
-                               g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
-                               DL_APPEND(res, rep_elt);
+                       kh_val(h, kh) = rep_elt;
+
+                       /* Start control reply watcher if not already active */
+                       if (!ev_is_active(&wrk->control_ev)) {
+                               wrk->control_ev.data = wrk;
+                               ev_io_init(&wrk->control_ev, rspamd_control_reply_handler,
+                                                  wrk->control_pipe[0], EV_READ);
+                               ev_io_start(rspamd_main->event_loop, &wrk->control_ev);
                        }
+
+                       DL_APPEND(res, rep_elt);
+                       msg_debug_control("sent command %d id %" G_GUINT64_FORMAT " to worker %P(%s), fd: %d",
+                                                         (int) cmd->type, cmd->id,
+                                                         wrk->pid,
+                                                         g_quark_to_string(wrk->type),
+                                                         wrk->control_pipe[0]);
+               }
+               else {
+                       msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
+                                                (int) cmd->type,
+                                                wrk->pid,
+                                                g_quark_to_string(wrk->type),
+                                                wrk->control_pipe[0],
+                                                strerror(errno));
+                       g_free(rep_elt);
                }
        }
 
@@ -735,6 +734,7 @@ rspamd_control_default_cmd_handler(int fd,
 
        memset(&rep, 0, sizeof(rep));
        rep.type = cmd->type;
+       rep.id = cmd->id;
        rspamd_main = cd->worker->srv;
 
        switch (cmd->type) {
@@ -928,13 +928,9 @@ rspamd_control_ignore_io_handler(int fd, short what, void *ud)
        struct rspamd_control_reply_elt *elt =
                (struct rspamd_control_reply_elt *) ud;
 
-       struct rspamd_control_reply rep;
-
-       /* At this point we just ignore replies from the workers */
-       if (read(fd, &rep, sizeof(rep)) == -1) {
-               msg_debug_control("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
-       }
-       rspamd_control_stop_pending(elt);
+       /* Reply already read by rspamd_control_reply_handler and stored in elt->reply */
+       /* Just free the element - nothing to do with the reply */
+       g_free(elt);
 }
 
 static void
@@ -942,11 +938,10 @@ rspamd_control_log_pipe_io_handler(int fd, short what, void *ud)
 {
        struct rspamd_control_reply_elt *elt =
                (struct rspamd_control_reply_elt *) ud;
-       struct rspamd_control_reply rep;
 
-       /* At this point we just ignore replies from the workers */
-       (void) !read(fd, &rep, sizeof(rep));
-       rspamd_control_stop_pending(elt);
+       /* Reply already read by rspamd_control_reply_handler and stored in elt->reply */
+       /* Just free the element - nothing to do with the reply */
+       g_free(elt);
 }
 
 static void
@@ -980,7 +975,7 @@ rspamd_control_handle_on_fork(struct rspamd_srv_command *cmd,
                REF_RELEASE(child->cf);
                g_hash_table_remove(srv->workers,
                                                        GSIZE_TO_POINTER(cmd->cmd.on_fork.cpid));
-               g_hash_table_unref(child->control_events_pending);
+               rspamd_control_pending_destroy(child->control_events_pending);
                g_free(child);
        }
        else {
@@ -995,8 +990,7 @@ rspamd_control_handle_on_fork(struct rspamd_srv_command *cmd,
                child->cf = parent->cf;
                child->ppid = parent->pid;
                REF_RETAIN(child->cf);
-               child->control_events_pending = g_hash_table_new_full(g_direct_hash, g_direct_equal,
-                                                                                                                         NULL, rspamd_pending_control_free);
+               child->control_events_pending = rspamd_control_pending_new();
                g_hash_table_insert(srv->workers,
                                                        GSIZE_TO_POINTER(cmd->cmd.on_fork.cpid), child);
        }
index e85bb57905aa2a32de0fb1b4ef4288bd0fdf0348..3fc89b988fd024b9b45109176afb1d504ab56c4a 100644 (file)
@@ -66,6 +66,7 @@ enum rspamd_log_pipe_type {
 
 struct rspamd_control_command {
        enum rspamd_control_type type;
+       uint64_t id;
        union {
                struct {
                        unsigned int unused;
@@ -131,6 +132,7 @@ struct rspamd_control_command {
 
 struct rspamd_control_reply {
        enum rspamd_control_type type;
+       uint64_t id;
        union {
                struct {
                        unsigned int conns;
@@ -369,10 +371,22 @@ const char *rspamd_control_command_to_string(enum rspamd_control_type cmd);
 const char *rspamd_srv_command_to_string(enum rspamd_srv_type cmd);
 
 /**
- * Used to cleanup pending events
- * @param p
+ * Create a new control events pending hash table
+ * @return opaque pointer to khash
+ */
+void *rspamd_control_pending_new(void);
+
+/**
+ * Destroy control events pending hash table
+ * @param p opaque pointer to khash
+ */
+void rspamd_control_pending_destroy(void *p);
+
+/**
+ * Remove all pending control events
+ * @param p opaque pointer to khash
  */
-void rspamd_pending_control_free(gpointer p);
+void rspamd_control_pending_remove_all(void *p);
 
 void rspamd_worker_set_busy(struct rspamd_worker *worker,
                                                        struct ev_loop *event_loop,
index ef4734a35cc97cf88d171acb4483f22668725d1c..80bd6500a8071e1361127d0c3f22b75db34d304f 100644 (file)
@@ -608,6 +608,9 @@ void rspamd_worker_stop_accept(struct rspamd_worker *worker)
 
        g_hash_table_unref (worker->signal_events);
 #endif
+
+       /* Clean up srv_pipe context to prevent memory leaks */
+       rspamd_srv_pipe_cleanup();
 }
 
 static rspamd_fstring_t *
@@ -1450,8 +1453,7 @@ rspamd_fork_worker(struct rspamd_main *rspamd_main,
        wrk->pid = fork();
        wrk->cores_throttled = rspamd_main->cores_throttling;
        wrk->term_handler = term_handler;
-       wrk->control_events_pending = g_hash_table_new_full(g_direct_hash, g_direct_equal,
-                                                                                                               NULL, rspamd_pending_control_free);
+       wrk->control_events_pending = rspamd_control_pending_new();
 
        switch (wrk->pid) {
        case 0:
@@ -2057,6 +2059,7 @@ rspamd_worker_hyperscan_ready(struct rspamd_main *rspamd_main,
 
        memset(&rep, 0, sizeof(rep));
        rep.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
+       rep.id = cmd->id;
 
        msg_debug_hyperscan("received hyperscan loaded notification, forced=%d",
                                                cmd->cmd.hs_loaded.forced);
@@ -2112,6 +2115,7 @@ rspamd_worker_multipattern_ready(struct rspamd_main *rspamd_main,
 
        memset(&rep, 0, sizeof(rep));
        rep.type = RSPAMD_CONTROL_MULTIPATTERN_LOADED;
+       rep.id = cmd->id;
 
        msg_debug_hyperscan("received multipattern loaded notification for '%s'", name);
 
@@ -2195,6 +2199,7 @@ rspamd_worker_regexp_map_ready(struct rspamd_main *rspamd_main,
 
        memset(&rep, 0, sizeof(rep));
        rep.type = RSPAMD_CONTROL_REGEXP_MAP_LOADED;
+       rep.id = cmd->id;
 
        msg_debug_hyperscan("received regexp map loaded notification for '%s'", name);
 
@@ -2262,6 +2267,7 @@ rspamd_worker_log_pipe_handler(struct rspamd_main *rspamd_main,
 
        memset(&rep, 0, sizeof(rep));
        rep.type = RSPAMD_CONTROL_LOG_PIPE;
+       rep.id = cmd->id;
 
        if (attached_fd != -1) {
                lp = g_malloc0(sizeof(*lp));
@@ -2298,6 +2304,7 @@ rspamd_worker_monitored_handler(struct rspamd_main *rspamd_main,
 
        memset(&rep, 0, sizeof(rep));
        rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
+       rep.id = cmd->id;
 
        if (cmd->cmd.monitored_change.sender != getpid()) {
                m = rspamd_monitored_by_tag(mctx, cmd->cmd.monitored_change.tag);
index a4eb2e0f2c438a655b28e1ec4a7d7616e9ce01c5..6e131b9b57d6608e3c6588743adb5584a8da97dc 100644 (file)
@@ -784,7 +784,8 @@ kill_old_workers(gpointer key, gpointer value, gpointer unused)
                w->state = rspamd_worker_state_terminating;
                kill(w->pid, SIGUSR2);
                ev_io_stop(rspamd_main->event_loop, &w->srv_ev);
-               g_hash_table_remove_all(w->control_events_pending);
+               ev_io_stop(rspamd_main->event_loop, &w->control_ev);
+               rspamd_control_pending_remove_all(w->control_events_pending);
                msg_info_main("send signal to worker %P", w->pid);
        }
        else if (w->state != rspamd_worker_state_running) {
@@ -1199,7 +1200,8 @@ rspamd_cld_handler(EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
 
        /* Remove dead child form children list */
        g_hash_table_remove(rspamd_main->workers, GSIZE_TO_POINTER(wrk->pid));
-       g_hash_table_remove_all(wrk->control_events_pending);
+       ev_io_stop(rspamd_main->event_loop, &wrk->control_ev);
+       rspamd_control_pending_remove_all(wrk->control_events_pending);
 
        if (wrk->srv_pipe[0] != -1) {
                /* Ugly workaround */
@@ -1241,7 +1243,7 @@ rspamd_cld_handler(EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
        }
 
        REF_RELEASE(wrk->cf);
-       g_hash_table_unref(wrk->control_events_pending);
+       rspamd_control_pending_destroy(wrk->control_events_pending);
        g_free(wrk);
 }
 
index 1d444698e08d7051ba13841201cb340f21503cbc..4c266cb50bc92c692d97ead9a1ffc1dfd089fa59 100644 (file)
@@ -122,12 +122,13 @@ struct rspamd_worker {
        int srv_pipe[2];                                  /**< used by workers to request something from the
                                             main process. [0] - main, [1] - worker                     */
        ev_io srv_ev;                                     /**< used by main for read workers' requests          */
+       ev_io control_ev;                                 /**< used by main for read control replies                    */
        struct rspamd_worker_heartbeat hb;                /**< heartbeat data */
        gpointer control_data;                            /**< used by control protocol to handle commands      */
        gpointer tmp_data;                                /**< used to avoid race condition to deal with control messages */
        ev_child cld_ev;                                  /**< to allow reaping                                                         */
        rspamd_worker_term_cb term_handler;               /**< custom term handler                                              */
-       GHashTable *control_events_pending;               /**< control events pending indexed by ptr            */
+       void *control_events_pending;                     /**< khash of pending control requests by id  */
 };
 
 struct rspamd_abstract_worker_ctx {