]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
flush_ports: flush POSIX message queues properly
authorTodd C. Miller <Todd.Miller@sudo.ws>
Tue, 6 May 2025 22:39:14 +0000 (16:39 -0600)
committerYu Watanabe <watanabe.yu+github@gmail.com>
Wed, 7 May 2025 17:37:45 +0000 (02:37 +0900)
On Linux, read() on a message queue descriptor returns the message
queue statistics, not the actual message queue data.  We need to use
mq_receive() to drain the queues instead.

Fixes a problem where a POSIX message queue socket unit with messages
in the queue at shutdown time could result in a hang on reboot/shutdown.

src/basic/socket-util.c
src/basic/socket-util.h
src/core/socket.c

index 61a77370eb430c04d14eea9cb3e589723cf9110b..5e6195e108e5fec21cfe74f4a418c8e0be1dcfac 100644 (file)
@@ -5,6 +5,7 @@
 #include <limits.h>
 #include <linux/if.h>
 #include <linux/if_arp.h>
+#include <mqueue.h>
 #include <net/if.h>
 #include <netdb.h>
 #include <netinet/ip.h>
@@ -1319,6 +1320,54 @@ int flush_accept(int fd) {
         }
 }
 
+ssize_t flush_mqueue(int fd) {
+        _cleanup_free_ char *buf = NULL;
+        struct mq_attr attr;
+        ssize_t count = 0;
+        int r;
+
+        assert(fd >= 0);
+
+        /* Similar to flush_fd() but flushes all messages from a POSIX message queue. */
+
+        for (;;) {
+                ssize_t l;
+
+                r = fd_wait_for_event(fd, POLLIN, /* timeout= */ 0);
+                if (r < 0) {
+                        if (r == -EINTR)
+                                continue;
+
+                        return r;
+                }
+                if (r == 0)
+                        return count;
+
+                if (!buf) {
+                        /* Buffer must be at least as large as mq_msgsize. */
+                        if (mq_getattr(fd, &attr) < 0)
+                                return -errno;
+
+                        buf = malloc(attr.mq_msgsize);
+                        if (!buf)
+                                return -ENOMEM;
+                }
+
+                l = mq_receive(fd, buf, attr.mq_msgsize, /* msg_prio = */ NULL);
+                if (l < 0) {
+                        if (errno == EINTR)
+                                continue;
+
+                        if (errno == EAGAIN)
+                                return count;
+
+                        return -errno;
+                }
+
+                count += l;
+        }
+}
+
 struct cmsghdr* cmsg_find(struct msghdr *mh, int level, int type, socklen_t length) {
         struct cmsghdr *cmsg;
 
index a69277c63966c895f6a11f1c4080590572445a3b..e9671c38333b87d7ea388566811253d681f605e2 100644 (file)
@@ -199,6 +199,7 @@ int receive_many_fds(int transport_fd, int **ret_fds_array, size_t *ret_n_fds_ar
 ssize_t next_datagram_size_fd(int fd);
 
 int flush_accept(int fd);
+ssize_t flush_mqueue(int fd);
 
 #define CMSG_FOREACH(cmsg, mh)                                          \
         for ((cmsg) = CMSG_FIRSTHDR(mh); (cmsg); (cmsg) = CMSG_NXTHDR((mh), (cmsg)))
index 1583c33cf48af3a2e68dd4182b99d0afdcbc0690..0de430b97ed3bc47f6eb77c5ed6538aca015e287 100644 (file)
@@ -2188,8 +2188,12 @@ static void flush_ports(Socket *s) {
                 if (p->fd < 0)
                         continue;
 
-                (void) flush_accept(p->fd);
-                (void) flush_fd(p->fd);
+                if (p->type == SOCKET_MQUEUE)
+                        (void) flush_mqueue(p->fd);
+                else {
+                        (void) flush_accept(p->fd);
+                        (void) flush_fd(p->fd);
+                }
         }
 }