/*
 * One control command sent to a single worker, kept until its reply is
 * dispatched. Elements are found by cmd.id in the worker's pending table and
 * are additionally chained into a doubly-linked list through prev/next.
 */
struct rspamd_control_reply_elt {
struct rspamd_control_reply reply; /* worker's reply, copied in before `handler` is invoked */
- struct rspamd_io_ev ev;
- struct ev_loop *event_loop;
struct rspamd_worker *worker; /* worker the command was sent to; used to reach its pending table */
GQuark wrk_type; /* copy of worker->type taken when the element is created; used in log messages */
pid_t wrk_pid; /* copy of worker->pid taken when the element is created; used in log messages */
rspamd_ev_cb handler; /* callback invoked as handler(fd, EV_READ, elt) once the reply has been read */
gpointer ud; /* opaque user data supplied by the sender of the command */
int attached_fd; /* initialised to -1; closed by rspamd_pending_control_free() when it is not -1 */
- bool sent;
struct rspamd_control_command cmd; /* copy of the sent command; cmd.id is the pending-table key */
- GHashTable *pending_elts;
struct rspamd_control_reply_elt *prev, *next; /* list links used by DL_APPEND / DL_FOREACH_SAFE */
};
+/*
+ * Hash table keyed by command id for pending control requests.
+ * Each worker's control_events_pending pointer is cast to this type; the key is
+ * the 64-bit command id and the value is the struct rspamd_control_reply_elt
+ * waiting for the reply carrying that id. The `1` argument selects khash's
+ * map mode (keys with values) rather than set mode.
+ */
+KHASH_INIT(rspamd_control_requests, uint64_t, struct rspamd_control_reply_elt *, 1,
+ kh_int64_hash_func, kh_int64_hash_equal);
+
struct rspamd_control_session {
int fd;
struct ev_loop *event_loop;
};
static void rspamd_control_ignore_io_handler(int fd, short what, void *ud);
-static void rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt);
INIT_LOG_MODULE(control)
DL_FOREACH_SAFE(session->replies, elt, telt)
{
- rspamd_control_stop_pending(elt);
+ /* Remove from worker's pending hash if still there */
+ khash_t(rspamd_control_requests) *h =
+ (khash_t(rspamd_control_requests) *) elt->worker->control_events_pending;
+ khiter_t k = kh_get(rspamd_control_requests, h, elt->cmd.id);
+ if (k != kh_end(h)) {
+ kh_del(rspamd_control_requests, h, k);
+ }
+ g_free(elt);
}
rspamd_inet_address_free(session->addr);
{
struct rspamd_control_reply_elt *elt = ud;
struct rspamd_control_session *session;
- unsigned char fdspace[CMSG_SPACE(sizeof(int))];
- struct iovec iov;
- struct msghdr msg;
- gssize r;
session = elt->ud;
- elt->attached_fd = -1;
-
- if (what == EV_READ) {
- iov.iov_base = &elt->reply;
- iov.iov_len = sizeof(elt->reply);
- memset(&msg, 0, sizeof(msg));
- msg.msg_control = fdspace;
- msg.msg_controllen = sizeof(fdspace);
- msg.msg_iov = &iov;
- msg.msg_iovlen = 1;
- r = recvmsg(fd, &msg, 0);
- if (r == -1) {
- msg_err("cannot read reply from the worker %P (%s): %s",
- elt->wrk_pid, g_quark_to_string(elt->wrk_type),
- strerror(errno));
- }
- else if (r >= (gssize) sizeof(elt->reply)) {
- if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
- elt->attached_fd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
- }
- }
- }
- else {
+ /*
+ * Note: In the new design, the reply is already read by
+ * rspamd_control_reply_handler() and stored in elt->reply.
+ * We just need to process it here.
+ */
+ if (what != EV_READ) {
/* Timeout waiting */
msg_warn("timeout waiting reply from %P (%s)",
elt->wrk_pid, g_quark_to_string(elt->wrk_type));
}
session->replies_remain--;
- rspamd_ev_watcher_stop(session->event_loop,
- &elt->ev);
if (session->replies_remain == 0) {
rspamd_control_write_reply(session);
}
}
/*
 * Release one pending control request: close its attached descriptor if one
 * is still held (attached_fd != -1), then free the element itself.
 * With this change the function becomes file-local and takes the element
 * directly instead of a gpointer; it no longer stops an ev watcher and no
 * longer drops a GHashTable reference.
 */
-void rspamd_pending_control_free(gpointer p)
+static void
+rspamd_pending_control_free(struct rspamd_control_reply_elt *rep_elt)
{
- struct rspamd_control_reply_elt *rep_elt = (struct rspamd_control_reply_elt *) p;
-
- if (rep_elt->sent) {
- rspamd_ev_watcher_stop(rep_elt->event_loop, &rep_elt->ev);
- }
- else if (rep_elt->attached_fd != -1) {
- /* Only for non-sent requests! */
+ if (rep_elt->attached_fd != -1) {
close(rep_elt->attached_fd);
}
- g_hash_table_unref(rep_elt->pending_elts);
g_free(rep_elt);
}
+/*
+ * Create an empty table of pending control requests.
+ * The table is returned as an untyped pointer; the other functions in this
+ * file cast it back to khash_t(rspamd_control_requests) * before use.
+ */
+void *
+rspamd_control_pending_new(void)
+{
+    khash_t(rspamd_control_requests) *pending = kh_init(rspamd_control_requests);
+
+    return pending;
+}
+
+/*
+ * Free every request still stored in the pending table `p` (via
+ * rspamd_pending_control_free()) and then free the table itself.
+ * A NULL `p` is accepted and ignored.
+ */
+void rspamd_control_pending_destroy(void *p)
+{
+    khash_t(rspamd_control_requests) *pending = (khash_t(rspamd_control_requests) *) p;
+
+    if (pending != NULL) {
+        khiter_t it = kh_begin(pending);
+
+        /* Walk every bucket; only occupied ones hold an element to release */
+        while (it != kh_end(pending)) {
+            if (kh_exist(pending, it)) {
+                rspamd_pending_control_free(kh_val(pending, it));
+            }
+            ++it;
+        }
+
+        kh_destroy(rspamd_control_requests, pending);
+    }
+}
+
+/*
+ * Free every request stored in the pending table `p` (via
+ * rspamd_pending_control_free()) and reset the table to empty with kh_clear().
+ * Unlike rspamd_control_pending_destroy(), the table itself is not destroyed.
+ * A NULL `p` is accepted and ignored.
+ */
+void rspamd_control_pending_remove_all(void *p)
+{
+    khash_t(rspamd_control_requests) *pending = (khash_t(rspamd_control_requests) *) p;
+
+    if (pending == NULL) {
+        return;
+    }
+
+    for (khiter_t it = kh_begin(pending); it != kh_end(pending); ++it) {
+        if (!kh_exist(pending, it)) {
+            /* Empty or deleted bucket: nothing to release */
+            continue;
+        }
+
+        rspamd_pending_control_free(kh_val(pending, it));
+    }
+
+    kh_clear(rspamd_control_requests, pending);
+}
+
static inline void
rspamd_control_fill_msghdr(struct rspamd_control_command *cmd,
int attached_fd, struct msghdr *msg,
msg->msg_iovlen = 1;
}
+/*
+ * Handler for control replies on main side - dispatches by ID.
+ *
+ * w->data is the struct rspamd_worker whose control descriptor became readable.
+ * The handler repeatedly read()s fixed-size struct rspamd_control_reply records
+ * from w->fd. For each full record it looks up rep.id in the worker's pending
+ * table, removes the matching element, copies the reply into elt->reply and
+ * calls elt->handler(w->fd, EV_READ, elt). The loop ends on a read error or
+ * when read() returns 0; the watcher is stopped on EOF and whenever the pending
+ * table is empty afterwards. `revents` is not inspected.
+ *
+ * NOTE(review): the loop only ends cleanly via EAGAIN/EWOULDBLOCK if w->fd is
+ * non-blocking - confirm where control_pipe is configured.
+ * NOTE(review): EINTR is handled like any other read error (logged, loop exits).
+ * NOTE(review): `h` is used without a NULL check here, unlike the
+ * rspamd_control_pending_* helpers - confirm the watcher is always stopped
+ * before the worker's table is destroyed.
+ */
static void
-rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
+rspamd_control_reply_handler(EV_P_ ev_io *w, int revents)
{
- GHashTable *htb;
- struct rspamd_main *rspamd_main;
- gsize pending;
-
- /* It stops event and frees hash */
- htb = elt->pending_elts;
-
- pending = g_hash_table_size(htb);
- msg_debug_control("stop pending for %P(%s), %d events pending", elt->wrk_pid,
- g_quark_to_string(elt->wrk_type),
- (int) pending);
-
- if (elt->worker->state != rspamd_worker_state_terminating && pending != 0) {
- /* Invoke another event from the queue */
- GHashTableIter it;
- gpointer k, v;
-
- g_hash_table_iter_init(&it, elt->pending_elts);
-
- while (g_hash_table_iter_next(&it, &k, &v)) {
- struct rspamd_control_reply_elt *cur = v;
-
- if (!cur->sent) {
- struct msghdr msg;
- struct iovec iov;
-
- rspamd_main = cur->worker->srv;
- /* Provide a control buffer with correct lifetime for sendmsg */
- unsigned char fdspace[CMSG_SPACE(sizeof(int))];
- rspamd_control_fill_msghdr(&cur->cmd, cur->attached_fd, &msg, &iov, fdspace, sizeof(fdspace));
- ssize_t r = sendmsg(cur->worker->control_pipe[0], &msg, 0);
-
- if (r == sizeof(cur->cmd)) {
- msg_debug_control("restarting pending event for %P(%s), %d events pending",
- cur->wrk_pid,
- g_quark_to_string(cur->wrk_type),
- (int) pending - 1);
- rspamd_ev_watcher_init(&cur->ev,
- cur->worker->control_pipe[0],
- EV_READ, cur->handler,
- cur);
- rspamd_ev_watcher_start(cur->event_loop,
- &cur->ev, worker_io_timeout);
- cur->sent = true;
- if (cur->attached_fd != -1) {
- /* Since `sendmsg` performs `dup` for us, we need to remove our own descriptor */
- close(cur->attached_fd);
- cur->attached_fd = -1;
- }
+ struct rspamd_worker *wrk = (struct rspamd_worker *) w->data;
+ khash_t(rspamd_control_requests) *h =
+ (khash_t(rspamd_control_requests) *) wrk->control_events_pending;
+ struct rspamd_control_reply rep;
+ struct rspamd_control_reply_elt *elt;
+ struct rspamd_main *rspamd_main = wrk->srv;
+ gssize r;
+ khiter_t k;
- break; /* Exit the outer loop as we have invoked something */
- }
- else {
- msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
- (int) cur->cmd.type,
- cur->wrk_pid,
- g_quark_to_string(cur->wrk_type),
- cur->worker->control_pipe[0],
- strerror(errno));
- g_hash_table_remove(elt->pending_elts, cur);
- }
+ for (;;) {
+ r = read(w->fd, &rep, sizeof(rep)); /* one fixed-size reply record per iteration */
+
+ if (r == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ /* No more data */
+ break;
}
+ msg_err_main("cannot read control reply from worker %P: %s",
+ wrk->pid, strerror(errno));
+ break;
+ }
+
+ if (r == 0) {
+ /* Connection closed */
+ msg_debug_control("control pipe closed for worker %P", wrk->pid);
+ ev_io_stop(EV_A_ w);
+ break;
+ }
+
+ if (r != (gssize) sizeof(rep)) {
+ msg_err_main("incomplete control reply from worker %P: %d != %d",
+ wrk->pid, (int) r, (int) sizeof(rep));
+ continue; /* partial record is dropped; NOTE(review): on a byte-stream fd later reads would be misaligned - confirm socket type */
+ }
+
+ /* Look up request by ID */
+ k = kh_get(rspamd_control_requests, h, rep.id);
+ if (k == kh_end(h)) {
+ msg_warn_main("received control reply for unknown request id %" G_GUINT64_FORMAT
+ " from worker %P",
+ rep.id, wrk->pid);
+ continue; /* reply matches no pending request: logged and ignored */
+ }
+
+ elt = kh_val(h, k);
+ kh_del(rspamd_control_requests, h, k); /* unlinked from the table before the callback runs */
+
+ msg_debug_control("received reply for command %d id %" G_GUINT64_FORMAT " from worker %P(%s)",
+ (int) rep.type, rep.id, wrk->pid, g_quark_to_string(wrk->type));
+
+ /* Copy reply to element and call handler */
+ memcpy(&elt->reply, &rep, sizeof(rep));
+ if (elt->handler) {
+ elt->handler(w->fd, EV_READ, elt);
}
}
- /* Remove from hash and performs the cleanup */
- g_hash_table_remove(elt->pending_elts, elt);
+ /* Stop watcher if no more pending requests */
+ if (kh_size(h) == 0) {
+ ev_io_stop(EV_A_ w);
+ }
}
static struct rspamd_control_reply_elt *
GHashTableIter it;
struct rspamd_worker *wrk;
struct rspamd_control_reply_elt *rep_elt, *res = NULL;
+ khash_t(rspamd_control_requests) * h;
gpointer k, v;
gssize r;
+ khiter_t kh;
+ int ret;
g_hash_table_iter_init(&it, rspamd_main->workers);
continue;
}
+ h = (khash_t(rspamd_control_requests) *) wrk->control_events_pending;
+
+ /* Generate unique ID for this command */
+ cmd->id = ottery_rand_uint64();
+
rep_elt = g_malloc0(sizeof(*rep_elt));
rep_elt->worker = wrk;
rep_elt->wrk_pid = wrk->pid;
rep_elt->wrk_type = wrk->type;
- rep_elt->event_loop = rspamd_main->event_loop;
rep_elt->ud = ud;
rep_elt->handler = handler;
- memcpy(&rep_elt->cmd, cmd, sizeof(*cmd));
- rep_elt->sent = false;
rep_elt->attached_fd = -1;
+ memcpy(&rep_elt->cmd, cmd, sizeof(*cmd));
- if (g_hash_table_size(wrk->control_events_pending) == 0) {
- /* We can send command */
- struct msghdr msg;
- struct iovec iov;
- unsigned char fdspace[CMSG_SPACE(sizeof(int))];
-
- rspamd_control_fill_msghdr(cmd, attached_fd, &msg, &iov, fdspace, sizeof(fdspace));
- r = sendmsg(wrk->control_pipe[0], &msg, 0);
-
- if (r == sizeof(*cmd)) {
- rspamd_ev_watcher_init(&rep_elt->ev,
- wrk->control_pipe[0],
- EV_READ, handler,
- rep_elt);
- rspamd_ev_watcher_start(rspamd_main->event_loop,
- &rep_elt->ev, worker_io_timeout);
- rep_elt->sent = true;
- rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
- g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
-
- DL_APPEND(res, rep_elt);
- msg_debug_control("sent command %d to the worker %P(%s), fd: %d",
- (int) cmd->type,
- wrk->pid,
- g_quark_to_string(wrk->type),
- wrk->control_pipe[0]);
- }
- else {
- msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
- (int) cmd->type,
- wrk->pid,
- g_quark_to_string(wrk->type),
- wrk->control_pipe[0],
- strerror(errno));
+ /* Send command immediately */
+ struct msghdr msg;
+ struct iovec iov;
+ unsigned char fdspace[CMSG_SPACE(sizeof(int))];
+
+ rspamd_control_fill_msghdr(cmd, attached_fd, &msg, &iov, fdspace, sizeof(fdspace));
+ r = sendmsg(wrk->control_pipe[0], &msg, 0);
+
+ if (r == sizeof(*cmd)) {
+ /* Add to hash for reply matching */
+ kh = kh_put(rspamd_control_requests, h, cmd->id, &ret);
+ if (ret < 0) {
+ msg_err_main("cannot add control request to hash table");
g_free(rep_elt);
+ continue;
}
- }
- else {
- /* We need to wait till the last command is processed, or it will mess up all serialization */
- msg_debug_control("pending event for %P(%s), %d events pending",
- wrk->pid,
- g_quark_to_string(wrk->type),
- (int) g_hash_table_size(wrk->control_events_pending));
- rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
- /*
- * Here are dragons:
- * If we have a descriptor to send, the callee expects that we follow
- * sendmsg semantics that performs `dup` on it. So we need to clone fd and keep it there.
- */
- if (attached_fd != -1) {
- rep_elt->attached_fd = dup(attached_fd);
-
- if (rep_elt->attached_fd == -1) {
- /*
- * We have a problem: file descriptors limit is reached, so we cannot really deal with this
- * request
- */
- msg_err_main("cannot duplicate file descriptor to send command to worker %P(%s): %s; failed to send command",
- wrk->pid,
- g_quark_to_string(wrk->type),
- strerror(errno));
- g_hash_table_unref(rep_elt->pending_elts);
- g_free(rep_elt);
- }
- else {
- g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
- DL_APPEND(res, rep_elt);
+ if (ret == 0) {
+ /* Key already exists - ID collision (extremely unlikely with 64-bit random) */
+ msg_warn_main("control command ID collision for %" G_GUINT64_FORMAT
+ ", previous request will be orphaned",
+ cmd->id);
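+ /* Free the displaced entry so it is not leaked. NOTE(review): that entry was DL_APPEND-ed to an earlier result list, which may still reference it - confirm nothing dereferences it afterwards */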
+ struct rspamd_control_reply_elt *old_elt = kh_val(h, kh);
+ if (old_elt) {
+ g_free(old_elt);
}
}
- else {
- g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
- DL_APPEND(res, rep_elt);
+ kh_val(h, kh) = rep_elt;
+
+ /* Start control reply watcher if not already active */
+ if (!ev_is_active(&wrk->control_ev)) {
+ wrk->control_ev.data = wrk;
+ ev_io_init(&wrk->control_ev, rspamd_control_reply_handler,
+ wrk->control_pipe[0], EV_READ);
+ ev_io_start(rspamd_main->event_loop, &wrk->control_ev);
}
+
+ DL_APPEND(res, rep_elt);
+ msg_debug_control("sent command %d id %" G_GUINT64_FORMAT " to worker %P(%s), fd: %d",
+ (int) cmd->type, cmd->id,
+ wrk->pid,
+ g_quark_to_string(wrk->type),
+ wrk->control_pipe[0]);
+ }
+ else {
+ msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
+ (int) cmd->type,
+ wrk->pid,
+ g_quark_to_string(wrk->type),
+ wrk->control_pipe[0],
+ strerror(errno));
+ g_free(rep_elt);
}
}
memset(&rep, 0, sizeof(rep));
rep.type = cmd->type;
+ rep.id = cmd->id;
rspamd_main = cd->worker->srv;
switch (cmd->type) {
struct rspamd_control_reply_elt *elt =
(struct rspamd_control_reply_elt *) ud;
- struct rspamd_control_reply rep;
-
- /* At this point we just ignore replies from the workers */
- if (read(fd, &rep, sizeof(rep)) == -1) {
- msg_debug_control("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
- }
- rspamd_control_stop_pending(elt);
+ /* Reply already read by rspamd_control_reply_handler and stored in elt->reply */
+ /* Just free the element - nothing to do with the reply */
+ g_free(elt);
}
static void
{
struct rspamd_control_reply_elt *elt =
(struct rspamd_control_reply_elt *) ud;
- struct rspamd_control_reply rep;
- /* At this point we just ignore replies from the workers */
- (void) !read(fd, &rep, sizeof(rep));
- rspamd_control_stop_pending(elt);
+ /* Reply already read by rspamd_control_reply_handler and stored in elt->reply */
+ /* Just free the element - nothing to do with the reply */
+ g_free(elt);
}
static void
REF_RELEASE(child->cf);
g_hash_table_remove(srv->workers,
GSIZE_TO_POINTER(cmd->cmd.on_fork.cpid));
- g_hash_table_unref(child->control_events_pending);
+ rspamd_control_pending_destroy(child->control_events_pending);
g_free(child);
}
else {
child->cf = parent->cf;
child->ppid = parent->pid;
REF_RETAIN(child->cf);
- child->control_events_pending = g_hash_table_new_full(g_direct_hash, g_direct_equal,
- NULL, rspamd_pending_control_free);
+ child->control_events_pending = rspamd_control_pending_new();
g_hash_table_insert(srv->workers,
GSIZE_TO_POINTER(cmd->cmd.on_fork.cpid), child);
}