]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
sd-future: make src/basic blocking helpers fiber-aware
authorDaan De Meyer <daan@amutable.com>
Sat, 25 Apr 2026 20:06:54 +0000 (22:06 +0200)
committerDaan De Meyer <daan@amutable.com>
Thu, 21 May 2026 09:55:04 +0000 (09:55 +0000)
Some helpers in src/basic — ppoll_usec_full() (used by fd_wait_for_event()),
loop_read(), loop_read_exact(), loop_write_full() and
pidref_wait_for_terminate_full() — block the calling thread. That's the
right behaviour outside a fiber but not inside one, where blocking the
thread also stalls every other fiber running on the same event loop.
Rewriting every caller to pick a fiber or non-fiber variant explicitly
would be a lot of churn and would split otherwise-shared code paths in
two.

Instead, the helpers detect at runtime whether they're running on a fiber
and dispatch to a suspending variant when they are. FiberOps in
fiber-ops.h holds five function pointers (ppoll, read, write, timeout,
cancel_wait_unref); a fiber_ops global constant is populated whenever we
enter a fiber with functions that delegate to suspending variants of common
syscalls. With this approach, the variants themselves stay in libsystemd
which is required because they make use of sd-event.

- loop_read()/loop_read_exact() take the fiber read hook on a fiber
  unless the caller asked for a non-blocking attempt (do_poll=false) and
  the fd is already non-blocking — in that case we fall through to read()
  to preserve the existing return-EAGAIN-immediately semantic. The hook
  itself suspends on EAGAIN until data is available, so neither the
  do_poll knob nor the explicit fd_wait_for_event() retry loop are
  needed on the fiber path.

- loop_write_full() likewise takes the fiber write hook on a fiber,
  except when timeout=0 with an already-non-blocking fd (preserving the
  fast-return-EAGAIN semantic). The fiber path runs inside a
  FIBER_OPS_TIMEOUT() scope so the caller's timeout is honoured via a
  deadline future, mirroring SD_FIBER_TIMEOUT() but reachable from
  src/basic without pulling in sd-future.h.

- pidref_wait_for_terminate_full() polls the pidfd via fd_wait_for_event()
  before each waitid() when either a finite timeout is set or we're on a
  fiber, and requires pidref->fd >= 0 in those cases (returning
  -ENOMEDIUM otherwise — extending the rule that already applied to
  finite timeouts). The poll suspends the fiber via the ppoll hook above;
  the subsequent waitid() doesn't block because the pidfd is already
  signalled.

src/basic/basic-forward.h
src/basic/fiber-ops.c [new file with mode: 0644]
src/basic/fiber-ops.h [new file with mode: 0644]
src/basic/io-util.c
src/basic/meson.build
src/basic/pidref.c
src/libsystemd/meson.build
src/libsystemd/sd-future/fiber.c
src/libsystemd/sd-future/test-fiber-ops.c [new file with mode: 0644]

index 5f9109cb17c1e0d9650931e57a849a4c5c13e243..6fa9f3bf868de48548c8c6d3da2c54307b3c7abb 100644 (file)
@@ -112,6 +112,7 @@ typedef enum UnitType UnitType;
 typedef enum WaitFlags WaitFlags;
 
 typedef struct Fiber Fiber;
+typedef struct FiberOps FiberOps;
 typedef struct Hashmap Hashmap;
 typedef struct HashmapBase HashmapBase;
 typedef struct IteratedCache IteratedCache;
diff --git a/src/basic/fiber-ops.c b/src/basic/fiber-ops.c
new file mode 100644 (file)
index 0000000..330630f
--- /dev/null
@@ -0,0 +1,51 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+#include <poll.h>
+#include <threads.h>
+#include <unistd.h>
+
+#include "errno-util.h"
+#include "fiber-ops.h"
+
+static thread_local const FiberOps *fiber_ops = NULL;
+
+bool fiber_ops_is_set(void) {
+        return fiber_ops != NULL;
+}
+
+void fiber_ops_set(const FiberOps *ops) {
+        fiber_ops = ops;
+}
+
+int fiber_ops_ppoll(struct pollfd *fds, size_t n_fds, const struct timespec *timeout, const sigset_t *sigmask) {
+        if (fiber_ops)
+                return fiber_ops->ppoll(fds, n_fds, timeout, sigmask);
+
+        return RET_NERRNO(ppoll(fds, n_fds, timeout, sigmask));
+}
+
+ssize_t fiber_ops_read(int fd, void *buf, size_t count) {
+        if (fiber_ops)
+                return fiber_ops->read(fd, buf, count);
+
+        ssize_t n = read(fd, buf, count);
+        return n < 0 ? -errno : n;
+}
+
+ssize_t fiber_ops_write(int fd, const void *buf, size_t count) {
+        if (fiber_ops)
+                return fiber_ops->write(fd, buf, count);
+
+        return RET_NERRNO(write(fd, buf, count));
+}
+
+sd_future* fiber_ops_timeout(uint64_t timeout) {
+        assert(fiber_ops);
+
+        return fiber_ops->timeout(timeout);
+}
+
+sd_future* fiber_ops_cancel_wait_unref(sd_future *f) {
+        assert(fiber_ops);
+
+        return fiber_ops->cancel_wait_unref(f);
+}
diff --git a/src/basic/fiber-ops.h b/src/basic/fiber-ops.h
new file mode 100644 (file)
index 0000000..64bd823
--- /dev/null
@@ -0,0 +1,34 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+#pragma once
+
+#include "basic-forward.h"
+
+typedef struct sd_future sd_future;
+
+/* Hooks installed on a fiber so that functions in src/basic can transparently defer to the suspending
+ * variants in sd-future when invoked from a running fiber. Populated by sd_fiber_new() with pointers to the
+ * implementations in fiber-ops.c. */
+typedef struct FiberOps {
+        int (*ppoll)(struct pollfd *fds, size_t n_fds, const struct timespec *timeout, const sigset_t *sigmask);
+        ssize_t (*read)(int fd, void *buf, size_t count);
+        ssize_t (*write)(int fd, const void *buf, size_t count);
+        sd_future* (*timeout)(uint64_t timeout);
+        sd_future* (*cancel_wait_unref)(sd_future *f);
+} FiberOps;
+
+bool fiber_ops_is_set(void);
+void fiber_ops_set(const FiberOps *fiber_ops);
+
+int fiber_ops_ppoll(struct pollfd *fds, size_t n_fds, const struct timespec *timeout, const sigset_t *sigmask);
+ssize_t fiber_ops_read(int fd, void *buf, size_t count);
+ssize_t fiber_ops_write(int fd, const void *buf, size_t count);
+
+/* Mirror of SD_FIBER_TIMEOUT() for code under src/basic that doesn't include sd-future.h: dispatches
+ * through FiberOps so the actual sd_fiber_timeout() implementation lives in libsystemd. */
+sd_future* fiber_ops_timeout(uint64_t timeout);
+sd_future* fiber_ops_cancel_wait_unref(sd_future *f);
+DEFINE_TRIVIAL_CLEANUP_FUNC(sd_future*, fiber_ops_cancel_wait_unref);
+
+#define FIBER_OPS_TIMEOUT(timeout) _FIBER_OPS_TIMEOUT(UNIQ, (timeout))
+#define _FIBER_OPS_TIMEOUT(uniq, timeout)                                                                                               \
+        _unused_ _cleanup_(fiber_ops_cancel_wait_unrefp) sd_future *UNIQ_T(_fot_, uniq) = fiber_ops_timeout(timeout)
index b4f643b5721aa191ee4ec892e6f9f6757b458c30..3dbc3670a4a857dcfae8e9d88c726e2fc2ed33e9 100644 (file)
@@ -1,5 +1,6 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 
+#include <fcntl.h>
 #include <poll.h>
 #include <stdio.h>
 #include <string.h>
@@ -8,6 +9,7 @@
 #include <unistd.h>
 
 #include "errno-util.h"
+#include "fiber-ops.h"
 #include "io-util.h"
 #include "time-util.h"
 
@@ -69,25 +71,44 @@ ssize_t loop_read(int fd, void *buf, size_t nbytes, bool do_poll) {
         if (nbytes > (size_t) SSIZE_MAX)
                 return -EINVAL;
 
+        /* do_poll == false means "don't wait, return what we have if EAGAIN". If the fd is already
+         * non-blocking, read() can't block the thread, so the non-fiber path satisfies that semantic
+         * correctly even from a fiber. Only use the fiber path when the fd is blocking (where read()
+         * would otherwise block the entire event loop). */
+        int flags = 0;
+        if (fiber_ops_is_set() && !do_poll) {
+                flags = fcntl(fd, F_GETFL);
+                if (flags < 0)
+                        return -errno;
+        }
+
         do {
                 ssize_t k;
 
-                k = read(fd, p, nbytes);
-                if (k < 0) {
-                        if (errno == EINTR)
-                                continue;
-
-                        if (errno == EAGAIN && do_poll) {
-
-                                /* We knowingly ignore any return value here,
-                                 * and expect that any error/EOF is reported
-                                 * via read() */
-
-                                (void) fd_wait_for_event(fd, POLLIN, USEC_INFINITY);
-                                continue;
+                if (fiber_ops_is_set() && (do_poll || !FLAGS_SET(flags, O_NONBLOCK))) {
+                        /* On a fiber the read op suspends on EAGAIN until data is available, so we don't
+                         * need a separate poll step or the do_poll knob. */
+                        k = fiber_ops_read(fd, p, nbytes);
+                        if (k < 0)
+                                return n > 0 ? n : k;
+                } else {
+                        k = read(fd, p, nbytes);
+                        if (k < 0) {
+                                if (errno == EINTR)
+                                        continue;
+
+                                if (errno == EAGAIN && do_poll) {
+
+                                        /* We knowingly ignore any return value here,
+                                         * and expect that any error/EOF is reported
+                                         * via read() */
+
+                                        (void) fd_wait_for_event(fd, POLLIN, USEC_INFINITY);
+                                        continue;
+                                }
+
+                                return n > 0 ? n : -errno;
                         }
-
-                        return n > 0 ? n : -errno;
                 }
 
                 if (k == 0)
@@ -137,6 +158,37 @@ int loop_write_full(int fd, const void *buf, size_t nbytes, usec_t timeout) {
                 p = buf;
         }
 
+        /* timeout == 0 means "don't wait, return -EAGAIN if not ready". If the fd is already
+         * non-blocking, write() can't block the thread, so the non-fiber path satisfies that
+         * semantic correctly even from a fiber. Only use the fiber path when the fd is blocking
+         * (where write() would otherwise block the entire event loop). */
+        int flags = 0;
+        if (fiber_ops_is_set() && timeout == 0) {
+                flags = fcntl(fd, F_GETFL);
+                if (flags < 0)
+                        return -errno;
+        }
+
+        if (fiber_ops_is_set() && !FLAGS_SET(flags, O_NONBLOCK)) {
+                /* On a fiber the write op suspends on EAGAIN until the fd is writable; honor the
+                 * caller's timeout via a deadline scope. */
+                FIBER_OPS_TIMEOUT(timestamp_is_set(timeout) ? timeout : USEC_INFINITY);
+
+                while (nbytes > 0) {
+                        ssize_t k = fiber_ops_write(fd, p, nbytes);
+                        if (k < 0)
+                                return (int) k;
+                        if (_unlikely_(nbytes > 0 && k == 0)) /* Can't really happen */
+                                return -EIO;
+
+                        assert((size_t) k <= nbytes);
+                        p += k;
+                        nbytes -= k;
+                }
+
+                return 0;
+        }
+
         /* When timeout is 0 or USEC_INFINITY this is not used. But we initialize it to a sensible value. */
         end = timestamp_is_set(timeout) ? usec_add(now(CLOCK_MONOTONIC), timeout) : USEC_INFINITY;
 
@@ -220,11 +272,9 @@ int ppoll_usec_full(struct pollfd *fds, size_t n_fds, usec_t timeout, const sigs
         if (n_fds == 0 && timeout == 0)
                 return 0;
 
-        r = ppoll(fds, n_fds, timeout == USEC_INFINITY ? NULL : TIMESPEC_STORE(timeout), ss);
-        if (r < 0)
-                return -errno;
-        if (r == 0)
-                return 0;
+        r = fiber_ops_ppoll(fds, n_fds, timeout == USEC_INFINITY ? NULL : TIMESPEC_STORE(timeout), ss);
+        if (r <= 0)
+                return r;
 
         for (size_t i = 0, n = r; i < n_fds && n > 0; i++) {
                 if (fds[i].revents == 0)
index 7cc62c7b8bdac87a49e33539ec9b9a723e653d7c..bae72fbcb27d1216270866a57f36a7b0fbe3e2d8 100644 (file)
@@ -36,6 +36,7 @@ basic_sources = files(
         'ether-addr-util.c',
         'extract-word.c',
         'fd-util.c',
+        'fiber-ops.c',
         'fileio.c',
         'filesystems.c',
         'format-ifname.c',
index 10ff9a63b12bce6d0183d51a3bd00771a7e211d2..33131c0586d1204da7145b1e4f62062b92a51f14 100644 (file)
@@ -7,6 +7,7 @@
 #include "alloc-util.h"
 #include "errno-util.h"
 #include "fd-util.h"
+#include "fiber-ops.h"
 #include "format-util.h"
 #include "hash-funcs.h"
 #include "io-util.h"
@@ -466,16 +467,28 @@ int pidref_wait_for_terminate_full(PidRef *pidref, usec_t timeout, siginfo_t *re
         if (pidref->pid == 1 || pidref_is_self(pidref))
                 return -ECHILD;
 
-        if (timeout != USEC_INFINITY && pidref->fd < 0)
+        if (pidref->fd < 0 && (timeout != USEC_INFINITY || fiber_ops_is_set()))
                 return -ENOMEDIUM;
 
         usec_t ts = timeout == USEC_INFINITY ? USEC_INFINITY : usec_add(now(CLOCK_MONOTONIC), timeout);
 
+        /* Poll the pidfd before waitid() if either there's a finite timeout (so we can honor it) or
+         * we're on a fiber (so fd_wait_for_event() can suspend us instead of blocking the event loop
+         * inside waitid()). Otherwise let waitid() block directly. The precondition above guarantees
+         * pidref->fd >= 0 in both cases. */
+        bool poll_first = ts != USEC_INFINITY || fiber_ops_is_set();
+
         for (;;) {
-                if (ts != USEC_INFINITY) {
-                        usec_t left = usec_sub_unsigned(ts, now(CLOCK_MONOTONIC));
-                        if (left == 0)
-                                return -ETIMEDOUT;
+                if (poll_first) {
+                        usec_t left;
+
+                        if (ts == USEC_INFINITY)
+                                left = USEC_INFINITY;
+                        else {
+                                left = usec_sub_unsigned(ts, now(CLOCK_MONOTONIC));
+                                if (left == 0)
+                                        return -ETIMEDOUT;
+                        }
 
                         r = fd_wait_for_event(pidref->fd, POLLIN, left);
                         if (r == 0)
index 3c624051f0ded8a45ac5d08d093f30370974f78e..91a78155d2d3b24e8501de90653fa894bedf0738 100644 (file)
@@ -192,6 +192,7 @@ simple_tests += files(
         'sd-device/test-sd-device-monitor.c',
         'sd-future/test-fiber.c',
         'sd-future/test-fiber-io.c',
+        'sd-future/test-fiber-ops.c',
         'sd-hwdb/test-sd-hwdb.c',
         'sd-id128/test-id128.c',
         'sd-journal/test-audit-type.c',
index 48d26a27dd2c704a244148ca9402b17609a9c942..64dee8411df224a52e78a74d66ce2e20effffdd5 100644 (file)
@@ -20,6 +20,7 @@
 #include "architecture.h"
 #include "errno-util.h"
 #include "event-future.h"
+#include "fiber-ops.h"
 #include "log-context.h"
 #include "log.h"
 #include "memory-util.h"
@@ -270,8 +271,10 @@ static void reset_current_fiber(void) {
          * current_fiber. Without this, the child of a fork() that happened mid-fiber would inherit the
          * fiber's log prefix / context list in its thread-locals even though no fiber is running. */
         Fiber *f = fiber_get_current();
-        if (f)
+        if (f) {
                 fiber_swap_log_state(f);
+                fiber_ops_set(NULL);
+        }
         fiber_set_current(NULL);
 }
 
@@ -307,9 +310,19 @@ static void fiber_resolve(sd_future *f) {
         sd_future_resolve(f, fiber->result);
 }
 
+static const FiberOps fiber_ops = {
+        .ppoll = sd_fiber_ppoll,
+        .read = sd_fiber_read,
+        .write = sd_fiber_write,
+        .timeout = sd_fiber_timeout,
+        .cancel_wait_unref = sd_future_cancel_wait_unref,
+};
+
 static void fiber_enter(Fiber *fiber, Fiber *prev, void **fake_stack_save) {
         fiber_set_current(fiber);
         fiber_swap_log_state(fiber);
+        if (!prev)
+                fiber_ops_set(&fiber_ops);
 
         struct iovec fiber_stack = fiber_stack_usable(&fiber->stack);
         start_switch_stack(fake_stack_save, &fiber_stack);
@@ -318,6 +331,8 @@ static void fiber_enter(Fiber *fiber, Fiber *prev, void **fake_stack_save) {
 
 static void fiber_leave(Fiber *fiber, Fiber *prev, void *fake_stack_save) {
         finish_switch_stack(fake_stack_save);
+        if (!prev)
+                fiber_ops_set(NULL);
         fiber_swap_log_state(fiber);
         fiber_set_current(prev);
 }
diff --git a/src/libsystemd/sd-future/test-fiber-ops.c b/src/libsystemd/sd-future/test-fiber-ops.c
new file mode 100644 (file)
index 0000000..f2c603f
--- /dev/null
@@ -0,0 +1,574 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+
+#include <fcntl.h>
+#include <poll.h>
+#include <unistd.h>
+
+#include "sd-event.h"
+#include "sd-future.h"
+
+#include "alloc-util.h"
+#include "cleanup-util.h"
+#include "fd-util.h"
+#include "io-util.h"
+#include "pidref.h"
+#include "process-util.h"
+#include "tests.h"
+#include "time-util.h"
+
+/* Test: wait_for_terminate basic functionality */
+static int wait_simple_fiber(void *userdata) {
+        _cleanup_(pidref_done_sigkill_wait) PidRef pidref = PIDREF_NULL;
+        siginfo_t si;
+        int r;
+
+        /* Fork a child that exits immediately */
+        r = pidref_safe_fork("(test-child)", FORK_RESET_SIGNALS|FORK_DEATHSIG_SIGTERM|FORK_LOG, &pidref);
+        if (r < 0)
+                return r;
+
+        if (r == 0)
+                _exit(42);
+
+        /* Parent - wait for child */
+        r = pidref_wait_for_terminate(&pidref, &si);
+        if (r < 0)
+                return r;
+
+        pidref_done(&pidref);
+
+        /* Verify child exited with status 42 */
+        if (si.si_code != CLD_EXITED || si.si_status != 42)
+                return -EIO;
+
+        return 0;
+}
+
+TEST(wait_for_terminate_fiber_basic) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "wait-simple", wait_simple_fiber, NULL, /* destroy= */ NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+}
+
+/* Test: wait_for_terminate with multiple children */
+static int wait_multiple_fiber(void *userdata) {
+        PidRef pidrefs[3] = { PIDREF_NULL, PIDREF_NULL, PIDREF_NULL };
+        siginfo_t si;
+        int r;
+
+        /* Fork three children with different exit codes */
+        for (size_t i = 0; i < 3; i++) {
+                r = pidref_safe_fork("(test-child)", FORK_RESET_SIGNALS|FORK_DEATHSIG_SIGTERM|FORK_LOG, &pidrefs[i]);
+                if (r < 0)
+                        goto cleanup;
+
+                if (r == 0)
+                        /* Child process */
+                        _exit(10 + i);
+        }
+
+        /* Wait for all three in order */
+        for (size_t i = 0; i < 3; i++) {
+                r = pidref_wait_for_terminate(&pidrefs[i], &si);
+                if (r < 0)
+                        goto cleanup;
+
+                pidref_done(&pidrefs[i]);
+
+                if (si.si_code != CLD_EXITED || si.si_status != (int) (10 + i)) {
+                        r = -EIO;
+                        goto cleanup;
+                }
+        }
+
+        return 0;
+
+cleanup:
+        for (size_t i = 0; i < 3; i++)
+                pidref_done(&pidrefs[i]);
+
+        return r;
+}
+
+TEST(wait_for_terminate_fiber_multiple) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "wait-multiple", wait_multiple_fiber, NULL, /* destroy= */ NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(f));
+}
+
+static int concurrent_wait_fiber(void *userdata) {
+        _cleanup_(pidref_done_sigkill_wait) PidRef pidref = PIDREF_NULL;
+        siginfo_t si;
+        int r;
+
+        r = pidref_safe_fork("(test-child)", FORK_RESET_SIGNALS|FORK_DEATHSIG_SIGTERM|FORK_LOG, &pidref);
+        if (r < 0)
+                return r;
+
+        if (r == 0)
+                /* Child exits with specified status */
+                _exit(PTR_TO_INT(userdata));
+
+        r = pidref_wait_for_terminate(&pidref, &si);
+        if (r < 0)
+                return r;
+
+        pidref_done(&pidref);
+
+        if (si.si_code != CLD_EXITED || si.si_status != PTR_TO_INT(userdata))
+                return -EIO;
+
+        return 0;
+}
+
+TEST(wait_for_terminate_fiber_concurrent) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        sd_future *fibers[3] = {};
+        CLEANUP_ELEMENTS(fibers, sd_future_unref_array_clear);
+
+        /* Create 3 fibers, each waiting for a different child */
+        for (size_t i = 0; i < ELEMENTSOF(fibers); i++)
+                ASSERT_OK(sd_fiber_new(e, "concurrent-wait", concurrent_wait_fiber, INT_TO_PTR(20 + i), /* destroy= */ NULL, &fibers[i]));
+
+        ASSERT_OK(sd_event_loop(e));
+
+        /* All fibers should complete successfully */
+        for (size_t i = 0; i < ELEMENTSOF(fibers); i++)
+                ASSERT_OK(sd_future_result(fibers[i]));
+}
+
+typedef struct LoopIOContext {
+        int *pipefd;
+        const char *data;
+        size_t len;
+        int order;
+} LoopIOContext;
+
+static int loop_read_suspend_fiber(void *userdata) {
+        LoopIOContext *ctx = ASSERT_PTR(userdata);
+        char buf[64];
+
+        ASSERT_EQ(ctx->order, 0);
+        ctx->order = 1;
+
+        ssize_t n = loop_read(ctx->pipefd[0], buf, sizeof(buf), /* do_poll= */ true);
+
+        /* While we were suspended, the writer fiber should have run. */
+        ASSERT_EQ(ctx->order, 2);
+
+        if (n < 0)
+                return (int) n;
+        if ((size_t) n != ctx->len || memcmp(buf, ctx->data, ctx->len) != 0)
+                return -EIO;
+
+        return (int) n;
+}
+
+static int loop_write_suspend_fiber(void *userdata) {
+        LoopIOContext *ctx = ASSERT_PTR(userdata);
+
+        ASSERT_EQ(ctx->order, 1);
+        ctx->order = 2;
+
+        int r = loop_write(ctx->pipefd[1], ctx->data, ctx->len);
+        if (r < 0)
+                return r;
+
+        /* Close the write end so the reader sees EOF after reading the data. */
+        ctx->pipefd[1] = safe_close(ctx->pipefd[1]);
+        return 0;
+}
+
+/* Test: two fibers cooperatively pass a small payload through a blocking pipe using the suspending
+ * loop helpers. Exercises the non-blocking flip, event-loop yielding, and the blocking-mode restore. */
+TEST(loop_read_write_suspend) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));
+
+        static const char payload[] = "loop-suspend";
+        LoopIOContext ctx = {
+                .pipefd = pipefd,
+                .data = payload,
+                .len = sizeof(payload) - 1,
+        };
+
+        _cleanup_(sd_future_unrefp) sd_future *fr = NULL, *fw = NULL;
+        ASSERT_OK(sd_fiber_new(e, "loop-read", loop_read_suspend_fiber, &ctx, /* destroy= */ NULL, &fr));
+        ASSERT_OK(sd_future_set_priority(fr, 0));
+        ASSERT_OK(sd_fiber_new(e, "loop-write", loop_write_suspend_fiber, &ctx, /* destroy= */ NULL, &fw));
+        ASSERT_OK(sd_future_set_priority(fw, 1));
+
+        ASSERT_OK(sd_event_loop(e));
+
+        ASSERT_OK_EQ(sd_future_result(fr), (int) ctx.len);
+        ASSERT_OK_ZERO(sd_future_result(fw));
+
+        /* The read fd started out blocking and loop_read() must have restored it before returning. */
+        ASSERT_OK_ZERO(fcntl(pipefd[0], F_GETFL) & O_NONBLOCK);
+}
+
+static int loop_read_exact_short_fiber(void *userdata) {
+        int fd = PTR_TO_INT(userdata);
+        char buf[16];
+
+        /* Requesting more bytes than the peer writes should return -EIO once EOF is hit. */
+        return loop_read_exact(fd, buf, sizeof(buf), /* do_poll= */ true);
+}
+
+/* Test: loop_read_exact() returns -EIO when the peer closes early. */
+TEST(loop_read_exact_short) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "loop-read-exact", loop_read_exact_short_fiber,
+                               INT_TO_PTR(pipefd[0]), /* destroy= */ NULL, &f));
+
+        /* Write a few bytes and close the write end — less than the fiber asked for. */
+        ASSERT_OK_EQ_ERRNO(write(pipefd[1], "abc", 3), (ssize_t) 3);
+        pipefd[1] = safe_close(pipefd[1]);
+
+        ASSERT_OK(sd_event_loop(e));
+
+        ASSERT_ERROR(sd_future_result(f), EIO);
+}
+
+typedef struct LoopWriteTimeoutContext {
+        int fd;
+        int result;
+} LoopWriteTimeoutContext;
+
+static int loop_write_timeout_fiber(void *userdata) {
+        LoopWriteTimeoutContext *ctx = ASSERT_PTR(userdata);
+
+        /* Try to write much more than the pipe buffer can hold with a short timeout. The write will
+         * succeed partially and then hit -ETIME after exhausting the timeout while blocked. */
+        static const char big_buf[128 * 1024] = { 0 };
+        ctx->result = loop_write_full(ctx->fd, big_buf, sizeof(big_buf), 100 * USEC_PER_MSEC);
+        return 0;
+}
+
+/* Test: loop_write_full() returns -ETIME when the peer never drains. */
+TEST(loop_write_full_timeout) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));
+
+        /* Shrink the pipe buffer to its minimum (one page) so the 128K write below is guaranteed to block
+         * regardless of the architecture's page size. The default pipe buffer is 16 pages, which on
+         * 64K-page architectures (e.g. ppc64le) is 1 MiB — enough to absorb the entire write without ever
+         * blocking, defeating the purpose of the timeout. */
+        ASSERT_OK_ERRNO(fcntl(pipefd[1], F_SETPIPE_SZ, 1));
+
+        LoopWriteTimeoutContext ctx = { .fd = pipefd[1], .result = 0 };
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "loop-write-timeout", loop_write_timeout_fiber, &ctx, /* destroy= */ NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+
+        ASSERT_OK_ZERO(sd_future_result(f));
+        ASSERT_ERROR(ctx.result, ETIME);
+}
+
+typedef struct PpollDispatchContext {
+        int *pipefd;
+        int order;
+} PpollDispatchContext;
+
+static int ppoll_dispatch_read_fiber(void *userdata) {
+        PpollDispatchContext *ctx = ASSERT_PTR(userdata);
+        struct pollfd pfd = {
+                .fd = ctx->pipefd[0],
+                .events = POLLIN,
+        };
+
+        ASSERT_EQ(ctx->order, 0);
+        ctx->order = 1;
+
+        /* Direct ppoll_usec() call from a fiber must dispatch through sd_fiber_poll(), suspending the
+         * fiber instead of blocking the entire thread. If dispatch fails, the writer fiber never gets a
+         * chance to run and the test deadlocks. */
+        int r = ppoll_usec(&pfd, 1, USEC_INFINITY);
+        if (r < 0)
+                return r;
+
+        ASSERT_EQ(ctx->order, 2);
+
+        if (r != 1 || !FLAGS_SET(pfd.revents, POLLIN))
+                return -EIO;
+
+        return 0;
+}
+
+static int ppoll_dispatch_write_fiber(void *userdata) {
+        PpollDispatchContext *ctx = ASSERT_PTR(userdata);
+
+        ASSERT_EQ(ctx->order, 1);
+        ctx->order = 2;
+
+        if (write(ctx->pipefd[1], "x", 1) < 0)
+                return -errno;
+
+        return 0;
+}
+
+/* Test: ppoll_usec() called from a fiber dispatches through the FiberOps hook to sd_fiber_poll(),
+ * yielding to the event loop instead of blocking. */
+TEST(ppoll_usec_dispatch) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        PpollDispatchContext ctx = { .pipefd = pipefd };
+
+        _cleanup_(sd_future_unrefp) sd_future *fr = NULL, *fw = NULL;
+        ASSERT_OK(sd_fiber_new(e, "ppoll-read", ppoll_dispatch_read_fiber, &ctx, /* destroy= */ NULL, &fr));
+        ASSERT_OK(sd_future_set_priority(fr, 0));
+        ASSERT_OK(sd_fiber_new(e, "ppoll-write", ppoll_dispatch_write_fiber, &ctx, /* destroy= */ NULL, &fw));
+        ASSERT_OK(sd_future_set_priority(fw, 1));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(fr));
+        ASSERT_OK(sd_future_result(fw));
+}
+
+static int loop_write_zero_timeout_nonblock_fiber(void *userdata) {
+        int fd = PTR_TO_INT(userdata);
+
+        /* Fill the pipe so the next write would block. The fd is non-blocking, so on a fiber
+         * loop_write_full(timeout=0) must take the non-fiber path and return -EAGAIN immediately
+         * rather than suspending. */
+        static const char big_buf[128 * 1024] = { 0 };
+        return loop_write_full(fd, big_buf, sizeof(big_buf), /* timeout= */ 0);
+}
+
+/* Test: timeout == 0 on a non-blocking fd from a fiber preserves the "don't wait" semantic and
+ * returns -EAGAIN when the pipe buffer is full, instead of suspending the fiber. */
+TEST(loop_write_zero_timeout_nonblock) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+        ASSERT_OK_ERRNO(fcntl(pipefd[1], F_SETPIPE_SZ, 1));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "loop-write-zt-nb", loop_write_zero_timeout_nonblock_fiber,
+                               INT_TO_PTR(pipefd[1]), /* destroy= */ NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_ERROR(sd_future_result(f), EAGAIN);
+}
+
+typedef struct LoopWriteZeroBlockingContext {
+        int *pipefd;
+        size_t total;
+        int order;
+} LoopWriteZeroBlockingContext;
+
+static int loop_write_zero_blocking_writer_fiber(void *userdata) {
+        LoopWriteZeroBlockingContext *ctx = ASSERT_PTR(userdata);
+
+        ASSERT_EQ(ctx->order, 0);
+        ctx->order = 1;
+
+        /* timeout == 0 on a *blocking* fd from a fiber: the fast EAGAIN return isn't possible, so
+         * loop_write_full() takes the fiber path. The reader fiber drains the pipe, letting our
+         * write complete via fiber suspension/resume. */
+        _cleanup_free_ char *big_buf = malloc0(ctx->total);
+        ASSERT_NOT_NULL(big_buf);
+        int r = loop_write_full(ctx->pipefd[1], big_buf, ctx->total, /* timeout= */ 0);
+
+        ASSERT_EQ(ctx->order, 2);
+        return r;
+}
+
+static int loop_write_zero_blocking_reader_fiber(void *userdata) {
+        LoopWriteZeroBlockingContext *ctx = ASSERT_PTR(userdata);
+
+        ASSERT_EQ(ctx->order, 1);
+        ctx->order = 2;
+
+        _cleanup_free_ char *buf = malloc(ctx->total);
+        ASSERT_NOT_NULL(buf);
+        ssize_t n = loop_read(ctx->pipefd[0], buf, ctx->total, /* do_poll= */ true);
+        if (n < 0)
+                return (int) n;
+        return (int) n;
+}
+
+/* Test: timeout == 0 on a blocking fd from a fiber takes the fiber path (suspends until the peer
+ * drains) instead of blocking the entire thread. */
+TEST(loop_write_zero_timeout_blocking) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));
+        ASSERT_OK_ERRNO(fcntl(pipefd[1], F_SETPIPE_SZ, 1));
+
+        /* F_SETPIPE_SZ rounds up to the kernel's pipe minimum (typically a page); query the actual
+         * size and write more than that, so the write must wait on the reader regardless of page size. */
+        int pipe_sz = fcntl(pipefd[1], F_GETPIPE_SZ);
+        ASSERT_OK_ERRNO(pipe_sz);
+
+        LoopWriteZeroBlockingContext ctx = { .pipefd = pipefd, .total = (size_t) pipe_sz * 2 };
+
+        _cleanup_(sd_future_unrefp) sd_future *fw = NULL, *fr = NULL;
+        ASSERT_OK(sd_fiber_new(e, "loop-write-zt-blk", loop_write_zero_blocking_writer_fiber,
+                               &ctx, /* destroy= */ NULL, &fw));
+        ASSERT_OK(sd_future_set_priority(fw, 0));
+        ASSERT_OK(sd_fiber_new(e, "loop-read-zt-blk", loop_write_zero_blocking_reader_fiber,
+                               &ctx, /* destroy= */ NULL, &fr));
+        ASSERT_OK(sd_future_set_priority(fr, 1));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK(sd_future_result(fw));
+        ASSERT_OK_EQ(sd_future_result(fr), (int) ctx.total);
+}
+
+static int loop_read_no_poll_nonblock_fiber(void *userdata) {
+        int fd = PTR_TO_INT(userdata);
+        char buf[64];
+
+        /* Empty non-blocking pipe + do_poll=false: on a fiber loop_read() must take the non-fiber
+         * path and return -EAGAIN immediately rather than suspending. */
+        return (int) loop_read(fd, buf, sizeof(buf), /* do_poll= */ false);
+}
+
+/* Test: do_poll == false on a non-blocking fd from a fiber preserves the "don't wait" semantic
+ * and returns -EAGAIN when no data is available, instead of suspending the fiber. */
+TEST(loop_read_no_poll_nonblock) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(e, "loop-read-np-nb", loop_read_no_poll_nonblock_fiber,
+                               INT_TO_PTR(pipefd[0]), /* destroy= */ NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_ERROR(sd_future_result(f), EAGAIN);
+}
+
+typedef struct LoopReadNoPollBlockingContext {
+        int *pipefd;
+        const char *data;
+        size_t len;
+        int order;
+} LoopReadNoPollBlockingContext;
+
+static int loop_read_no_poll_blocking_reader_fiber(void *userdata) {
+        LoopReadNoPollBlockingContext *ctx = ASSERT_PTR(userdata);
+        char buf[64];
+
+        ASSERT_EQ(ctx->order, 0);
+        ctx->order = 1;
+
+        /* do_poll == false on a *blocking* fd from a fiber: the fast EAGAIN return isn't possible,
+         * so loop_read() takes the fiber path and suspends until the writer fiber feeds data. */
+        ssize_t n = loop_read(ctx->pipefd[0], buf, sizeof(buf), /* do_poll= */ false);
+
+        ASSERT_EQ(ctx->order, 2);
+
+        if (n < 0)
+                return (int) n;
+        if ((size_t) n != ctx->len || memcmp(buf, ctx->data, ctx->len) != 0)
+                return -EIO;
+
+        return (int) n;
+}
+
+static int loop_read_no_poll_blocking_writer_fiber(void *userdata) {
+        LoopReadNoPollBlockingContext *ctx = ASSERT_PTR(userdata);
+
+        ASSERT_EQ(ctx->order, 1);
+        ctx->order = 2;
+
+        int r = loop_write(ctx->pipefd[1], ctx->data, ctx->len);
+        if (r < 0)
+                return r;
+
+        ctx->pipefd[1] = safe_close(ctx->pipefd[1]);
+        return 0;
+}
+
+/* Test: do_poll == false on a blocking fd from a fiber takes the fiber path (suspends until the
+ * peer feeds data) instead of blocking the entire thread. */
+TEST(loop_read_no_poll_blocking) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));
+
+        static const char payload[] = "no-poll";
+        LoopReadNoPollBlockingContext ctx = {
+                .pipefd = pipefd,
+                .data = payload,
+                .len = sizeof(payload) - 1,
+        };
+
+        _cleanup_(sd_future_unrefp) sd_future *fr = NULL, *fw = NULL;
+        ASSERT_OK(sd_fiber_new(e, "loop-read-np-blk", loop_read_no_poll_blocking_reader_fiber,
+                               &ctx, /* destroy= */ NULL, &fr));
+        ASSERT_OK(sd_future_set_priority(fr, 0));
+        ASSERT_OK(sd_fiber_new(e, "loop-write-np-blk", loop_read_no_poll_blocking_writer_fiber,
+                               &ctx, /* destroy= */ NULL, &fw));
+        ASSERT_OK(sd_future_set_priority(fw, 1));
+
+        ASSERT_OK(sd_event_loop(e));
+        ASSERT_OK_EQ(sd_future_result(fr), (int) ctx.len);
+        ASSERT_OK_ZERO(sd_future_result(fw));
+}
+
+/* Test: loop_*() helpers transparently fall back to blocking I/O when called outside any
+ * fiber context. */
+TEST(loop_read_write_fallback) {
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));
+
+        ASSERT_OK(loop_write(pipefd[1], "fallback", STRLEN("fallback")));
+
+        char buf[16];
+        ssize_t n = loop_read(pipefd[0], buf, STRLEN("fallback"), /* do_poll= */ true);
+        ASSERT_OK_EQ(n, (ssize_t) STRLEN("fallback"));
+        ASSERT_EQ(memcmp(buf, "fallback", STRLEN("fallback")), 0);
+}
+
+DEFINE_TEST_MAIN(LOG_DEBUG);