]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
sd-future: add fiber-aware non-blocking I/O wrappers
authorDaan De Meyer <daan@amutable.com>
Sat, 25 Apr 2026 20:31:58 +0000 (22:31 +0200)
committerDaan De Meyer <daan@amutable.com>
Thu, 21 May 2026 09:55:04 +0000 (09:55 +0000)
Add a family of sd_fiber_*() I/O wrappers that, when called from a
fiber, behave like blocking I/O from the caller's perspective but
yield to the event loop instead of blocking the thread:

  sd_fiber_read / sd_fiber_write
  sd_fiber_readv / sd_fiber_writev
  sd_fiber_recv / sd_fiber_send
  sd_fiber_connect
  sd_fiber_recvmsg / sd_fiber_sendmsg
  sd_fiber_recvfrom / sd_fiber_sendto
  sd_fiber_accept
  sd_fiber_ppoll

Most of them share a single helper, fiber_io_operation(), which when
invoked outside a fiber falls through to the underlying syscall
directly, preserving the regular blocking behaviour. Inside a fiber
the helper flips the fd to non-blocking (restoring its original mode
on return), tries the syscall once on the fast path, and on EAGAIN/
EWOULDBLOCK creates an sd-event-backed IO future via future_new_io(),
suspends the fiber, and retries the syscall once the event source
fires.

future_new_io() itself is added to sd-event/event-future.{c,h} as a
new IoFuture kind. It wraps sd_event_add_io() into an sd_future:
oneshot enable, EPOLLERR translated via SO_ERROR (suppressed for
non-sockets), and the fd duplicated with F_DUPFD_CLOEXEC to avoid
EEXIST when multiple sources watch the same descriptor.

Together these let fiber-using code write straight-line socket and
pipe I/O without bundling state into callbacks.

src/basic/io-util.c
src/basic/io-util.h
src/libsystemd/meson.build
src/libsystemd/sd-event/event-future.c
src/libsystemd/sd-event/event-future.h
src/libsystemd/sd-future/fiber-io.c [new file with mode: 0644]
src/libsystemd/sd-future/test-fiber-io.c [new file with mode: 0644]
src/systemd/sd-future.h

index 103aa2a7cde03a7f38d2605e151ccc7896676df3..b4f643b5721aa191ee4ec892e6f9f6757b458c30 100644 (file)
@@ -3,6 +3,7 @@
 #include <poll.h>
 #include <stdio.h>
 #include <string.h>
+#include <sys/epoll.h>          /* IWYU pragma: keep */
 #include <time.h>
 #include <unistd.h>
 
 #include "io-util.h"
 #include "time-util.h"
 
+/* EPOLL_POLL_COMMON_MASK in io-util.h treats POLL* and EPOLL* as interchangeable; verify it. */
+assert_cc((uint32_t) POLLIN    == EPOLLIN);
+assert_cc((uint32_t) POLLOUT   == EPOLLOUT);
+assert_cc((uint32_t) POLLERR   == EPOLLERR);
+assert_cc((uint32_t) POLLHUP   == EPOLLHUP);
+assert_cc((uint32_t) POLLPRI   == EPOLLPRI);
+assert_cc((uint32_t) POLLRDHUP == EPOLLRDHUP);
+
 int flush_fd(int fd) {
         int count = 0;
 
index 918108c023b68c50398b4a4f88cda54d536c6335..ab52dc8db6506d9ae8df3cbc9898c0e2e51c9bd8 100644 (file)
@@ -3,6 +3,12 @@
 
 #include "basic-forward.h"
 
+/* The intersection of poll() and epoll_wait() event masks. Linux defines POLL* and EPOLL* with the
+ * same numeric values for these — see the assert_cc()s in io-util.c — so this mask can be used
+ * interchangeably as a `revents` (poll) or `events` (epoll) bitset. */
+#define EPOLL_POLL_COMMON_MASK \
+        (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLRDHUP)
+
 int flush_fd(int fd);
 
 ssize_t loop_read(int fd, void *buf, size_t nbytes, bool do_poll);
index 061ff6213f61e5569b037ade7acdfaebc565ad4f..3c624051f0ded8a45ac5d08d093f30370974f78e 100644 (file)
@@ -77,6 +77,7 @@ sd_device_sources = files(
 ############################################################
 
 sd_future_sources = files(
+        'sd-future/fiber-io.c',
         'sd-future/fiber.c',
         'sd-future/sd-future.c',
 )
@@ -190,6 +191,7 @@ simple_tests += files(
         'sd-device/test-device-util.c',
         'sd-device/test-sd-device-monitor.c',
         'sd-future/test-fiber.c',
+        'sd-future/test-fiber-io.c',
         'sd-hwdb/test-sd-hwdb.c',
         'sd-id128/test-id128.c',
         'sd-journal/test-audit-type.c',
index 4595a7a6fb0b556fd0e323b0bc5deaf5299cacbc..8c9960fcdd9d0eff52b9ab1596a79e1301b8a98f 100644 (file)
@@ -6,6 +6,124 @@
 #include "alloc-util.h"
 #include "errno-util.h"
 #include "event-future.h"
+#include "fd-util.h"
+
+typedef struct IoFuture {
+        sd_event_source *source;
+} IoFuture;
+
+static void* io_future_alloc(void) {
+        return new0(IoFuture, 1);
+}
+
+static void io_future_free(sd_future *f) {
+        IoFuture *iof = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f)));
+        sd_event_source_unref(iof->source);
+        free(iof);
+}
+
+static int io_future_cancel(sd_future *f) {
+        IoFuture *iof = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f)));
+        int r = 0;
+
+        RET_GATHER(r, sd_event_source_set_enabled(iof->source, SD_EVENT_OFF));
+        RET_GATHER(r, sd_future_resolve(f, -ECANCELED));
+        return r;
+}
+
+static int io_future_set_priority(sd_future *f, int64_t priority) {
+        IoFuture *iof = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f)));
+        return sd_event_source_set_priority(iof->source, priority);
+}
+
+static const sd_future_ops io_future_ops = {
+        .size = sizeof(sd_future_ops),
+        .alloc = io_future_alloc,
+        .free = io_future_free,
+        .cancel = io_future_cancel,
+        .set_priority = io_future_set_priority,
+};
+
+static int io_handler(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
+        sd_future *f = ASSERT_PTR(userdata);
+
+        /* Resolve with the revents mask on success (matching io_uring poll_add's CQE convention) so
+         * callers can read it directly off the future result. EPOLLERR is the one exception: surface
+         * the actual socket error via SO_ERROR so callers like sd_fiber_connect() can return -errno
+         * directly without re-querying. */
+        if (FLAGS_SET(revents, EPOLLERR)) {
+                int error = 0;
+                socklen_t len = sizeof(error);
+
+                int r = RET_NERRNO(getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len));
+                if (r == -ENOTSOCK)
+                        return sd_future_resolve(f, (int) revents);
+                if (r >= 0 && error != 0)
+                        return sd_future_resolve(f, -error);
+                if (r >= 0)
+                        /* EPOLLERR was reported but SO_ERROR returned no pending error (e.g.
+                         * already consumed elsewhere). Surface the revents mask so the caller
+                         * still sees the error condition rather than mistaking it for success. */
+                        return sd_future_resolve(f, (int) revents);
+                /* On any other getsockopt() error fall through and resolve the future with that
+                 * error so the waiting fiber wakes up rather than hanging forever. */
+                return sd_future_resolve(f, r);
+        }
+
+        return sd_future_resolve(f, (int) revents);
+}
+
+int future_new_io(sd_event *e, int fd, uint32_t events, sd_future **ret) {
+        int r;
+
+        assert(e);
+        assert(fd >= 0);
+        assert(ret);
+
+        if (IN_SET(sd_event_get_state(e), SD_EVENT_EXITING, SD_EVENT_FINISHED))
+                return -ECANCELED;
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        r = sd_future_new(&io_future_ops, &f);
+        if (r < 0)
+                return r;
+
+        IoFuture *iof = sd_future_get_private(f);
+
+        /* Duplicate fd to avoid EEXIST from epoll when adding the same fd multiple times */
+        _cleanup_close_ int fd_copy = fcntl(fd, F_DUPFD_CLOEXEC, 3);
+        if (fd_copy < 0)
+                return -errno;
+
+        r = sd_event_add_io(e, &iof->source, fd_copy, events, io_handler, f);
+        if (r < 0)
+                return r;
+
+        r = sd_event_source_set_io_fd_own(iof->source, true);
+        if (r < 0)
+                return r;
+
+        TAKE_FD(fd_copy);
+
+        r = sd_event_source_set_enabled(iof->source, SD_EVENT_ONESHOT);
+        if (r < 0)
+                return r;
+
+        if (sd_fiber_is_running()) {
+                int64_t priority;
+
+                r = sd_fiber_get_priority(&priority);
+                if (r < 0)
+                        return r;
+
+                r = sd_event_source_set_priority(iof->source, priority);
+                if (r < 0)
+                        return r;
+        }
+
+        *ret = TAKE_PTR(f);
+        return 0;
+}
 
 typedef struct TimeFuture {
         sd_event_source *source;
index 7e956906ebf74a208f7378d31217a086c83492a7..3bc275e7b7ac9b817cea158914b0bd3cd783c1d6 100644 (file)
@@ -3,5 +3,6 @@
 
 #include "sd-forward.h"
 
+int future_new_io(sd_event *e, int fd, uint32_t events, sd_future **ret);
 int future_new_time(sd_event *e, clockid_t clock, uint64_t usec, uint64_t accuracy, int result, sd_future **ret);
 int future_new_time_relative(sd_event *e, clockid_t clock, uint64_t usec, uint64_t accuracy, int result, sd_future **ret);
diff --git a/src/libsystemd/sd-future/fiber-io.c b/src/libsystemd/sd-future/fiber-io.c
new file mode 100644 (file)
index 0000000..823e890
--- /dev/null
@@ -0,0 +1,471 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+
+#include <poll.h>
+#include <sys/epoll.h>          /* IWYU pragma: keep */
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <time.h>
+#include <unistd.h>
+
+#include "sd-event.h"
+#include "sd-future.h"
+
+#include "alloc-util.h"
+#include "errno-util.h"
+#include "event-future.h"
+#include "fd-util.h"
+#include "io-util.h"
+#include "time-util.h"
+
+typedef ssize_t (*FiberIOFunc)(int fd, void *args);
+
+static ssize_t fiber_io_operation(
+                int fd,
+                uint32_t events,
+                FiberIOFunc func,
+                void *args) {
+        _cleanup_(nonblock_resetp) int reset_fd = -EBADF;
+        int r;
+
+        assert(fd >= 0);
+        assert(func);
+
+        if (!sd_fiber_is_running())
+                return func(fd, args);
+
+        sd_event *e = sd_fiber_get_event();
+        assert(e);
+
+        r = fd_nonblock(fd, true);
+        if (r < 0)
+                return r;
+        if (r > 0)
+                reset_fd = fd;
+
+        ssize_t n = func(fd, args);
+        if (n >= 0 || !ERRNO_IS_NEG_TRANSIENT(n))
+                return n;
+
+        _cleanup_(sd_future_cancel_wait_unrefp) sd_future *io = NULL;
+        r = future_new_io(e, fd, events, &io);
+        if (r < 0)
+                return r;
+
+        r = sd_fiber_suspend();
+        if (r < 0)
+                return r;
+
+        return func(fd, args);
+}
+
+typedef struct ReadArgs {
+        void *buf;
+        size_t count;
+} ReadArgs;
+
+static ssize_t read_callback(int fd, void *args) {
+        ReadArgs *a = ASSERT_PTR(args);
+        ssize_t n;
+
+        n = read(fd, a->buf, a->count);
+        return n >= 0 ? n : -errno;
+}
+
+ssize_t sd_fiber_read(int fd, void *buf, size_t count) {
+        assert_return(fd >= 0, -EBADF);
+        assert_return(buf || count == 0, -EINVAL);
+
+        return fiber_io_operation(fd, EPOLLIN, read_callback, &(ReadArgs) {
+                .buf = buf,
+                .count = count,
+        });
+}
+
+typedef struct WriteArgs {
+        const void *buf;
+        size_t count;
+} WriteArgs;
+
+static ssize_t write_callback(int fd, void *args) {
+        WriteArgs *a = ASSERT_PTR(args);
+        ssize_t n;
+
+        n = write(fd, a->buf, a->count);
+        return n >= 0 ? n : -errno;
+}
+
+ssize_t sd_fiber_write(int fd, const void *buf, size_t count) {
+        assert_return(fd >= 0, -EBADF);
+        assert_return(buf || count == 0, -EINVAL);
+
+        return fiber_io_operation(fd, EPOLLOUT, write_callback, &(WriteArgs) {
+                .buf = buf,
+                .count = count,
+        });
+}
+
+typedef struct ReadvArgs {
+        const struct iovec *iov;
+        int iovcnt;
+} ReadvArgs;
+
+static ssize_t readv_callback(int fd, void *args) {
+        ReadvArgs *a = ASSERT_PTR(args);
+        ssize_t n;
+
+        n = readv(fd, a->iov, a->iovcnt);
+        return n >= 0 ? n : -errno;
+}
+
+ssize_t sd_fiber_readv(int fd, const struct iovec *iov, int iovcnt) {
+        assert_return(fd >= 0, -EBADF);
+        assert_return(iov || iovcnt == 0, -EINVAL);
+
+        return fiber_io_operation(fd, EPOLLIN, readv_callback, &(ReadvArgs) {
+                .iov = iov,
+                .iovcnt = iovcnt,
+        });
+}
+
+typedef struct WritevArgs {
+        const struct iovec *iov;
+        int iovcnt;
+} WritevArgs;
+
+static ssize_t writev_callback(int fd, void *args) {
+        WritevArgs *a = ASSERT_PTR(args);
+        ssize_t n;
+
+        n = writev(fd, a->iov, a->iovcnt);
+        return n >= 0 ? n : -errno;
+}
+
+ssize_t sd_fiber_writev(int fd, const struct iovec *iov, int iovcnt) {
+        assert_return(fd >= 0, -EBADF);
+        assert_return(iov || iovcnt == 0, -EINVAL);
+
+        return fiber_io_operation(fd, EPOLLOUT, writev_callback, &(WritevArgs) {
+                .iov = iov,
+                .iovcnt = iovcnt,
+        });
+}
+
+typedef struct RecvArgs {
+        void *buf;
+        size_t len;
+        int flags;
+} RecvArgs;
+
+static ssize_t recv_callback(int fd, void *args) {
+        RecvArgs *a = ASSERT_PTR(args);
+        ssize_t n;
+
+        n = recv(fd, a->buf, a->len, a->flags);
+        return n >= 0 ? n : -errno;
+}
+
+ssize_t sd_fiber_recv(int sockfd, void *buf, size_t len, int flags) {
+        assert_return(sockfd >= 0, -EBADF);
+        assert_return(buf || len == 0, -EINVAL);
+
+        return fiber_io_operation(sockfd, EPOLLIN, recv_callback, &(RecvArgs) {
+                .buf = buf,
+                .len = len,
+                .flags = flags,
+        });
+}
+
+typedef struct SendArgs {
+        const void *buf;
+        size_t len;
+        int flags;
+} SendArgs;
+
+static ssize_t send_callback(int fd, void *args) {
+        SendArgs *a = ASSERT_PTR(args);
+        ssize_t n;
+
+        n = send(fd, a->buf, a->len, a->flags);
+        return n >= 0 ? n : -errno;
+}
+
+ssize_t sd_fiber_send(int sockfd, const void *buf, size_t len, int flags) {
+        assert_return(sockfd >= 0, -EBADF);
+        assert_return(buf || len == 0, -EINVAL);
+
+        return fiber_io_operation(sockfd, EPOLLOUT, send_callback, &(SendArgs) {
+                .buf = buf,
+                .len = len,
+                .flags = flags,
+        });
+}
+
+int sd_fiber_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
+        _cleanup_(nonblock_resetp) int reset_fd = -EBADF;
+        int r;
+
+        assert_return(sockfd >= 0, -EBADF);
+        assert_return(addr, -EINVAL);
+
+        if (!sd_fiber_is_running())
+                return RET_NERRNO(connect(sockfd, addr, addrlen));
+
+        sd_event *e = sd_fiber_get_event();
+        assert(e);
+
+        r = fd_nonblock(sockfd, true);
+        if (r < 0)
+                return r;
+        if (r > 0)
+                reset_fd = sockfd;
+
+        r = RET_NERRNO(connect(sockfd, addr, addrlen));
+        if (r != -EINPROGRESS)
+                return r;
+
+        _cleanup_(sd_future_cancel_wait_unrefp) sd_future *io = NULL;
+        r = future_new_io(e, sockfd, EPOLLOUT, &io);
+        if (r < 0)
+                return r;
+
+        /* future_new_io resolves with the revents mask on success; translate any positive value
+         * (e.g. POLLOUT) back to the connect(2) success status. */
+        r = sd_fiber_suspend();
+        return r > 0 ? 0 : r;
+}
+
+typedef struct RecvmsgArgs {
+        struct msghdr *msg;
+        int flags;
+} RecvmsgArgs;
+
+static ssize_t recvmsg_callback(int fd, void *args) {
+        RecvmsgArgs *a = ASSERT_PTR(args);
+        ssize_t n;
+
+        n = recvmsg(fd, a->msg, a->flags);
+        return n >= 0 ? n : -errno;
+}
+
+ssize_t sd_fiber_recvmsg(int sockfd, struct msghdr *msg, int flags) {
+        assert_return(sockfd >= 0, -EBADF);
+        assert_return(msg, -EINVAL);
+
+        return fiber_io_operation(sockfd, EPOLLIN, recvmsg_callback, &(RecvmsgArgs) {
+                .msg = msg,
+                .flags = flags,
+        });
+}
+
+typedef struct SendmsgArgs {
+        const struct msghdr *msg;
+        int flags;
+} SendmsgArgs;
+
+static ssize_t sendmsg_callback(int fd, void *args) {
+        SendmsgArgs *a = ASSERT_PTR(args);
+        ssize_t n;
+
+        n = sendmsg(fd, a->msg, a->flags);
+        return n >= 0 ? n : -errno;
+}
+
+ssize_t sd_fiber_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
+        assert_return(sockfd >= 0, -EBADF);
+        assert_return(msg, -EINVAL);
+
+        return fiber_io_operation(sockfd, EPOLLOUT, sendmsg_callback, &(SendmsgArgs) {
+                .msg = msg,
+                .flags = flags,
+        });
+}
+
+static ssize_t recvfrom_callback(int fd, void *args) {
+        RecvmsgArgs *a = ASSERT_PTR(args);
+        ssize_t n;
+
+        n = recvmsg(fd, a->msg, a->flags);
+        return n >= 0 ? n : -errno;
+}
+
+ssize_t sd_fiber_recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen) {
+        ssize_t n;
+
+        assert_return(sockfd >= 0, -EBADF);
+        assert_return(buf || len == 0, -EINVAL);
+        assert_return(!src_addr || addrlen, -EINVAL);
+
+        /* io_uring has no direct recvfrom prep helper, so emulate via recvmsg with a single-iovec
+         * msghdr. The kernel updates msg_namelen in place; we copy it back to *addrlen below. */
+        struct iovec iov = { .iov_base = buf, .iov_len = len };
+        struct msghdr msg = {
+                .msg_name = src_addr,
+                .msg_namelen = src_addr ? *addrlen : 0,
+                .msg_iov = &iov,
+                .msg_iovlen = 1,
+        };
+
+        n = fiber_io_operation(sockfd, EPOLLIN, recvfrom_callback, &(RecvmsgArgs) {
+                .msg = &msg,
+                .flags = flags,
+        });
+        if (n < 0)
+                return n;
+
+        if (addrlen)
+                *addrlen = msg.msg_namelen;
+
+        return n;
+}
+
+static ssize_t sendto_callback(int fd, void *args) {
+        SendmsgArgs *a = ASSERT_PTR(args);
+        ssize_t n;
+
+        n = sendmsg(fd, a->msg, a->flags);
+        return n >= 0 ? n : -errno;
+}
+
+ssize_t sd_fiber_sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen) {
+        assert_return(sockfd >= 0, -EBADF);
+        assert_return(buf || len == 0, -EINVAL);
+
+        struct iovec iov = { .iov_base = (void *) buf, .iov_len = len };
+        struct msghdr msg = {
+                .msg_name = (void *) dest_addr,
+                .msg_namelen = dest_addr ? addrlen : 0,
+                .msg_iov = &iov,
+                .msg_iovlen = 1,
+        };
+
+        return fiber_io_operation(sockfd, EPOLLOUT, sendto_callback, &(SendmsgArgs) {
+                .msg = &msg,
+                .flags = flags,
+        });
+}
+
+typedef struct AcceptArgs {
+        struct sockaddr *addr;
+        socklen_t *addrlen;
+        int flags;
+} AcceptArgs;
+
+static ssize_t accept_callback(int fd, void *args) {
+        AcceptArgs *a = ASSERT_PTR(args);
+
+        return RET_NERRNO(accept4(fd, a->addr, a->addrlen, a->flags));
+}
+
+int sd_fiber_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
+        assert_return(sockfd >= 0, -EBADF);
+
+        return fiber_io_operation(sockfd, EPOLLIN, accept_callback, &(AcceptArgs) {
+                .addr = addr,
+                .addrlen = addrlen,
+                .flags = flags,
+        });
+}
+
+int sd_fiber_ppoll(struct pollfd *fds, size_t n_fds, const struct timespec *timeout, const sigset_t *sigmask) {
+        int r;
+
+        assert_return(fds || n_fds == 0, -EINVAL);
+
+        if (!sd_fiber_is_running())
+                return RET_NERRNO(ppoll(fds, n_fds, timeout, sigmask));
+
+        /* When on a fiber signals are handled via sd-event hence we should never mess around with the
+         * signal mask when running on a fiber. */
+        assert_return(!sigmask, -EOPNOTSUPP);
+
+        sd_event *e = sd_fiber_get_event();
+        assert(e);
+
+        /* No fds to wait on and no timeout means there's nothing that could ever wake the fiber up,
+         * since unlike raw ppoll() we cannot use signal delivery as a wakeup. Signals received while
+         * the fiber is suspended are handled by sd-event via signalfd, in which case the signal handler
+         * is expected to cancel the fiber via sd_future_cancel() if a wakeup is desired. */
+        if (n_fds == 0 && !timeout)
+                return -EINVAL;
+
+        bool zero_timeout = timeout && timeout->tv_sec == 0 && timeout->tv_nsec == 0;
+
+        /* Try polling with zero timeout first to see if any are immediately ready. */
+        r = RET_NERRNO(ppoll(fds, n_fds, &(const struct timespec) {}, /* sigmask= */ NULL));
+        if (zero_timeout || r != 0) /* Either error or some fds are ready */
+                return r;
+
+        sd_future **futures = NULL;
+        CLEANUP_ARRAY(futures, n_fds, sd_future_cancel_wait_unref_array);
+
+        futures = new0(sd_future*, n_fds);
+        if (!futures)
+                return -ENOMEM;
+
+        /* Set up I/O event sources for all valid fds. POLL* and EPOLL* share their bit values (see
+         * EPOLL_POLL_COMMON_MASK in io-util.h), so we can pass the user-supplied event mask through
+         * to either backend without translation. */
+        size_t n_io_futures = 0;
+        for (size_t i = 0; i < n_fds; i++) {
+                if (fds[i].fd < 0)
+                        continue;
+
+                uint32_t events = fds[i].events & EPOLL_POLL_COMMON_MASK;
+                if (events == 0)
+                        continue;
+
+                r = future_new_io(e, fds[i].fd, events, &futures[i]);
+                if (r < 0)
+                        return r;
+
+                n_io_futures++;
+        }
+
+        /* A timeout that overflows usec_t saturates to USEC_INFINITY in timespec_load(); treat that
+         * like "no timeout" (matches sd_fiber_sleep(USEC_INFINITY)) rather than letting
+         * sd_event_add_time_relative() reject it with -EOVERFLOW — standard ppoll() would just
+         * wait a very long time. */
+        usec_t usec = timeout ? timespec_load(timeout) : USEC_INFINITY;
+
+        /* If every fd was skipped (negative or empty event mask) and we'd have no timer, there's
+         * nothing that could ever wake the fiber up — same situation as n_fds == 0 && !timeout,
+         * just not detectable upfront. Refuse rather than suspend forever. */
+        if (n_io_futures == 0 && usec == USEC_INFINITY)
+                return -EINVAL;
+
+        _cleanup_(sd_future_cancel_wait_unrefp) sd_future *timer = NULL;
+        if (usec != USEC_INFINITY) {
+                r = future_new_time_relative(
+                                e,
+                                CLOCK_MONOTONIC,
+                                usec,
+                                /* accuracy= */ 1,
+                                /* result= */ 0,
+                                &timer);
+                if (r < 0)
+                        return r;
+        }
+
+        r = sd_fiber_suspend();
+        if (r < 0 && r != -ETIME)
+                return r;
+
+        /* Always sweep fds with a non-blocking ppoll(): the timer and an fd readiness can resolve in
+         * the same event-loop tick (or the fd can become ready between the timer firing and us being
+         * scheduled), and ppoll() semantics give events precedence over the timeout in that case. */
+        int n = RET_NERRNO(ppoll(fds, n_fds, &(const struct timespec) {}, /* sigmask= */ NULL));
+        if (n != 0)
+                return n;
+
+        /* No fds ready: distinguish our own timer from an external -ETIME. */
+        if (timer && sd_future_state(timer) == SD_FUTURE_RESOLVED)
+                return 0;
+
+        /* An IO future resolved with a revents mask (r > 0) but the readiness was already consumed
+         * by the time we swept — report 0 rather than leaking the bitmask as a (bogus) ppoll fd
+         * count to the caller. */
+        if (r > 0)
+                return 0;
+
+        return r;
+}
diff --git a/src/libsystemd/sd-future/test-fiber-io.c b/src/libsystemd/sd-future/test-fiber-io.c
new file mode 100644 (file)
index 0000000..aef3895
--- /dev/null
@@ -0,0 +1,1388 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+
+#include <fcntl.h>
+#include <poll.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include "sd-event.h"
+#include "sd-future.h"
+
+#include "fd-util.h"
+#include "tests.h"
+#include "time-util.h"
+
+/* Test: Basic pipe I/O with sd-event */
+
+typedef struct PipeIOContext {
+        int *pipefd;
+        int order;
+} PipeIOContext;
+
+static int pipe_read_fiber(void *userdata) {
+        PipeIOContext *ctx = ASSERT_PTR(userdata);
+        char buf[64];
+        ssize_t n;
+
+        n = sd_fiber_read(ctx->pipefd[0], buf, sizeof(buf));
+        if (n < 0)
+                return (int) n;
+
+        /* Verify we read "hello" */
+        if (n != 5 || memcmp(buf, "hello", 5) != 0)
+                return -EIO;
+
+        return (int) n;
+}
+
+TEST(fiber_io_basic) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        PipeIOContext ctx = { .pipefd = pipefd };
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "pipe-read", pipe_read_fiber, &ctx, NULL, &f));
+
+        /* Write data to the pipe */
+        ASSERT_OK_EQ_ERRNO(write(pipefd[1], "hello", 5), 5);
+
+        /* Run the scheduler - should process the I/O */
+        ASSERT_OK(sd_event_loop(e));
+
+        /* Verify fiber read the data */
+        ASSERT_OK_EQ(sd_future_result(f), 5);
+}
+
+static int pipe_read_order_fiber(void *userdata) {
+        PipeIOContext *ctx = ASSERT_PTR(userdata);
+        char buf[64];
+        ssize_t n;
+
+        /* Record that the read fiber started before attempting the blocking read */
+        ASSERT_EQ(ctx->order, 0);
+        ctx->order = 1;
+
+        n = sd_fiber_read(ctx->pipefd[0], buf, sizeof(buf));
+        if (n < 0)
+                return (int) n;
+
+        /* After resuming, verify the write fiber ran while we were suspended */
+        ASSERT_EQ(ctx->order, 2);
+
+        /* Verify we read "hello" */
+        if (n != 5 || memcmp(buf, "hello", 5) != 0)
+                return -EIO;
+
+        return (int) n;
+}
+
+static int pipe_write_order_fiber(void *userdata) {
+        PipeIOContext *ctx = ASSERT_PTR(userdata);
+
+        /* Verify the read fiber already ran and suspended before we started */
+        ASSERT_EQ(ctx->order, 1);
+        ctx->order = 2;
+
+        return sd_fiber_write(ctx->pipefd[1], "hello", STRLEN("hello"));
+}
+
+TEST(fiber_io_read_write) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        PipeIOContext ctx = { .pipefd = pipefd };
+
+        /* Higher priority for the read fiber, which will run first and then suspend because no data is
+         * available. The write fiber will run second, write data to the pipe, causing the read fiber to get
+         * resumed. */
+        _cleanup_(sd_future_unrefp) sd_future *fr = NULL, *fw = NULL;
+        ASSERT_OK(sd_fiber_new(e, "pipe-read", pipe_read_order_fiber, &ctx, NULL, &fr));
+        ASSERT_OK(sd_future_set_priority(fr, 0));
+        ASSERT_OK(sd_fiber_new(e, "pipe-write", pipe_write_order_fiber, &ctx, NULL, &fw));
+        ASSERT_OK(sd_future_set_priority(fw, 1));
+
+        /* Run the scheduler - should process the I/O */
+        ASSERT_OK(sd_event_loop(e));
+
+        /* Verify both fibers completed and the full read->suspend->write->resume sequence occurred */
+        ASSERT_OK_EQ(sd_future_result(fr), 5);
+        ASSERT_OK_EQ(sd_future_result(fw), 5);
+}
+
+/* Test: Multiple concurrent reads */
+static int concurrent_read_fiber(void *userdata) {
+        int *args = userdata;
+        int fd = args[0];
+        int expected = args[1];
+        char buf[64];
+        ssize_t n;
+
+        n = sd_fiber_read(fd, buf, sizeof buf);
+        if (n < 0)
+                return (int) n;
+
+        if (n != 1 || buf[0] != (char) expected)
+                return -EIO;
+
+        return 0;
+}
+
+TEST(fiber_io_concurrent) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        sd_future *fibers[3] = {};
+        CLEANUP_ELEMENTS(fibers, sd_future_unref_array_clear);
+
+        /* Create 3 pipes and 3 fibers */
+        int pipes[3][2];
+        int args[3][2];
+        for (size_t i = 0; i < ELEMENTSOF(fibers); i++) {
+                ASSERT_OK_ERRNO(pipe2(pipes[i], O_CLOEXEC | O_NONBLOCK));
+                args[i][0] = pipes[i][0];
+                args[i][1] = 'A' + i;
+                ASSERT_OK(sd_fiber_new(e, "concurrent-read", concurrent_read_fiber, args[i], NULL, &fibers[i]));
+        }
+
+        /* Write data in reverse order */
+        ASSERT_EQ(write(pipes[2][1], "C", 1), 1);
+        ASSERT_EQ(write(pipes[1][1], "B", 1), 1);
+        ASSERT_EQ(write(pipes[0][1], "A", 1), 1);
+
+        /* Run until all complete */
+        ASSERT_OK(sd_event_loop(e));
+
+        /* All should complete successfully */
+        for (size_t i = 0; i < ELEMENTSOF(fibers); i++) {
+                ASSERT_OK(sd_future_result(fibers[i]));
+                safe_close_pair(pipes[i]);
+        }
+}
+
+/* Test: Cancel fiber during I/O */
+static int blocking_read_fiber(void *userdata) {
+        int fd = PTR_TO_INT(userdata);
+        char buf[64];
+        ssize_t n;
+
+        n = sd_fiber_read(fd, buf, sizeof(buf));
+        return (int) n;
+}
+
+TEST(fiber_io_cancel) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "blocking-read", blocking_read_fiber, INT_TO_PTR(pipefd[0]), NULL, &f));
+
+        /* Run once - fiber will suspend on read */
+        ASSERT_OK_POSITIVE(sd_event_run(e, 0));
+
+        /* Fiber should be suspended now - add explicit check via state tracking */
+
+        /* Cancel the fiber */
+        ASSERT_OK(sd_future_cancel(f));
+
+        /* Run to completion */
+        ASSERT_OK(sd_event_loop(e));
+
+        /* Should be cancelled */
+        ASSERT_ERROR(sd_future_result(f), ECANCELED);
+}
+
+TEST(fiber_io_fallback) {
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));  /* Note: blocking pipe */
+
+        char buf[STRLEN("fallback")] = {};
+        ASSERT_OK_EQ(sd_fiber_write(pipefd[1], "fallback", sizeof(buf)), (ssize_t) sizeof(buf));
+        ASSERT_OK_EQ(sd_fiber_read(pipefd[0], buf, sizeof(buf)), (ssize_t) sizeof(buf));
+}
+
+static int pipe_readv_order_fiber(void *userdata) {
+        PipeIOContext *ctx = ASSERT_PTR(userdata);
+        char buf1[5], buf2[5];
+        struct iovec iov[] = {
+                { .iov_base = buf1, .iov_len = sizeof(buf1) },
+                { .iov_base = buf2, .iov_len = sizeof(buf2) },
+        };
+        ssize_t n;
+
+        /* Record that the read fiber started before attempting the blocking read */
+        ASSERT_EQ(ctx->order, 0);
+        ctx->order = 1;
+
+        /* This will initially block since no data is available */
+        n = sd_fiber_readv(ctx->pipefd[0], iov, ELEMENTSOF(iov));
+        if (n < 0)
+                return (int) n;
+
+        /* After resuming, verify the write fiber ran while we were suspended */
+        ASSERT_EQ(ctx->order, 2);
+
+        if (n != 10 || memcmp(buf1, "fiber", 5) != 0 || memcmp(buf2, "readv", 5) != 0)
+                return -EIO;
+
+        return (int) n;
+}
+
+static int pipe_writev_order_fiber(void *userdata) {
+        PipeIOContext *ctx = ASSERT_PTR(userdata);
+        const char *part1 = "fiber";
+        const char *part2 = "readv";
+        struct iovec iov[] = {
+                { .iov_base = (void*) part1, .iov_len = 5 },
+                { .iov_base = (void*) part2, .iov_len = 5 },
+        };
+
+        /* Verify the read fiber already ran and suspended before we started */
+        ASSERT_EQ(ctx->order, 1);
+        ctx->order = 2;
+
+        return sd_fiber_writev(ctx->pipefd[1], iov, ELEMENTSOF(iov));
+}
+
+TEST(fiber_io_readv_writev) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        PipeIOContext ctx = { .pipefd = pipefd };
+
+        /* Higher priority for the read fiber, which will run first and then suspend because no data is
+         * available. The write fiber will run second, write data to the pipe, causing the read fiber to get
+         * resumed. */
+        _cleanup_(sd_future_unrefp) sd_future *fr = NULL, *fw = NULL;
+        ASSERT_OK(sd_fiber_new(e, "pipe-readv", pipe_readv_order_fiber, &ctx, NULL, &fr));
+        ASSERT_OK(sd_future_set_priority(fr, 0));
+        ASSERT_OK(sd_fiber_new(e, "pipe-writev", pipe_writev_order_fiber, &ctx, NULL, &fw));
+        ASSERT_OK(sd_future_set_priority(fw, 1));
+
+        /* Run the scheduler - should process the I/O */
+        ASSERT_OK(sd_event_loop(e));
+
+        /* Verify both fibers completed and the full read->suspend->write->resume sequence occurred */
+        ASSERT_OK_EQ(sd_future_result(fr), 10);
+        ASSERT_OK_EQ(sd_future_result(fw), 10);
+}
+
+static int concurrent_readv_fiber(void *userdata) {
+        int *args = userdata;
+        int fd = args[0];
+        int expected1 = args[1];
+        int expected2 = args[2];
+        char buf1[1], buf2[1];
+        struct iovec iov[] = {
+                { .iov_base = buf1, .iov_len = sizeof(buf1) },
+                { .iov_base = buf2, .iov_len = sizeof(buf2) },
+        };
+        ssize_t n;
+
+        n = sd_fiber_readv(fd, iov, ELEMENTSOF(iov));
+        if (n < 0)
+                return (int) n;
+
+        if (n != 2 || buf1[0] != (char) expected1 || buf2[0] != (char) expected2)
+                return -EIO;
+
+        return 0;
+}
+
+TEST(fiber_io_readv_concurrent) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        sd_future *fibers[3] = {};
+        CLEANUP_ELEMENTS(fibers, sd_future_unref_array_clear);
+
+        /* Create 3 pipes and 3 fibers */
+        int pipes[3][2];
+        int args[3][3];
+        for (size_t i = 0; i < ELEMENTSOF(fibers); i++) {
+                ASSERT_OK_ERRNO(pipe2(pipes[i], O_CLOEXEC | O_NONBLOCK));
+                args[i][0] = pipes[i][0];
+                args[i][1] = 'A' + i;
+                args[i][2] = 'a' + i;
+                ASSERT_OK(sd_fiber_new(e, "concurrent-readv", concurrent_readv_fiber, args[i], NULL, &fibers[i]));
+        }
+
+        /* Write data in reverse order */
+        ASSERT_EQ(write(pipes[2][1], "Cc", 2), 2);
+        ASSERT_EQ(write(pipes[1][1], "Bb", 2), 2);
+        ASSERT_EQ(write(pipes[0][1], "Aa", 2), 2);
+
+        /* Run until all complete */
+        ASSERT_OK(sd_event_loop(e));
+
+        /* All should complete successfully */
+        for (size_t i = 0; i < ELEMENTSOF(fibers); i++) {
+                ASSERT_OK(sd_future_result(fibers[i]));
+                safe_close_pair(pipes[i]);
+        }
+}
+
+typedef struct SocketIOContext {
+        int *sockfd;
+        int order;
+} SocketIOContext;
+
+static int socket_send_order_fiber(void *userdata) {
+        SocketIOContext *ctx = ASSERT_PTR(userdata);
+
+        /* Verify the recv fiber already ran and suspended before we started */
+        ASSERT_EQ(ctx->order, 1);
+        ctx->order = 2;
+
+        return sd_fiber_send(ctx->sockfd[0], "socket", STRLEN("socket"), 0);
+}
+
+static int socket_recv_order_fiber(void *userdata) {
+        SocketIOContext *ctx = ASSERT_PTR(userdata);
+        char buf[64];
+        ssize_t n;
+
+        /* Record that the recv fiber started before attempting the blocking recv */
+        ASSERT_EQ(ctx->order, 0);
+        ctx->order = 1;
+
+        n = sd_fiber_recv(ctx->sockfd[1], buf, sizeof(buf), 0);
+        if (n < 0)
+                return (int) n;
+
+        /* After resuming, verify the send fiber ran while we were suspended */
+        ASSERT_EQ(ctx->order, 2);
+
+        /* Verify we received "socket" */
+        if (n != 6 || memcmp(buf, "socket", 6) != 0)
+                return -EIO;
+
+        return (int) n;
+}
+
+TEST(fiber_io_recv_send) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, sockfd));
+
+        SocketIOContext ctx = { .sockfd = sockfd };
+
+        /* Higher priority for the recv fiber, which will run first and suspend */
+        _cleanup_(sd_future_unrefp) sd_future *fs = NULL, *fr = NULL;
+        ASSERT_OK(sd_fiber_new(e, "socket-recv", socket_recv_order_fiber, &ctx, NULL, &fr));
+        ASSERT_OK(sd_future_set_priority(fr, 0));
+        ASSERT_OK(sd_fiber_new(e, "socket-send", socket_send_order_fiber, &ctx, NULL, &fs));
+        ASSERT_OK(sd_future_set_priority(fs, 1));
+
+        ASSERT_OK(sd_event_loop(e));
+
+        /* Verify both fibers completed and the full recv->suspend->send->resume sequence occurred */
+        ASSERT_OK_EQ(sd_future_result(fr), 6);
+        ASSERT_OK_EQ(sd_future_result(fs), 6);
+}
+
+static int socket_recv_peek_fiber(void *userdata) {
+        int sockfd = PTR_TO_INT(userdata);
+        char buf1[64], buf2[64];
+        ssize_t n1, n2;
+
+        /* First peek at the data */
+        n1 = sd_fiber_recv(sockfd, buf1, sizeof(buf1), MSG_PEEK);
+        if (n1 < 0)
+                return (int) n1;
+
+        /* Then actually read it */
+        n2 = sd_fiber_recv(sockfd, buf2, sizeof(buf2), 0);
+        if (n2 < 0)
+                return (int) n2;
+
+        /* Both should have read the same data */
+        if (n1 != n2 || memcmp(buf1, buf2, n1) != 0)
+                return -EIO;
+
+        if (n1 != 4 || memcmp(buf1, "peek", 4) != 0)
+                return -EIO;
+
+        return 0;
+}
+
+TEST(fiber_io_recv_peek) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, sockfd));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "socket-recv-peek", socket_recv_peek_fiber, INT_TO_PTR(sockfd[1]), NULL, &f));
+
+        /* Write data to the socket */
+        ASSERT_OK_EQ_ERRNO(write(sockfd[0], "peek", 4), 4);
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+}
+
+static int socket_connect_fiber(void *userdata) {
+        struct sockaddr_un *addr = userdata;
+        _cleanup_close_ int sockfd = -EBADF;
+
+        sockfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
+        if (sockfd < 0)
+                return -errno;
+
+        return sd_fiber_connect(sockfd, (struct sockaddr*) addr, sizeof(*addr));
+}
+
+TEST(fiber_io_connect) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        /* Create listening socket with abstract namespace */
+        _cleanup_close_ int listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
+        ASSERT_OK(listen_fd);
+
+        /* Use abstract socket (starts with null byte) */
+        struct sockaddr_un addr = {
+                .sun_family = AF_UNIX,
+        };
+        addr.sun_path[0] = '\0';
+        snprintf(addr.sun_path + 1, sizeof(addr.sun_path) - 1, "test-fiber-connect-%d", getpid());
+
+        ASSERT_OK(bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr)));
+        ASSERT_OK(listen(listen_fd, 1));
+
+        /* Create fiber to connect */
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "socket-connect", socket_connect_fiber, &addr, NULL, &f));
+
+        /* Run the event loop - connection should complete */
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+}
+
+static int socket_sendmsg_fiber(void *userdata) {
+        int sockfd = PTR_TO_INT(userdata);
+        struct iovec iov = {
+                .iov_base = (void*) "message",
+                .iov_len = STRLEN("message"),
+        };
+        struct msghdr msg = {
+                .msg_iov = &iov,
+                .msg_iovlen = 1,
+        };
+
+        return sd_fiber_sendmsg(sockfd, &msg, 0);
+}
+
+static int socket_recvmsg_fiber(void *userdata) {
+        int sockfd = PTR_TO_INT(userdata);
+        char buf[64];
+        struct iovec iov = {
+                .iov_base = buf,
+                .iov_len = sizeof(buf),
+        };
+        struct msghdr msg = {
+                .msg_iov = &iov,
+                .msg_iovlen = 1,
+        };
+        ssize_t n;
+
+        n = sd_fiber_recvmsg(sockfd, &msg, 0);
+        if (n < 0)
+                return (int) n;
+
+        if (n != 7 || memcmp(buf, "message", 7) != 0)
+                return -EIO;
+
+        return (int) n;
+}
+
+TEST(fiber_io_recvmsg_sendmsg) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, sockfd));
+
+        _cleanup_(sd_future_unrefp) sd_future *fs = NULL, *fr = NULL;
+        ASSERT_OK(sd_fiber_new(e, "socket-recvmsg", socket_recvmsg_fiber, INT_TO_PTR(sockfd[1]), NULL, &fr));
+        ASSERT_OK(sd_future_set_priority(fr, 1));
+        ASSERT_OK(sd_fiber_new(e, "socket-sendmsg", socket_sendmsg_fiber, INT_TO_PTR(sockfd[0]), NULL, &fs));
+        ASSERT_OK(sd_future_set_priority(fs, 0));
+
+        ASSERT_OK(sd_event_loop(e));
+
+        ASSERT_OK_EQ(sd_future_result(fr), 7);
+        ASSERT_OK_EQ(sd_future_result(fs), 7);
+}
+
+static int socket_sendto_fiber(void *userdata) {
+        int sockfd = PTR_TO_INT(userdata);
+
+        /* For socketpair dgram sockets, we can use NULL address since they're connected */
+        return sd_fiber_sendto(sockfd, "datagram", STRLEN("datagram"), 0, NULL, 0);
+}
+
+static int socket_recvfrom_fiber(void *userdata) {
+        int sockfd = PTR_TO_INT(userdata);
+        char buf[64];
+        struct sockaddr_un addr;
+        socklen_t addr_len = sizeof(addr);
+        ssize_t n;
+
+        n = sd_fiber_recvfrom(sockfd, buf, sizeof(buf), 0,
+                              (struct sockaddr*) &addr, &addr_len);
+        if (n < 0)
+                return (int) n;
+
+        if (n != 8 || memcmp(buf, "datagram", 8) != 0)
+                return -EIO;
+
+        return (int) n;
+}
+
+TEST(fiber_io_recvfrom_sendto) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, sockfd));
+
+        _cleanup_(sd_future_unrefp) sd_future *fs = NULL, *fr = NULL;
+        ASSERT_OK(sd_fiber_new(e, "socket-recvfrom", socket_recvfrom_fiber, INT_TO_PTR(sockfd[1]), NULL, &fr));
+        ASSERT_OK(sd_future_set_priority(fr, 1));
+        ASSERT_OK(sd_fiber_new(e, "socket-sendto", socket_sendto_fiber, INT_TO_PTR(sockfd[0]), NULL, &fs));
+        ASSERT_OK(sd_future_set_priority(fs, 0));
+
+        ASSERT_OK(sd_event_loop(e));
+
+        ASSERT_OK_EQ(sd_future_result(fr), 8);
+        ASSERT_OK_EQ(sd_future_result(fs), 8);
+}
+
+static int socket_sendmsg_fd_fiber(void *userdata) {
+        int *args = userdata;
+        int sockfd = args[0];
+        int fd_to_send = args[1];
+        struct iovec iov = {
+                .iov_base = (void*) "X",
+                .iov_len = 1,
+        };
+        union {
+                struct cmsghdr cmsghdr;
+                uint8_t buf[CMSG_SPACE(sizeof(int))];
+        } control = {};
+        struct msghdr msg = {
+                .msg_iov = &iov,
+                .msg_iovlen = 1,
+                .msg_control = &control,
+                .msg_controllen = sizeof(control),
+        };
+        struct cmsghdr *cmsg;
+
+        cmsg = CMSG_FIRSTHDR(&msg);
+        cmsg->cmsg_level = SOL_SOCKET;
+        cmsg->cmsg_type = SCM_RIGHTS;
+        cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+        memcpy(CMSG_DATA(cmsg), &fd_to_send, sizeof(int));
+
+        return sd_fiber_sendmsg(sockfd, &msg, 0);
+}
+
+static int socket_recvmsg_fd_fiber(void *userdata) {
+        int sockfd = PTR_TO_INT(userdata);
+        char buf[1];
+        struct iovec iov = {
+                .iov_base = buf,
+                .iov_len = sizeof(buf),
+        };
+        union {
+                struct cmsghdr cmsghdr;
+                uint8_t buf[CMSG_SPACE(sizeof(int))];
+        } control = {};
+        struct msghdr msg = {
+                .msg_iov = &iov,
+                .msg_iovlen = 1,
+                .msg_control = &control,
+                .msg_controllen = sizeof(control),
+        };
+        struct cmsghdr *cmsg;
+        int received_fd;
+        ssize_t n;
+
+        n = sd_fiber_recvmsg(sockfd, &msg, 0);
+        if (n < 0)
+                return (int) n;
+
+        if (n != 1 || buf[0] != 'X')
+                return -EIO;
+
+        /* Extract the file descriptor */
+        cmsg = CMSG_FIRSTHDR(&msg);
+        if (!cmsg || cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS)
+                return -EIO;
+
+        memcpy(&received_fd, CMSG_DATA(cmsg), sizeof(int));
+
+        /* Verify we can use the fd */
+        if (fcntl(received_fd, F_GETFD) < 0)
+                return -errno;
+
+        close(received_fd);
+        return 0;
+}
+
+TEST(fiber_io_sendmsg_recvmsg_fd) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, sockfd));
+
+        /* Create a test file descriptor to send */
+        _cleanup_close_ int test_fd = open("/dev/null", O_RDONLY | O_CLOEXEC);
+        ASSERT_OK_ERRNO(test_fd);
+
+        _cleanup_(sd_future_unrefp) sd_future *fs = NULL, *fr = NULL;
+        int args[2] = { sockfd[0], test_fd };
+        ASSERT_OK(sd_fiber_new(e, "socket-recvmsg-fd", socket_recvmsg_fd_fiber, INT_TO_PTR(sockfd[1]), NULL, &fr));
+        ASSERT_OK(sd_future_set_priority(fr, 1));
+        ASSERT_OK(sd_fiber_new(e, "socket-sendmsg-fd", socket_sendmsg_fd_fiber, args, NULL, &fs));
+        ASSERT_OK(sd_future_set_priority(fs, 0));
+
+        ASSERT_OK(sd_event_loop(e));
+
+        ASSERT_OK(sd_future_result(fr));
+        ASSERT_OK_EQ(sd_future_result(fs), 1);
+}
+
+TEST(fiber_io_socket_fallback) {
+        _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR;
+        char buf[STRLEN("fallback")] = {};
+
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, sockfd));
+
+        /* Test send/recv without fiber context */
+        ASSERT_OK_EQ(sd_fiber_send(sockfd[0], "fallback", sizeof(buf), 0), (ssize_t) sizeof(buf));
+        ASSERT_OK_EQ(sd_fiber_recv(sockfd[1], buf, sizeof(buf), 0), (ssize_t) sizeof(buf));
+
+        /* Test sendto/recvfrom without fiber context */
+        ASSERT_OK_EQ(sd_fiber_sendto(sockfd[0], "fallback", sizeof(buf), 0, NULL, 0), (ssize_t) sizeof(buf));
+        ASSERT_OK_EQ(sd_fiber_recvfrom(sockfd[1], buf, sizeof(buf), 0, NULL, NULL), (ssize_t) sizeof(buf));
+}
+
+static int blocking_recv_fiber(void *userdata) {
+        int sockfd = PTR_TO_INT(userdata);
+        char buf[64];
+
+        return sd_fiber_recv(sockfd, buf, sizeof(buf), 0);
+}
+
+TEST(fiber_io_socket_cancel) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, sockfd));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "blocking-recv", blocking_recv_fiber, INT_TO_PTR(sockfd[0]), NULL, &f));
+
+        /* Run once - fiber will suspend on recv */
+        ASSERT_OK_POSITIVE(sd_event_run(e, 0));
+
+        /* Cancel the fiber */
+        ASSERT_OK(sd_future_cancel(f));
+
+        /* Run to completion */
+        ASSERT_OK(sd_event_loop(e));
+
+        /* Should be cancelled */
+        ASSERT_ERROR(sd_future_result(f), ECANCELED);
+}
+
+/* Test: Basic accept operation */
+static int accept_fiber(void *userdata) {
+        int listen_fd = PTR_TO_INT(userdata);
+        struct sockaddr_un addr;
+        socklen_t addr_len = sizeof(addr);
+        int client_fd;
+
+        client_fd = sd_fiber_accept(listen_fd, (struct sockaddr*) &addr, &addr_len, SOCK_CLOEXEC);
+        if (client_fd < 0)
+                return client_fd;
+
+        close(client_fd);
+        return 0;
+}
+
+TEST(fiber_io_accept_basic) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        /* Create listening socket with abstract namespace */
+        _cleanup_close_ int listen_fd = -EBADF;
+        ASSERT_OK_ERRNO(listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0));
+
+        struct sockaddr_un addr = {
+                .sun_family = AF_UNIX,
+        };
+        addr.sun_path[0] = '\0';
+        snprintf(addr.sun_path + 1, sizeof(addr.sun_path) - 1, "test-fiber-accept-%d", getpid());
+
+        ASSERT_OK_ERRNO(bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr)));
+        ASSERT_OK_ERRNO(listen(listen_fd, 1));
+
+        /* Create fiber to accept connection */
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "accept", accept_fiber, INT_TO_PTR(listen_fd), NULL, &f));
+
+        /* Connect from outside fiber context */
+        _cleanup_close_ int connect_fd = -EBADF;
+        ASSERT_OK(connect_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0));
+        ASSERT_OK(connect(connect_fd, (struct sockaddr*) &addr, sizeof(addr)));
+
+        /* Run the event loop - accept should complete */
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+}
+
+/* Test: Multiple sequential accepts */
+static int accept_multiple_fiber(void *userdata) {
+        int listen_fd = PTR_TO_INT(userdata);
+        struct sockaddr_un addr;
+        socklen_t addr_len;
+        int count = 0;
+
+        for (int i = 0; i < 3; i++) {
+                _cleanup_close_ int client_fd = -EBADF;
+
+                addr_len = sizeof(addr);
+                client_fd = sd_fiber_accept(listen_fd, (struct sockaddr*) &addr, &addr_len, SOCK_CLOEXEC);
+                if (client_fd < 0)
+                        return client_fd;
+
+                count++;
+        }
+
+        return count;
+}
+
+TEST(fiber_io_accept_multiple) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        /* Create listening socket */
+        _cleanup_close_ int listen_fd = -EBADF;
+        ASSERT_OK(listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0));
+
+        struct sockaddr_un addr = {
+                .sun_family = AF_UNIX,
+        };
+        addr.sun_path[0] = '\0';
+        snprintf(addr.sun_path + 1, sizeof(addr.sun_path) - 1, "test-fiber-accept-multi-%d", getpid());
+
+        ASSERT_OK(bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr)));
+        ASSERT_OK(listen(listen_fd, 5));
+
+        /* Create fiber to accept multiple connections */
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "accept-multiple", accept_multiple_fiber, INT_TO_PTR(listen_fd), NULL, &f));
+
+        /* Connect multiple times */
+        int connect_fds[3] = { -EBADF, -EBADF, -EBADF };
+        for (size_t i = 0; i < 3; i++) {
+                connect_fds[i] = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
+                ASSERT_OK(connect_fds[i]);
+                ASSERT_OK(connect(connect_fds[i], (struct sockaddr*) &addr, sizeof(addr)));
+        }
+
+        /* Run the event loop */
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK_EQ(sd_future_result(f), 3);
+
+        /* Clean up connection fds */
+        for (size_t i = 0; i < 3; i++)
+                safe_close(connect_fds[i]);
+}
+
+/* Test: Accept and exchange data */
+static int accept_and_read_fiber(void *userdata) {
+        int listen_fd = PTR_TO_INT(userdata);
+        _cleanup_close_ int client_fd = -EBADF;
+        char buf[64];
+        ssize_t n;
+
+        client_fd = sd_fiber_accept(listen_fd, NULL, NULL, SOCK_CLOEXEC);
+        if (client_fd < 0)
+                return client_fd;
+
+        n = sd_fiber_read(client_fd, buf, sizeof(buf));
+        if (n < 0)
+                return (int) n;
+
+        if (n != 5 || memcmp(buf, "hello", 5) != 0)
+                return -EIO;
+
+        return 0;
+}
+
+TEST(fiber_io_accept_and_read) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        /* Create listening socket */
+        _cleanup_close_ int listen_fd = -EBADF;
+        ASSERT_OK(listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0));
+
+        struct sockaddr_un addr = {
+                .sun_family = AF_UNIX,
+        };
+        addr.sun_path[0] = '\0';
+        snprintf(addr.sun_path + 1, sizeof(addr.sun_path) - 1, "test-fiber-accept-read-%d", getpid());
+
+        ASSERT_OK(bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr)));
+        ASSERT_OK(listen(listen_fd, 1));
+
+        /* Create fiber to accept and read */
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "accept-and-read", accept_and_read_fiber, INT_TO_PTR(listen_fd), NULL, &f));
+
+        /* Connect and send data */
+        _cleanup_close_ int connect_fd = -EBADF;
+        ASSERT_OK(connect_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0));
+        ASSERT_OK(connect(connect_fd, (struct sockaddr*) &addr, sizeof(addr)));
+        ASSERT_OK_EQ_ERRNO(write(connect_fd, "hello", 5), 5);
+
+        /* Run the event loop */
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+}
+
+/* Test: poll with single fd ready immediately */
+static int poll_immediate_fiber(void *userdata) {
+        int *pipefd = userdata;
+        struct pollfd fds[] = {
+                { .fd = pipefd[0], .events = POLLIN },
+        };
+        int r;
+
+        r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), NULL, NULL);
+        if (r < 0)
+                return r;
+
+        /* Should have one fd ready */
+        if (r != 1)
+                return -EIO;
+
+        if (!(fds[0].revents & POLLIN))
+                return -EIO;
+
+        return 0;
+}
+
+TEST(fiber_poll_immediate) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        /* Write data before creating fiber */
+        ASSERT_OK_EQ_ERRNO(write(pipefd[1], "X", 1), 1);
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "poll-immediate", poll_immediate_fiber, pipefd, NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+}
+
+/* Test: poll with fd that becomes ready after suspension */
+static int poll_fiber(void *userdata) {
+        int *pipefd = userdata;
+        struct pollfd fds[] = {
+                { .fd = pipefd[0], .events = POLLIN },
+        };
+        int r;
+
+        r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), NULL, NULL);
+        if (r < 0)
+                return r;
+
+        if (r != 1 || !(fds[0].revents & POLLIN))
+                return -EIO;
+
+        /* Read the data */
+        char buf[1];
+        if (read(pipefd[0], buf, 1) != 1 || buf[0] != 'Y')
+                return -EIO;
+
+        return 0;
+}
+
+TEST(fiber_poll) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "poll-suspend", poll_fiber, pipefd, NULL, &f));
+
+        /* Run once - fiber will suspend on poll */
+        ASSERT_OK_POSITIVE(sd_event_run(e, 0));
+
+        /* Write data to wake it up */
+        ASSERT_OK_EQ_ERRNO(write(pipefd[1], "Y", 1), 1);
+
+        /* Complete execution */
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+}
+
+/* Test: poll with multiple fds */
+static int poll_multiple_fiber(void *userdata) {
+        int (*pipes)[2] = userdata;
+        struct pollfd fds[] = {
+                { .fd = pipes[0][0], .events = POLLIN },
+                { .fd = pipes[1][0], .events = POLLIN },
+                { .fd = pipes[2][0], .events = POLLIN },
+        };
+        int r;
+
+        r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), NULL, NULL);
+        if (r < 0)
+                return r;
+
+        /* Should have all three ready */
+        if (r != 3)
+                return -EIO;
+
+        for (size_t i = 0; i < 3; i++) {
+                if (!(fds[i].revents & POLLIN))
+                        return -EIO;
+
+                char buf[1];
+                if (read(fds[i].fd, buf, 1) != 1 || buf[0] != (char) ('A' + i))
+                        return -EIO;
+        }
+
+        return 0;
+}
+
+TEST(fiber_poll_multiple) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        /* Create three pipes */
+        int pipes[3][2];
+        for (size_t i = 0; i < 3; i++)
+                ASSERT_OK_ERRNO(pipe2(pipes[i], O_CLOEXEC | O_NONBLOCK));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "poll-multiple", poll_multiple_fiber, pipes, NULL, &f));
+
+        /* Run once - fiber will suspend waiting for data */
+        ASSERT_OK_POSITIVE(sd_event_run(e, 0));
+
+        /* Write to all three pipes in different order */
+        ASSERT_OK_EQ_ERRNO(write(pipes[2][1], "C", 1), 1);
+        ASSERT_OK_EQ_ERRNO(write(pipes[0][1], "A", 1), 1);
+        ASSERT_OK_EQ_ERRNO(write(pipes[1][1], "B", 1), 1);
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+
+        for (size_t i = 0; i < 3; i++)
+                safe_close_pair(pipes[i]);
+}
+
+/* Test: poll with POLLOUT (write readiness) */
+static int poll_pollout_fiber(void *userdata) {
+        int *pipefd = userdata;
+        struct pollfd fds[] = {
+                { .fd = pipefd[1], .events = POLLOUT },
+        };
+        int r;
+
+        r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), NULL, NULL);
+        if (r < 0)
+                return r;
+
+        if (r != 1 || !(fds[0].revents & POLLOUT))
+                return -EIO;
+
+        /* Pipe should be writable */
+        if (write(pipefd[1], "Z", 1) != 1)
+                return -errno;
+
+        return 0;
+}
+
+TEST(fiber_poll_pollout) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "poll-pollout", poll_pollout_fiber, pipefd, NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+
+        /* Verify data was written */
+        char buf[1];
+        ASSERT_OK_EQ_ERRNO(read(pipefd[0], buf, 1), 1);
+        ASSERT_EQ(buf[0], 'Z');
+}
+
+/* Test: poll with timeout that expires */
+static int poll_timeout_fiber(void *userdata) {
+        int *pipefd = userdata;
+        struct pollfd fds[] = {
+                { .fd = pipefd[0], .events = POLLIN },
+        };
+        int r;
+
+        /* Poll with 100ms timeout - no data will arrive */
+        r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), &(struct timespec) { .tv_nsec = 100 * NSEC_PER_MSEC }, NULL);
+        if (r < 0)
+                return r;
+
+        /* Should timeout with no fds ready */
+        if (r != 0)
+                return -EIO;
+
+        return 0;
+}
+
+TEST(fiber_poll_timeout) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "poll-timeout", poll_timeout_fiber, pipefd, NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+}
+
+/* Test: poll with zero timeout (should not block) */
+static int poll_zero_timeout_fiber(void *userdata) {
+        int *pipefd = userdata;
+        struct pollfd fds[] = {
+                { .fd = pipefd[0], .events = POLLIN },
+        };
+        int r;
+
+        /* Poll with zero timeout - should return immediately */
+        r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), &(struct timespec) {}, NULL);
+        if (r < 0)
+                return r;
+
+        /* No data available, so should return 0 */
+        if (r != 0)
+                return -EIO;
+
+        /* Now write data */
+        if (write(pipefd[1], "Q", 1) != 1)
+                return -errno;
+
+        /* Poll again with zero timeout - should see data */
+        r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), NULL, NULL);
+        if (r < 0)
+                return r;
+
+        if (r != 1 || !(fds[0].revents & POLLIN))
+                return -EIO;
+
+        return 0;
+}
+
+TEST(fiber_poll_zero_timeout) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "poll-zero-timeout", poll_zero_timeout_fiber, pipefd, NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+}
+
+/* Test: poll with zero fds and zero timeout (should return immediately) */
+static int poll_zero_fds_fiber(void *userdata) {
+        return sd_fiber_ppoll(NULL, 0, &(struct timespec) {}, NULL);
+}
+
+TEST(fiber_poll_zero_fds) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "poll-zero-fds", poll_zero_fds_fiber, NULL, NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK_EQ(sd_future_result(f), 0);
+}
+
+/* Test: poll with zero fds and no timeout has no possible wakeup, must reject with -EINVAL */
+static int poll_zero_fds_no_timeout_fiber(void *userdata) {
+        return sd_fiber_ppoll(NULL, 0, NULL, NULL);
+}
+
+TEST(fiber_poll_zero_fds_no_timeout) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "poll-zero-fds-no-timeout", poll_zero_fds_no_timeout_fiber, NULL, NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_ERROR(sd_future_result(f), EINVAL);
+}
+
+/* Test: poll with negative fd (should be ignored) */
+static int poll_negative_fd_fiber(void *userdata) {
+        int *pipefd = userdata;
+        struct pollfd fds[] = {
+                { .fd = -1, .events = POLLIN },
+                { .fd = pipefd[0], .events = POLLIN },
+        };
+        int r;
+
+        r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), NULL, NULL);
+        if (r < 0)
+                return r;
+
+        /* Only the second fd should be ready */
+        if (r != 1 || !(fds[1].revents & POLLIN))
+                return -EIO;
+
+        /* First fd should have no events */
+        if (fds[0].revents != 0)
+                return -EIO;
+
+        return 0;
+}
+
+TEST(fiber_poll_negative_fd) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        /* Write data before creating fiber */
+        ASSERT_OK_EQ_ERRNO(write(pipefd[1], "N", 1), 1);
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "poll-negative-fd", poll_negative_fd_fiber, pipefd, NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+}
+
+/* Test: Multiple fibers waiting on the same fd */
+typedef struct SharedFdArgs {
+        int pipefd;
+        int *counter;
+} SharedFdArgs;
+
+static int shared_fd_read_fiber(void *userdata) {
+        SharedFdArgs *args = ASSERT_PTR(userdata);
+        char buf[1];
+        ssize_t n;
+
+        n = sd_fiber_read(args->pipefd, buf, sizeof(buf));
+        if (n < 0)
+                return (int) n;
+
+        if (n != 1)
+                return -EIO;
+
+        /* Increment counter to track successful reads */
+        (*args->counter)++;
+
+        return 0;
+}
+
+TEST(fiber_io_same_fd_multiple_fibers) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        /* Create 3 fibers all waiting on the same pipe read end */
+        sd_future *fibers[3] = {};
+        CLEANUP_ELEMENTS(fibers, sd_future_unref_array_clear);
+        SharedFdArgs args[3];
+        int counter = 0;
+
+        for (size_t i = 0; i < ELEMENTSOF(fibers); i++) {
+                args[i].pipefd = pipefd[0];
+                args[i].counter = &counter;
+                ASSERT_OK(sd_fiber_new(e, "shared-fd-read", shared_fd_read_fiber, &args[i], NULL, &fibers[i]));
+        }
+
+        /* All fibers should suspend waiting for data */
+        for (size_t i = 0; i < ELEMENTSOF(fibers); i++)
+                ASSERT_OK_POSITIVE(sd_event_run(e, 0));
+
+        /* Write 3 bytes - each byte will wake one fiber */
+        ASSERT_OK_EQ_ERRNO(write(pipefd[1], "ABC", 3), 3);
+
+        /* Run until all fibers complete */
+        ASSERT_OK(sd_event_loop(e));
+
+        /* All should complete successfully and each should have read one byte */
+        for (size_t i = 0; i < ELEMENTSOF(fibers); i++)
+                ASSERT_OK(sd_future_result(fibers[i]));
+
+        ASSERT_EQ(counter, 3);
+}
+
+static int blocking_fd_preserve_fiber(void *userdata) {
+        int *pipefd = ASSERT_PTR(userdata);
+        char buf[8] = {};
+        ssize_t n;
+
+        /* The pipe has data pre-filled, so this should succeed immediately on the fast path.
+         * This exercises the fd blocking state restore: fiber_io_operation() temporarily sets the fd
+         * to nonblocking, and must restore it to blocking on the success path. */
+        n = sd_fiber_read(pipefd[0], buf, sizeof(buf));
+        if (n < 0)
+                return (int) n;
+
+        if ((size_t) n != sizeof(buf) || memcmp(buf, "blocking", sizeof(buf)) != 0)
+                return -EIO;
+
+        return 0;
+}
+
+TEST(fiber_io_blocking_fd_preserved) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        /* Create a blocking pipe (no O_NONBLOCK) */
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));
+
+        /* Pre-fill the pipe so the read will succeed immediately */
+        ASSERT_OK_EQ_ERRNO(write(pipefd[1], "blocking", 8), 8);
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "blocking-fd-preserve", blocking_fd_preserve_fiber, pipefd, NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+
+        /* Verify the read end is still in blocking mode after the fiber completed */
+        ASSERT_OK_ZERO(fd_nonblock(pipefd[0], false));
+}
+
+static int socket_connect_blocking_fiber(void *userdata) {
+        struct sockaddr_un *addr = userdata;
+        _cleanup_close_ int sockfd = -EBADF;
+
+        /* Use a blocking socket (no SOCK_NONBLOCK). sd_fiber_connect() should temporarily set it
+         * to nonblocking, handle the EINPROGRESS path with getsockopt(SO_ERROR), and restore
+         * the blocking state. */
+        sockfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
+        if (sockfd < 0)
+                return -errno;
+
+        int r = sd_fiber_connect(sockfd, (struct sockaddr*) addr, sizeof(*addr));
+        if (r < 0)
+                return r;
+
+        /* Verify the socket is back in blocking mode */
+        r = fd_nonblock(sockfd, false);
+        if (r < 0)
+                return r;
+        if (r > 0)
+                return -EBUSY; /* fd was nonblocking, but should have been restored to blocking */
+
+        return 0;
+}
+
+TEST(fiber_io_connect_blocking) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        /* Create listening socket */
+        _cleanup_close_ int listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
+        ASSERT_OK(listen_fd);
+
+        struct sockaddr_un addr = {
+                .sun_family = AF_UNIX,
+        };
+        addr.sun_path[0] = '\0';
+        snprintf(addr.sun_path + 1, sizeof(addr.sun_path) - 1, "test-fiber-connect-blocking-%d", getpid());
+
+        ASSERT_OK(bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr)));
+        ASSERT_OK(listen(listen_fd, 1));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "connect-blocking", socket_connect_blocking_fiber, &addr, NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+}
+
+DEFINE_TEST_MAIN(LOG_DEBUG);
index 9d0d03acf48a7544e846535091d56c57640215d2..5e0fa2252561591b82f25553ccb10c147e3ca98f 100644 (file)
   along with systemd; If not, see <https://www.gnu.org/licenses/>.
 ***/
 
+#include <sys/socket.h>
+
 #include "_sd-common.h"
 
 _SD_BEGIN_DECLARATIONS;
 
+struct iovec;
+struct pollfd;
+struct sockaddr;
+struct msghdr;
+struct timespec;
+
 typedef struct sd_event sd_event;
 typedef struct sd_future sd_future;
 typedef struct sd_future_ops sd_future_ops;
@@ -96,6 +104,23 @@ sd_future* sd_fiber_timeout(uint64_t timeout);
              _SD_CONCATENATE(_sd_fto_b_, uniq);                                                                                                                 \
              _SD_CONCATENATE(_sd_fto_b_, uniq) = NULL)
 
+/* Fiber I/O operations - use sd-event for non-blocking I/O when in fiber context */
+ssize_t sd_fiber_read(int fd, void *buf, size_t count);
+ssize_t sd_fiber_write(int fd, const void *buf, size_t count);
+ssize_t sd_fiber_readv(int fd, const struct iovec *iov, int iovcnt);
+ssize_t sd_fiber_writev(int fd, const struct iovec *iov, int iovcnt);
+ssize_t sd_fiber_recv(int sockfd, void *buf, size_t len, int flags);
+ssize_t sd_fiber_send(int sockfd, const void *buf, size_t len, int flags);
+int sd_fiber_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
+ssize_t sd_fiber_recvmsg(int sockfd, struct msghdr *msg, int flags);
+ssize_t sd_fiber_sendmsg(int sockfd, const struct msghdr *msg, int flags);
+ssize_t sd_fiber_recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen);
+ssize_t sd_fiber_sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
+int sd_fiber_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
+#ifndef __STRICT_ANSI__
+int sd_fiber_ppoll(struct pollfd *fds, size_t n_fds, const struct timespec *timeout, const sigset_t *sigmask);
+#endif
+
 _SD_END_DECLARATIONS;
 
 #endif