git.ipfire.org Git - thirdparty/systemd.git/commitdiff
shared: extend socket-forward to support fd-pairs too
author: Michael Vogt <michael@amutable.com>
Tue, 24 Mar 2026 08:48:26 +0000 (09:48 +0100)
committer: Michael Vogt <michael@amutable.com>
Tue, 31 Mar 2026 16:25:09 +0000 (18:25 +0200)
Now that the socket forward code is extracted we can
extend it to not just support bidirectional sockets
but also input/output fd-pairs. This will be needed
for e.g. the varlinkctl protocol upgrade support where
one side of the connection is a fd-pair (stdin/stdout).

This is done by creating two half-duplex forwarders
that operate independently. This also allows us to simplify
some state tracking, e.g. because each fd serves only one
direction we don't need to dynamically create the event mask
with EPOLLIN etc., it's enough to set it once. It also handles
non-pollable FDs transparently.

Thanks to Lennart for his excellent suggestions here.

src/shared/socket-forward.c
src/shared/socket-forward.h

index 2601b25e6daab0b327dbe2725cf6884308fb9598..5ad8c5b2b60f15cbff6e70bd904d06583dbf1c4b 100644 (file)
@@ -1,6 +1,7 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 
 #include <fcntl.h>
+#include <sys/socket.h>
 #include <unistd.h>
 
 #include "sd-event.h"
 
 #define SOCKET_FORWARD_BUFFER_SIZE (256 * 1024)
 
-struct SocketForward {
+/* Half-duplex forwarder: splices data from server_fd to client_fd via a kernel pipe buffer.
+ * Each direction of a full-duplex SocketForward is handled by one of these. */
+typedef struct SimplexForward {
         sd_event *event;
 
         int server_fd, client_fd;
 
         int server_to_client_buffer[2]; /* a pipe */
-        int client_to_server_buffer[2]; /* a pipe */
 
-        size_t server_to_client_buffer_full, client_to_server_buffer_full;
-        size_t server_to_client_buffer_size, client_to_server_buffer_size;
+        size_t server_to_client_buffer_full, server_to_client_buffer_size;
 
         sd_event_source *server_event_source, *client_event_source;
 
-        socket_forward_done_t on_done;
+        int (*on_done)(struct SimplexForward *fwd, int error, void *userdata);
         void *userdata;
-};
+} SimplexForward;
 
-SocketForward* socket_forward_free(SocketForward *sf) {
-        if (!sf)
+static SimplexForward* simplex_forward_free(SimplexForward *fwd) {
+        if (!fwd)
                 return NULL;
 
-        sd_event_source_unref(sf->server_event_source);
-        sd_event_source_unref(sf->client_event_source);
+        sd_event_source_unref(fwd->server_event_source);
+        sd_event_source_unref(fwd->client_event_source);
 
-        safe_close(sf->server_fd);
-        safe_close(sf->client_fd);
+        safe_close(fwd->server_fd);
+        safe_close(fwd->client_fd);
 
-        safe_close_pair(sf->server_to_client_buffer);
-        safe_close_pair(sf->client_to_server_buffer);
+        safe_close_pair(fwd->server_to_client_buffer);
 
-        sd_event_unref(sf->event);
+        sd_event_unref(fwd->event);
 
-        return mfree(sf);
+        return mfree(fwd);
 }
 
+DEFINE_TRIVIAL_CLEANUP_FUNC(SimplexForward*, simplex_forward_free);
+
 static int socket_forward_create_pipes(int buffer[static 2], size_t *ret_size) {
         int r;
 
@@ -73,7 +75,7 @@ static int socket_forward_create_pipes(int buffer[static 2], size_t *ret_size) {
         return 0;
 }
 
-static int socket_forward_shovel(
+static int simplex_forward_shovel(
                 int *from, int buffer[2], int *to,
                 size_t *full, size_t *sz,
                 sd_event_source **from_source, sd_event_source **to_source) {
@@ -123,134 +125,282 @@ static int socket_forward_shovel(
         return 0;
 }
 
-static int socket_forward_enable_event_sources(SocketForward *sf);
+static int simplex_forward_enable_event_sources(SimplexForward *fwd);
 
-static int socket_forward_traffic_cb(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
-        SocketForward *sf = ASSERT_PTR(userdata);
+static int simplex_forward_traffic(SimplexForward *fwd) {
         int r;
 
-        assert(s);
-        assert(fd >= 0);
-
-        r = socket_forward_shovel(
-                        &sf->server_fd, sf->server_to_client_buffer, &sf->client_fd,
-                        &sf->server_to_client_buffer_full, &sf->server_to_client_buffer_size,
-                        &sf->server_event_source, &sf->client_event_source);
-        if (r < 0)
-                goto quit;
-
-        r = socket_forward_shovel(
-                        &sf->client_fd, sf->client_to_server_buffer, &sf->server_fd,
-                        &sf->client_to_server_buffer_full, &sf->client_to_server_buffer_size,
-                        &sf->client_event_source, &sf->server_event_source);
+        r = simplex_forward_shovel(
+                        &fwd->server_fd, fwd->server_to_client_buffer, &fwd->client_fd,
+                        &fwd->server_to_client_buffer_full, &fwd->server_to_client_buffer_size,
+                        &fwd->server_event_source, &fwd->client_event_source);
         if (r < 0)
                 goto quit;
 
-        /* EOF on both sides? */
-        if (sf->server_fd < 0 && sf->client_fd < 0)
-                goto quit;
-
         /* Server closed, and all data written to client? */
-        if (sf->server_fd < 0 && sf->server_to_client_buffer_full <= 0)
+        if (fwd->server_fd < 0 && fwd->server_to_client_buffer_full <= 0)
                 goto quit;
 
-        /* Client closed, and all data written to server? */
-        if (sf->client_fd < 0 && sf->client_to_server_buffer_full <= 0)
+        /* Write side closed? */
+        if (fwd->client_fd < 0)
                 goto quit;
 
-        r = socket_forward_enable_event_sources(sf);
+        r = simplex_forward_enable_event_sources(fwd);
         if (r < 0)
                 goto quit;
 
         return 1;
 
 quit:
-        return sf->on_done(sf, r, sf->userdata);
+        fwd->server_event_source = sd_event_source_disable_unref(fwd->server_event_source);
+        fwd->client_event_source = sd_event_source_disable_unref(fwd->client_event_source);
+        return fwd->on_done(fwd, r, fwd->userdata);
 }
 
-static int socket_forward_enable_event_sources(SocketForward *sf) {
-        uint32_t a = 0, b = 0;
+static int simplex_forward_io_cb(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
+        SimplexForward *fwd = ASSERT_PTR(userdata);
+
+        return simplex_forward_traffic(fwd);
+}
+
+static int simplex_forward_defer_cb(sd_event_source *s, void *userdata) {
+        SimplexForward *fwd = ASSERT_PTR(userdata);
+
+        return simplex_forward_traffic(fwd);
+}
+
+static int simplex_forward_enable_event_sources(SimplexForward *fwd) {
+        bool can_read, can_write;
         int r;
 
-        assert(sf);
+        assert(fwd);
 
-        if (sf->server_to_client_buffer_full > 0)
-                b |= EPOLLOUT;
-        if (sf->server_to_client_buffer_full < sf->server_to_client_buffer_size)
-                a |= EPOLLIN;
+        can_read = fwd->server_to_client_buffer_full < fwd->server_to_client_buffer_size;
+        can_write = fwd->server_to_client_buffer_full > 0;
 
-        if (sf->client_to_server_buffer_full > 0)
-                a |= EPOLLOUT;
-        if (sf->client_to_server_buffer_full < sf->client_to_server_buffer_size)
-                b |= EPOLLIN;
+        /* Event sources may have been unref'd by the shovel on EOF/disconnect */
+        if (fwd->server_event_source) {
+                r = sd_event_source_set_enabled(fwd->server_event_source, can_read ? SD_EVENT_ONESHOT : SD_EVENT_OFF);
+                if (r < 0)
+                        return log_debug_errno(r, "Failed to update server event source: %m");
+        }
 
-        if (sf->server_event_source)
-                r = sd_event_source_set_io_events(sf->server_event_source, a);
-        else if (sf->server_fd >= 0)
-                r = sd_event_add_io(sf->event, &sf->server_event_source, sf->server_fd, a, socket_forward_traffic_cb, sf);
-        else
-                r = 0;
-        if (r < 0)
-                return log_debug_errno(r, "Failed to set up server event source: %m");
-
-        if (sf->client_event_source)
-                r = sd_event_source_set_io_events(sf->client_event_source, b);
-        else if (sf->client_fd >= 0)
-                r = sd_event_add_io(sf->event, &sf->client_event_source, sf->client_fd, b, socket_forward_traffic_cb, sf);
-        else
-                r = 0;
-        if (r < 0)
-                return log_debug_errno(r, "Failed to set up client event source: %m");
+        if (fwd->client_event_source) {
+                r = sd_event_source_set_enabled(fwd->client_event_source, can_write ? SD_EVENT_ONESHOT : SD_EVENT_OFF);
+                if (r < 0)
+                        return log_debug_errno(r, "Failed to update client event source: %m");
+        }
 
         return 0;
 }
 
-int socket_forward_new(
+static int simplex_forward_create_event_source(
+                SimplexForward *fwd,
+                sd_event_source **ret,
+                int fd,
+                uint32_t events) {
+
+        int r;
+
+        r = sd_event_add_io(fwd->event, ret, fd, events, simplex_forward_io_cb, fwd);
+        if (r == -EPERM)
+                /* fd is not pollable (e.g. regular file). Fall back to a defer event source
+                 * which fires on each event loop iteration. This works because regular
+                 * files are always ready for I/O so we don't need to poll. */
+                r = sd_event_add_defer(fwd->event, ret, simplex_forward_defer_cb, fwd);
+
+        return r;
+}
+
+static int simplex_forward_new(
                 sd_event *event,
                 int server_fd,
                 int client_fd,
-                socket_forward_done_t on_done,
+                int (*on_done)(SimplexForward *fwd, int error, void *userdata),
                 void *userdata,
-                SocketForward **ret) {
+                SimplexForward **ret) {
 
-        _cleanup_(socket_forward_freep) SocketForward *sf = NULL;
+        _cleanup_(simplex_forward_freep) SimplexForward *fwd = NULL;
         int r;
 
         assert(event);
         assert(server_fd >= 0);
         assert(client_fd >= 0);
+        assert(server_fd != client_fd);
         assert(on_done);
         assert(ret);
 
-        sf = new(SocketForward, 1);
-        if (!sf) {
+        fwd = new(SimplexForward, 1);
+        if (!fwd) {
                 safe_close(server_fd);
                 safe_close(client_fd);
                 return log_oom_debug();
         }
 
-        *sf = (SocketForward) {
+        *fwd = (SimplexForward) {
                 .event = sd_event_ref(event),
                 .server_fd = server_fd,
                 .client_fd = client_fd,
                 .server_to_client_buffer = EBADF_PAIR,
-                .client_to_server_buffer = EBADF_PAIR,
                 .on_done = on_done,
                 .userdata = userdata,
         };
 
-        r = socket_forward_create_pipes(sf->server_to_client_buffer, &sf->server_to_client_buffer_size);
+        r = socket_forward_create_pipes(fwd->server_to_client_buffer, &fwd->server_to_client_buffer_size);
         if (r < 0)
                 return r;
 
-        r = socket_forward_create_pipes(sf->client_to_server_buffer, &sf->client_to_server_buffer_size);
+        r = simplex_forward_create_event_source(fwd, &fwd->server_event_source, fwd->server_fd, EPOLLIN);
         if (r < 0)
                 return r;
 
-        r = socket_forward_enable_event_sources(sf);
+        r = simplex_forward_create_event_source(fwd, &fwd->client_event_source, fwd->client_fd, EPOLLOUT);
+        if (r < 0)
+                return r;
+
+        r = simplex_forward_enable_event_sources(fwd);
+        if (r < 0)
+                return r;
+
+        *ret = TAKE_PTR(fwd);
+        return 0;
+}
+
+/* Full-duplex forwarder from two SimplexForward instances */
+struct SocketForward {
+        SimplexForward *server_to_client;
+        SimplexForward *client_to_server;
+
+        socket_forward_done_t on_done;
+        void *userdata;
+
+        int first_error;
+        unsigned directions_done;
+};
+
+SocketForward* socket_forward_free(SocketForward *sf) {
+        if (!sf)
+                return NULL;
+
+        simplex_forward_free(sf->server_to_client);
+        simplex_forward_free(sf->client_to_server);
+
+        return mfree(sf);
+}
+
+static int socket_forward_direction_done(SimplexForward *fwd, int error, void *userdata) {
+        SocketForward *sf = ASSERT_PTR(userdata);
+
+        /* Half-close the write side so the remote end sees EOF. For sockets,
+         * shutdown(SHUT_WR) sends FIN while keeping the fd open for the read side
+         * (which belongs to the other direction's dup'd fd). For pipes/FIFOs,
+         * shutdown() fails with ENOTSOCK - close the fd instead, which is the
+         * only way to signal EOF on a pipe. */
+        if (fwd->client_fd >= 0 && shutdown(fwd->client_fd, SHUT_WR) < 0) {
+                if (errno == ENOTSOCK)
+                        fwd->client_fd = safe_close(fwd->client_fd);
+                else
+                        log_debug_errno(errno, "Failed to shutdown write side of fd %d: %m, ignoring",
+                                        fwd->client_fd);
+        }
+
+        if (error < 0 && sf->first_error >= 0)
+                sf->first_error = error;
+
+        sf->directions_done++;
+
+        if (sf->directions_done >= 2)
+                return sf->on_done(sf, sf->first_error, sf->userdata);
+
+        return 0;
+}
+
+int socket_forward_new_pair(
+                sd_event *event,
+                int server_read_fd,
+                int server_write_fd,
+                int client_read_fd,
+                int client_write_fd,
+                socket_forward_done_t on_done,
+                void *userdata,
+                SocketForward **ret) {
+
+        _cleanup_close_ int server_read_fd_close = server_read_fd,
+                            server_write_fd_close = server_write_fd,
+                            client_read_fd_close = client_read_fd,
+                            client_write_fd_close = client_write_fd;
+        _cleanup_(socket_forward_freep) SocketForward *sf = NULL;
+        int r;
+
+        assert(event);
+        assert(server_read_fd >= 0);
+        assert(server_write_fd >= 0);
+        assert(client_read_fd >= 0);
+        assert(client_write_fd >= 0);
+        assert(server_read_fd != server_write_fd);
+        assert(client_read_fd != client_write_fd);
+        assert(server_read_fd != client_read_fd);
+        assert(server_read_fd != client_write_fd);
+        assert(server_write_fd != client_read_fd);
+        assert(server_write_fd != client_write_fd);
+        assert(on_done);
+        assert(ret);
+
+        sf = new(SocketForward, 1);
+        if (!sf)
+                return log_oom_debug();
+
+        *sf = (SocketForward) {
+                .on_done = on_done,
+                .userdata = userdata,
+        };
+
+        r = simplex_forward_new(event,
+                                TAKE_FD(server_read_fd_close), TAKE_FD(client_write_fd_close),
+                                socket_forward_direction_done, sf,
+                                &sf->server_to_client);
+        if (r < 0)
+                return r;
+
+        r = simplex_forward_new(event,
+                                TAKE_FD(client_read_fd_close), TAKE_FD(server_write_fd_close),
+                                socket_forward_direction_done, sf,
+                                &sf->client_to_server);
         if (r < 0)
                 return r;
 
         *ret = TAKE_PTR(sf);
         return 0;
 }
+
+int socket_forward_new(
+                sd_event *event,
+                int server_fd,
+                int client_fd,
+                socket_forward_done_t on_done,
+                void *userdata,
+                SocketForward **ret) {
+
+        _cleanup_close_ int server_fd_close = server_fd, client_fd_close = client_fd,
+                            server_write_fd = -EBADF, client_write_fd = -EBADF;
+
+        assert(event);
+        assert(server_fd >= 0);
+        assert(client_fd >= 0);
+        assert(on_done);
+        assert(ret);
+
+        server_write_fd = fcntl(server_fd, F_DUPFD_CLOEXEC, 3);
+        if (server_write_fd < 0)
+                return -errno;
+
+        client_write_fd = fcntl(client_fd, F_DUPFD_CLOEXEC, 3);
+        if (client_write_fd < 0)
+                return -errno;
+
+        return socket_forward_new_pair(
+                        event,
+                        TAKE_FD(server_fd_close), TAKE_FD(server_write_fd),
+                        TAKE_FD(client_fd_close), TAKE_FD(client_write_fd),
+                        on_done, userdata, ret);
+}
index a2d34da38c6b9a245881c5a7065d6f266b4e03bc..a5b81b4a44f437d84adcaa762a797e24e891228a 100644 (file)
@@ -3,20 +3,22 @@
 
 #include "shared-forward.h"
 
-/* Bidirectional socket forwarder using splice().
+/* Bidirectional forwarder using splice().
  *
- * Forwards data between two bidirectional sockets ("server" and "client") via kernel pipe buffers,
- * avoiding userspace copies.
+ * Forwards data between two sides ("server" and "client") via kernel pipe buffers,
+ * avoiding userspace copies. Internally uses two independent half-duplex forwarders,
+ * one per direction. All four fds must be distinct - use dup()/fcntl(fd, F_DUPFD_CLOEXEC, 3) for bidirectional sockets.
  *
  * When forwarding completes (both directions reach EOF or error), the completion callback is invoked.
  *
- * The SocketForward takes ownership of both fds - they are closed when the SocketForward is freed
+ * The SocketForward takes ownership of all fds - they are closed when the SocketForward is freed
  * (or earlier, during normal forwarding when EOF/disconnect is detected). */
 
 typedef struct SocketForward SocketForward;
 
 typedef int (*socket_forward_done_t)(SocketForward *sf, int error, void *userdata);
 
+/* Create a forwarder between two bidirectional sockets. */
 int socket_forward_new(
                 sd_event *event,
                 int server_fd,
@@ -25,5 +27,17 @@ int socket_forward_new(
                 void *userdata,
                 SocketForward **ret);
 
+/* Create a forwarder between two fd pairs (e.g. stdin/stdout on one side, socket on the other).
+ * All four fds must be distinct - use dup()/fcntl(fd, F_DUPFD_CLOEXEC, 3) for bidirectional sockets. */
+int socket_forward_new_pair(
+                sd_event *event,
+                int server_read_fd,
+                int server_write_fd,
+                int client_read_fd,
+                int client_write_fd,
+                socket_forward_done_t on_done,
+                void *userdata,
+                SocketForward **ret);
+
 SocketForward* socket_forward_free(SocketForward *sf);
 DEFINE_TRIVIAL_CLEANUP_FUNC(SocketForward*, socket_forward_free);