From: Daan De Meyer Date: Sat, 25 Apr 2026 20:06:54 +0000 (+0200) Subject: sd-future: make src/basic blocking helpers fiber-aware X-Git-Tag: v261-rc1~40^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7bc793e21f2d4bf67bd311545270bc515fe63ad9;p=thirdparty%2Fsystemd.git sd-future: make src/basic blocking helpers fiber-aware Some helpers in src/basic — ppoll_usec_full() (used by fd_wait_for_event()), loop_read(), loop_read_exact(), loop_write_full() and pidref_wait_for_terminate_full() — block the calling thread. That's the right behaviour outside a fiber but not inside one, where blocking the thread also stalls every other fiber running on the same event loop. Rewriting every caller to pick a fiber or non-fiber variant explicitly would be a lot of churn and would split otherwise-shared code paths in two. Instead, the helpers detect at runtime whether they're running on a fiber and dispatch to a suspending variant when they are. FiberOps in fiber-ops.h holds five function pointers (ppoll, read, write, timeout, cancel_wait_unref); a fiber_ops global constant is populated whenever we enter a fiber with functions that delegate to suspending variants of common syscalls. With this approach, the variants themselves stay in libsystemd which is required because they make use of sd-event. - loop_read()/loop_read_exact() take the fiber read hook on a fiber unless the caller asked for a non-blocking attempt (do_poll=false) and the fd is already non-blocking — in that case we fall through to read() to preserve the existing return-EAGAIN-immediately semantic. The hook itself suspends on EAGAIN until data is available, so neither the do_poll knob nor the explicit fd_wait_for_event() retry loop are needed on the fiber path. - loop_write_full() likewise takes the fiber write hook on a fiber, except when timeout=0 with an already-non-blocking fd (preserving the fast-return-EAGAIN semantic). The fiber path runs inside a FIBER_OPS_TIMEOUT() scope so the caller's timeout is honoured via a deadline future, mirroring SD_FIBER_TIMEOUT() but reachable from src/basic without pulling in sd-future.h. - pidref_wait_for_terminate_full() polls the pidfd via fd_wait_for_event() before each waitid() when either a finite timeout is set or we're on a fiber, and requires pidref->fd >= 0 in those cases (returning -ENOMEDIUM otherwise — extending the rule that already applied to finite timeouts). The poll suspends the fiber via the ppoll hook above; the subsequent waitid() doesn't block because the pidfd is already signalled. --- diff --git a/src/basic/basic-forward.h b/src/basic/basic-forward.h index 5f9109cb17c..6fa9f3bf868 100644 --- a/src/basic/basic-forward.h +++ b/src/basic/basic-forward.h @@ -112,6 +112,7 @@ typedef enum UnitType UnitType; typedef enum WaitFlags WaitFlags; typedef struct Fiber Fiber; +typedef struct FiberOps FiberOps; typedef struct Hashmap Hashmap; typedef struct HashmapBase HashmapBase; typedef struct IteratedCache IteratedCache; diff --git a/src/basic/fiber-ops.c b/src/basic/fiber-ops.c new file mode 100644 index 00000000000..330630f9dc6 --- /dev/null +++ b/src/basic/fiber-ops.c @@ -0,0 +1,51 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ +#include +#include +#include + +#include "errno-util.h" +#include "fiber-ops.h" + +static thread_local const FiberOps *fiber_ops = NULL; + +bool fiber_ops_is_set(void) { + return fiber_ops != NULL; +} + +void fiber_ops_set(const FiberOps *ops) { + fiber_ops = ops; +} + +int fiber_ops_ppoll(struct pollfd *fds, size_t n_fds, const struct timespec *timeout, const sigset_t *sigmask) { + if (fiber_ops) + return fiber_ops->ppoll(fds, n_fds, timeout, sigmask); + + return RET_NERRNO(ppoll(fds, n_fds, timeout, sigmask)); +} + +ssize_t fiber_ops_read(int fd, void *buf, size_t count) { + if (fiber_ops) + return fiber_ops->read(fd, buf, count); + + ssize_t n = read(fd, buf, count); + return n < 0 ? -errno : n; +} + +ssize_t fiber_ops_write(int fd, const void *buf, size_t count) { + if (fiber_ops) + return fiber_ops->write(fd, buf, count); + + return RET_NERRNO(write(fd, buf, count)); +} + +sd_future* fiber_ops_timeout(uint64_t timeout) { + assert(fiber_ops); + + return fiber_ops->timeout(timeout); +} + +sd_future* fiber_ops_cancel_wait_unref(sd_future *f) { + assert(fiber_ops); + + return fiber_ops->cancel_wait_unref(f); +} diff --git a/src/basic/fiber-ops.h b/src/basic/fiber-ops.h new file mode 100644 index 00000000000..64bd82353ba --- /dev/null +++ b/src/basic/fiber-ops.h @@ -0,0 +1,34 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ +#pragma once + +#include "basic-forward.h" + +typedef struct sd_future sd_future; + +/* Hooks installed on a fiber so that functions in src/basic can transparently defer to the suspending + * variants in sd-future when invoked from a running fiber. Populated by sd_fiber_new() with pointers to the + * implementations in fiber-ops.c. */ +typedef struct FiberOps { + int (*ppoll)(struct pollfd *fds, size_t n_fds, const struct timespec *timeout, const sigset_t *sigmask); + ssize_t (*read)(int fd, void *buf, size_t count); + ssize_t (*write)(int fd, const void *buf, size_t count); + sd_future* (*timeout)(uint64_t timeout); + sd_future* (*cancel_wait_unref)(sd_future *f); +} FiberOps; + +bool fiber_ops_is_set(void); +void fiber_ops_set(const FiberOps *fiber_ops); + +int fiber_ops_ppoll(struct pollfd *fds, size_t n_fds, const struct timespec *timeout, const sigset_t *sigmask); +ssize_t fiber_ops_read(int fd, void *buf, size_t count); +ssize_t fiber_ops_write(int fd, const void *buf, size_t count); + +/* Mirror of SD_FIBER_TIMEOUT() for code under src/basic that doesn't include sd-future.h: dispatches + * through FiberOps so the actual sd_fiber_timeout() implementation lives in libsystemd. */ +sd_future* fiber_ops_timeout(uint64_t timeout); +sd_future* fiber_ops_cancel_wait_unref(sd_future *f); +DEFINE_TRIVIAL_CLEANUP_FUNC(sd_future*, fiber_ops_cancel_wait_unref); + +#define FIBER_OPS_TIMEOUT(timeout) _FIBER_OPS_TIMEOUT(UNIQ, (timeout)) +#define _FIBER_OPS_TIMEOUT(uniq, timeout) \ + _unused_ _cleanup_(fiber_ops_cancel_wait_unrefp) sd_future *UNIQ_T(_fot_, uniq) = fiber_ops_timeout(timeout) diff --git a/src/basic/io-util.c b/src/basic/io-util.c index b4f643b5721..3dbc3670a4a 100644 --- a/src/basic/io-util.c +++ b/src/basic/io-util.c @@ -1,5 +1,6 @@ /* SPDX-License-Identifier: LGPL-2.1-or-later */ +#include #include #include #include @@ -8,6 +9,7 @@ #include #include "errno-util.h" +#include "fiber-ops.h" #include "io-util.h" #include "time-util.h" @@ -69,25 +71,44 @@ ssize_t loop_read(int fd, void *buf, size_t nbytes, bool do_poll) { if (nbytes > (size_t) SSIZE_MAX) return -EINVAL; + /* do_poll == false means "don't wait, return what we have if EAGAIN". If the fd is already + * non-blocking, read() can't block the thread, so the non-fiber path satisfies that semantic + * correctly even from a fiber. Only use the fiber path when the fd is blocking (where read() + * would otherwise block the entire event loop). */ + int flags = 0; + if (fiber_ops_is_set() && !do_poll) { + flags = fcntl(fd, F_GETFL); + if (flags < 0) + return -errno; + } + do { ssize_t k; - k = read(fd, p, nbytes); - if (k < 0) { - if (errno == EINTR) - continue; - - if (errno == EAGAIN && do_poll) { - - /* We knowingly ignore any return value here, - * and expect that any error/EOF is reported - * via read() */ - - (void) fd_wait_for_event(fd, POLLIN, USEC_INFINITY); - continue; + if (fiber_ops_is_set() && (do_poll || !FLAGS_SET(flags, O_NONBLOCK))) { + /* On a fiber the read op suspends on EAGAIN until data is available, so we don't + * need a separate poll step or the do_poll knob. */ + k = fiber_ops_read(fd, p, nbytes); + if (k < 0) + return n > 0 ? n : k; + } else { + k = read(fd, p, nbytes); + if (k < 0) { + if (errno == EINTR) + continue; + + if (errno == EAGAIN && do_poll) { + + /* We knowingly ignore any return value here, + * and expect that any error/EOF is reported + * via read() */ + + (void) fd_wait_for_event(fd, POLLIN, USEC_INFINITY); + continue; + } + + return n > 0 ? n : -errno; } - - return n > 0 ? n : -errno; } if (k == 0) @@ -137,6 +158,37 @@ int loop_write_full(int fd, const void *buf, size_t nbytes, usec_t timeout) { p = buf; } + /* timeout == 0 means "don't wait, return -EAGAIN if not ready". If the fd is already + * non-blocking, write() can't block the thread, so the non-fiber path satisfies that + * semantic correctly even from a fiber. Only use the fiber path when the fd is blocking + * (where write() would otherwise block the entire event loop). */ + int flags = 0; + if (fiber_ops_is_set() && timeout == 0) { + flags = fcntl(fd, F_GETFL); + if (flags < 0) + return -errno; + } + + if (fiber_ops_is_set() && !FLAGS_SET(flags, O_NONBLOCK)) { + /* On a fiber the write op suspends on EAGAIN until the fd is writable; honor the + * caller's timeout via a deadline scope. */ + FIBER_OPS_TIMEOUT(timestamp_is_set(timeout) ? timeout : USEC_INFINITY); + + while (nbytes > 0) { + ssize_t k = fiber_ops_write(fd, p, nbytes); + if (k < 0) + return (int) k; + if (_unlikely_(nbytes > 0 && k == 0)) /* Can't really happen */ + return -EIO; + + assert((size_t) k <= nbytes); + p += k; + nbytes -= k; + } + + return 0; + } + /* When timeout is 0 or USEC_INFINITY this is not used. But we initialize it to a sensible value. */ end = timestamp_is_set(timeout) ? usec_add(now(CLOCK_MONOTONIC), timeout) : USEC_INFINITY; @@ -220,11 +272,9 @@ int ppoll_usec_full(struct pollfd *fds, size_t n_fds, usec_t timeout, const sigs if (n_fds == 0 && timeout == 0) return 0; - r = ppoll(fds, n_fds, timeout == USEC_INFINITY ? NULL : TIMESPEC_STORE(timeout), ss); - if (r < 0) - return -errno; - if (r == 0) - return 0; + r = fiber_ops_ppoll(fds, n_fds, timeout == USEC_INFINITY ? NULL : TIMESPEC_STORE(timeout), ss); + if (r <= 0) + return r; for (size_t i = 0, n = r; i < n_fds && n > 0; i++) { if (fds[i].revents == 0) diff --git a/src/basic/meson.build b/src/basic/meson.build index 7cc62c7b8bd..bae72fbcb27 100644 --- a/src/basic/meson.build +++ b/src/basic/meson.build @@ -36,6 +36,7 @@ basic_sources = files( 'ether-addr-util.c', 'extract-word.c', 'fd-util.c', + 'fiber-ops.c', 'fileio.c', 'filesystems.c', 'format-ifname.c', diff --git a/src/basic/pidref.c b/src/basic/pidref.c index 10ff9a63b12..33131c0586d 100644 --- a/src/basic/pidref.c +++ b/src/basic/pidref.c @@ -7,6 +7,7 @@ #include "alloc-util.h" #include "errno-util.h" #include "fd-util.h" +#include "fiber-ops.h" #include "format-util.h" #include "hash-funcs.h" #include "io-util.h" @@ -466,16 +467,28 @@ int pidref_wait_for_terminate_full(PidRef *pidref, usec_t timeout, siginfo_t *re if (pidref->pid == 1 || pidref_is_self(pidref)) return -ECHILD; - if (timeout != USEC_INFINITY && pidref->fd < 0) + if (pidref->fd < 0 && (timeout != USEC_INFINITY || fiber_ops_is_set())) return -ENOMEDIUM; usec_t ts = timeout == USEC_INFINITY ? USEC_INFINITY : usec_add(now(CLOCK_MONOTONIC), timeout); + /* Poll the pidfd before waitid() if either there's a finite timeout (so we can honor it) or + * we're on a fiber (so fd_wait_for_event() can suspend us instead of blocking the event loop + * inside waitid()). Otherwise let waitid() block directly. The precondition above guarantees + * pidref->fd >= 0 in both cases. */ + bool poll_first = ts != USEC_INFINITY || fiber_ops_is_set(); + for (;;) { - if (ts != USEC_INFINITY) { - usec_t left = usec_sub_unsigned(ts, now(CLOCK_MONOTONIC)); - if (left == 0) - return -ETIMEDOUT; + if (poll_first) { + usec_t left; + + if (ts == USEC_INFINITY) + left = USEC_INFINITY; + else { + left = usec_sub_unsigned(ts, now(CLOCK_MONOTONIC)); + if (left == 0) + return -ETIMEDOUT; + } r = fd_wait_for_event(pidref->fd, POLLIN, left); if (r == 0) diff --git a/src/libsystemd/meson.build b/src/libsystemd/meson.build index 3c624051f0d..91a78155d2d 100644 --- a/src/libsystemd/meson.build +++ b/src/libsystemd/meson.build @@ -192,6 +192,7 @@ simple_tests += files( 'sd-device/test-sd-device-monitor.c', 'sd-future/test-fiber.c', 'sd-future/test-fiber-io.c', + 'sd-future/test-fiber-ops.c', 'sd-hwdb/test-sd-hwdb.c', 'sd-id128/test-id128.c', 'sd-journal/test-audit-type.c', diff --git a/src/libsystemd/sd-future/fiber.c b/src/libsystemd/sd-future/fiber.c index 48d26a27dd2..64dee8411df 100644 --- a/src/libsystemd/sd-future/fiber.c +++ b/src/libsystemd/sd-future/fiber.c @@ -20,6 +20,7 @@ #include "architecture.h" #include "errno-util.h" #include "event-future.h" +#include "fiber-ops.h" #include "log-context.h" #include "log.h" #include "memory-util.h" @@ -270,8 +271,10 @@ static void reset_current_fiber(void) { * current_fiber. Without this, the child of a fork() that happened mid-fiber would inherit the * fiber's log prefix / context list in its thread-locals even though no fiber is running. */ Fiber *f = fiber_get_current(); - if (f) + if (f) { fiber_swap_log_state(f); + fiber_ops_set(NULL); + } fiber_set_current(NULL); } @@ -307,9 +310,19 @@ static void fiber_resolve(sd_future *f) { sd_future_resolve(f, fiber->result); } +static const FiberOps fiber_ops = { + .ppoll = sd_fiber_ppoll, + .read = sd_fiber_read, + .write = sd_fiber_write, + .timeout = sd_fiber_timeout, + .cancel_wait_unref = sd_future_cancel_wait_unref, +}; + static void fiber_enter(Fiber *fiber, Fiber *prev, void **fake_stack_save) { fiber_set_current(fiber); fiber_swap_log_state(fiber); + if (!prev) + fiber_ops_set(&fiber_ops); struct iovec fiber_stack = fiber_stack_usable(&fiber->stack); start_switch_stack(fake_stack_save, &fiber_stack); @@ -318,6 +331,8 @@ static void fiber_enter(Fiber *fiber, Fiber *prev, void **fake_stack_save) { static void fiber_leave(Fiber *fiber, Fiber *prev, void *fake_stack_save) { finish_switch_stack(fake_stack_save); + if (!prev) + fiber_ops_set(NULL); fiber_swap_log_state(fiber); fiber_set_current(prev); } diff --git a/src/libsystemd/sd-future/test-fiber-ops.c b/src/libsystemd/sd-future/test-fiber-ops.c new file mode 100644 index 00000000000..f2c603fda0b --- /dev/null +++ b/src/libsystemd/sd-future/test-fiber-ops.c @@ -0,0 +1,574 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include +#include +#include + +#include "sd-event.h" +#include "sd-future.h" + +#include "alloc-util.h" +#include "cleanup-util.h" +#include "fd-util.h" +#include "io-util.h" +#include "pidref.h" +#include "process-util.h" +#include "tests.h" +#include "time-util.h" + +/* Test: wait_for_terminate basic functionality */ +static int wait_simple_fiber(void *userdata) { + _cleanup_(pidref_done_sigkill_wait) PidRef pidref = PIDREF_NULL; + siginfo_t si; + int r; + + /* Fork a child that exits immediately */ + r = pidref_safe_fork("(test-child)", FORK_RESET_SIGNALS|FORK_DEATHSIG_SIGTERM|FORK_LOG, &pidref); + if (r < 0) + return r; + + if (r == 0) + _exit(42); + + /* Parent - wait for child */ + r = pidref_wait_for_terminate(&pidref, &si); + if (r < 0) + return r; + + pidref_done(&pidref); + + /* Verify child exited with status 42 */ + if (si.si_code != CLD_EXITED || si.si_status != 42) + return -EIO; + + return 0; +} + +TEST(wait_for_terminate_fiber_basic) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "wait-simple", wait_simple_fiber, NULL, /* destroy= */ NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); +} + +/* Test: wait_for_terminate with multiple children */ +static int wait_multiple_fiber(void *userdata) { + PidRef pidrefs[3] = { PIDREF_NULL, PIDREF_NULL, PIDREF_NULL }; + siginfo_t si; + int r; + + /* Fork three children with different exit codes */ + for (size_t i = 0; i < 3; i++) { + r = pidref_safe_fork("(test-child)", FORK_RESET_SIGNALS|FORK_DEATHSIG_SIGTERM|FORK_LOG, &pidrefs[i]); + if (r < 0) + goto cleanup; + + if (r == 0) + /* Child process */ + _exit(10 + i); + } + + /* Wait for all three in order */ + for (size_t i = 0; i < 3; i++) { + r = pidref_wait_for_terminate(&pidrefs[i], &si); + if (r < 0) + goto cleanup; + + pidref_done(&pidrefs[i]); + + if (si.si_code != CLD_EXITED || si.si_status != (int) (10 + i)) { + r = -EIO; + goto cleanup; + } + } + + return 0; + +cleanup: + for (size_t i = 0; i < 3; i++) + pidref_done(&pidrefs[i]); + + return r; +} + +TEST(wait_for_terminate_fiber_multiple) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "wait-multiple", wait_multiple_fiber, NULL, /* destroy= */ NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); +} + +static int concurrent_wait_fiber(void *userdata) { + _cleanup_(pidref_done_sigkill_wait) PidRef pidref = PIDREF_NULL; + siginfo_t si; + int r; + + r = pidref_safe_fork("(test-child)", FORK_RESET_SIGNALS|FORK_DEATHSIG_SIGTERM|FORK_LOG, &pidref); + if (r < 0) + return r; + + if (r == 0) + /* Child exits with specified status */ + _exit(PTR_TO_INT(userdata)); + + r = pidref_wait_for_terminate(&pidref, &si); + if (r < 0) + return r; + + pidref_done(&pidref); + + if (si.si_code != CLD_EXITED || si.si_status != PTR_TO_INT(userdata)) + return -EIO; + + return 0; +} + +TEST(wait_for_terminate_fiber_concurrent) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + sd_future *fibers[3] = {}; + CLEANUP_ELEMENTS(fibers, sd_future_unref_array_clear); + + /* Create 3 fibers, each waiting for a different child */ + for (size_t i = 0; i < ELEMENTSOF(fibers); i++) + ASSERT_OK(sd_fiber_new(e, "concurrent-wait", concurrent_wait_fiber, INT_TO_PTR(20 + i), /* destroy= */ NULL, &fibers[i])); + + ASSERT_OK(sd_event_loop(e)); + + /* All fibers should complete successfully */ + for (size_t i = 0; i < ELEMENTSOF(fibers); i++) + ASSERT_OK(sd_future_result(fibers[i])); +} + +typedef struct LoopIOContext { + int *pipefd; + const char *data; + size_t len; + int order; +} LoopIOContext; + +static int loop_read_suspend_fiber(void *userdata) { + LoopIOContext *ctx = ASSERT_PTR(userdata); + char buf[64]; + + ASSERT_EQ(ctx->order, 0); + ctx->order = 1; + + ssize_t n = loop_read(ctx->pipefd[0], buf, sizeof(buf), /* do_poll= */ true); + + /* While we were suspended, the writer fiber should have run. */ + ASSERT_EQ(ctx->order, 2); + + if (n < 0) + return (int) n; + if ((size_t) n != ctx->len || memcmp(buf, ctx->data, ctx->len) != 0) + return -EIO; + + return (int) n; +} + +static int loop_write_suspend_fiber(void *userdata) { + LoopIOContext *ctx = ASSERT_PTR(userdata); + + ASSERT_EQ(ctx->order, 1); + ctx->order = 2; + + int r = loop_write(ctx->pipefd[1], ctx->data, ctx->len); + if (r < 0) + return r; + + /* Close the write end so the reader sees EOF after reading the data. */ + ctx->pipefd[1] = safe_close(ctx->pipefd[1]); + return 0; +} + +/* Test: two fibers cooperatively pass a small payload through a blocking pipe using the suspending + * loop helpers. Exercises the non-blocking flip, event-loop yielding, and the blocking-mode restore. */ +TEST(loop_read_write_suspend) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC)); + + static const char payload[] = "loop-suspend"; + LoopIOContext ctx = { + .pipefd = pipefd, + .data = payload, + .len = sizeof(payload) - 1, + }; + + _cleanup_(sd_future_unrefp) sd_future *fr = NULL, *fw = NULL; + ASSERT_OK(sd_fiber_new(e, "loop-read", loop_read_suspend_fiber, &ctx, /* destroy= */ NULL, &fr)); + ASSERT_OK(sd_future_set_priority(fr, 0)); + ASSERT_OK(sd_fiber_new(e, "loop-write", loop_write_suspend_fiber, &ctx, /* destroy= */ NULL, &fw)); + ASSERT_OK(sd_future_set_priority(fw, 1)); + + ASSERT_OK(sd_event_loop(e)); + + ASSERT_OK_EQ(sd_future_result(fr), (int) ctx.len); + ASSERT_OK_ZERO(sd_future_result(fw)); + + /* The read fd started out blocking and loop_read() must have restored it before returning. */ + ASSERT_OK_ZERO(fcntl(pipefd[0], F_GETFL) & O_NONBLOCK); +} + +static int loop_read_exact_short_fiber(void *userdata) { + int fd = PTR_TO_INT(userdata); + char buf[16]; + + /* Requesting more bytes than the peer writes should return -EIO once EOF is hit. */ + return loop_read_exact(fd, buf, sizeof(buf), /* do_poll= */ true); +} + +/* Test: loop_read_exact() returns -EIO when the peer closes early. */ +TEST(loop_read_exact_short) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "loop-read-exact", loop_read_exact_short_fiber, + INT_TO_PTR(pipefd[0]), /* destroy= */ NULL, &f)); + + /* Write a few bytes and close the write end — less than the fiber asked for. */ + ASSERT_OK_EQ_ERRNO(write(pipefd[1], "abc", 3), (ssize_t) 3); + pipefd[1] = safe_close(pipefd[1]); + + ASSERT_OK(sd_event_loop(e)); + + ASSERT_ERROR(sd_future_result(f), EIO); +} + +typedef struct LoopWriteTimeoutContext { + int fd; + int result; +} LoopWriteTimeoutContext; + +static int loop_write_timeout_fiber(void *userdata) { + LoopWriteTimeoutContext *ctx = ASSERT_PTR(userdata); + + /* Try to write much more than the pipe buffer can hold with a short timeout. The write will + * succeed partially and then hit -ETIME after exhausting the timeout while blocked. */ + static const char big_buf[128 * 1024] = { 0 }; + ctx->result = loop_write_full(ctx->fd, big_buf, sizeof(big_buf), 100 * USEC_PER_MSEC); + return 0; +} + +/* Test: loop_write_full() returns -ETIME when the peer never drains. */ +TEST(loop_write_full_timeout) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC)); + + /* Shrink the pipe buffer to its minimum (one page) so the 128K write below is guaranteed to block + * regardless of the architecture's page size. The default pipe buffer is 16 pages, which on + * 64K-page architectures (e.g. ppc64le) is 1 MiB — enough to absorb the entire write without ever + * blocking, defeating the purpose of the timeout. */ + ASSERT_OK_ERRNO(fcntl(pipefd[1], F_SETPIPE_SZ, 1)); + + LoopWriteTimeoutContext ctx = { .fd = pipefd[1], .result = 0 }; + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "loop-write-timeout", loop_write_timeout_fiber, &ctx, /* destroy= */ NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + + ASSERT_OK_ZERO(sd_future_result(f)); + ASSERT_ERROR(ctx.result, ETIME); +} + +typedef struct PpollDispatchContext { + int *pipefd; + int order; +} PpollDispatchContext; + +static int ppoll_dispatch_read_fiber(void *userdata) { + PpollDispatchContext *ctx = ASSERT_PTR(userdata); + struct pollfd pfd = { + .fd = ctx->pipefd[0], + .events = POLLIN, + }; + + ASSERT_EQ(ctx->order, 0); + ctx->order = 1; + + /* Direct ppoll_usec() call from a fiber must dispatch through sd_fiber_poll(), suspending the + * fiber instead of blocking the entire thread. If dispatch fails, the writer fiber never gets a + * chance to run and the test deadlocks. */ + int r = ppoll_usec(&pfd, 1, USEC_INFINITY); + if (r < 0) + return r; + + ASSERT_EQ(ctx->order, 2); + + if (r != 1 || !FLAGS_SET(pfd.revents, POLLIN)) + return -EIO; + + return 0; +} + +static int ppoll_dispatch_write_fiber(void *userdata) { + PpollDispatchContext *ctx = ASSERT_PTR(userdata); + + ASSERT_EQ(ctx->order, 1); + ctx->order = 2; + + if (write(ctx->pipefd[1], "x", 1) < 0) + return -errno; + + return 0; +} + +/* Test: ppoll_usec() called from a fiber dispatches through the FiberOps hook to sd_fiber_poll(), + * yielding to the event loop instead of blocking. */ +TEST(ppoll_usec_dispatch) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + + PpollDispatchContext ctx = { .pipefd = pipefd }; + + _cleanup_(sd_future_unrefp) sd_future *fr = NULL, *fw = NULL; + ASSERT_OK(sd_fiber_new(e, "ppoll-read", ppoll_dispatch_read_fiber, &ctx, /* destroy= */ NULL, &fr)); + ASSERT_OK(sd_future_set_priority(fr, 0)); + ASSERT_OK(sd_fiber_new(e, "ppoll-write", ppoll_dispatch_write_fiber, &ctx, /* destroy= */ NULL, &fw)); + ASSERT_OK(sd_future_set_priority(fw, 1)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(fr)); + ASSERT_OK(sd_future_result(fw)); +} + +static int loop_write_zero_timeout_nonblock_fiber(void *userdata) { + int fd = PTR_TO_INT(userdata); + + /* Fill the pipe so the next write would block. The fd is non-blocking, so on a fiber + * loop_write_full(timeout=0) must take the non-fiber path and return -EAGAIN immediately + * rather than suspending. */ + static const char big_buf[128 * 1024] = { 0 }; + return loop_write_full(fd, big_buf, sizeof(big_buf), /* timeout= */ 0); +} + +/* Test: timeout == 0 on a non-blocking fd from a fiber preserves the "don't wait" semantic and + * returns -EAGAIN when the pipe buffer is full, instead of suspending the fiber. */ +TEST(loop_write_zero_timeout_nonblock) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + ASSERT_OK_ERRNO(fcntl(pipefd[1], F_SETPIPE_SZ, 1)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "loop-write-zt-nb", loop_write_zero_timeout_nonblock_fiber, + INT_TO_PTR(pipefd[1]), /* destroy= */ NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_ERROR(sd_future_result(f), EAGAIN); +} + +typedef struct LoopWriteZeroBlockingContext { + int *pipefd; + size_t total; + int order; +} LoopWriteZeroBlockingContext; + +static int loop_write_zero_blocking_writer_fiber(void *userdata) { + LoopWriteZeroBlockingContext *ctx = ASSERT_PTR(userdata); + + ASSERT_EQ(ctx->order, 0); + ctx->order = 1; + + /* timeout == 0 on a *blocking* fd from a fiber: the fast EAGAIN return isn't possible, so + * loop_write_full() takes the fiber path. The reader fiber drains the pipe, letting our + * write complete via fiber suspension/resume. */ + _cleanup_free_ char *big_buf = malloc0(ctx->total); + ASSERT_NOT_NULL(big_buf); + int r = loop_write_full(ctx->pipefd[1], big_buf, ctx->total, /* timeout= */ 0); + + ASSERT_EQ(ctx->order, 2); + return r; +} + +static int loop_write_zero_blocking_reader_fiber(void *userdata) { + LoopWriteZeroBlockingContext *ctx = ASSERT_PTR(userdata); + + ASSERT_EQ(ctx->order, 1); + ctx->order = 2; + + _cleanup_free_ char *buf = malloc(ctx->total); + ASSERT_NOT_NULL(buf); + ssize_t n = loop_read(ctx->pipefd[0], buf, ctx->total, /* do_poll= */ true); + if (n < 0) + return (int) n; + return (int) n; +} + +/* Test: timeout == 0 on a blocking fd from a fiber takes the fiber path (suspends until the peer + * drains) instead of blocking the entire thread. */ +TEST(loop_write_zero_timeout_blocking) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC)); + ASSERT_OK_ERRNO(fcntl(pipefd[1], F_SETPIPE_SZ, 1)); + + /* F_SETPIPE_SZ rounds up to the kernel's pipe minimum (typically a page); query the actual + * size and write more than that, so the write must wait on the reader regardless of page size. */ + int pipe_sz = fcntl(pipefd[1], F_GETPIPE_SZ); + ASSERT_OK_ERRNO(pipe_sz); + + LoopWriteZeroBlockingContext ctx = { .pipefd = pipefd, .total = (size_t) pipe_sz * 2 }; + + _cleanup_(sd_future_unrefp) sd_future *fw = NULL, *fr = NULL; + ASSERT_OK(sd_fiber_new(e, "loop-write-zt-blk", loop_write_zero_blocking_writer_fiber, + &ctx, /* destroy= */ NULL, &fw)); + ASSERT_OK(sd_future_set_priority(fw, 0)); + ASSERT_OK(sd_fiber_new(e, "loop-read-zt-blk", loop_write_zero_blocking_reader_fiber, + &ctx, /* destroy= */ NULL, &fr)); + ASSERT_OK(sd_future_set_priority(fr, 1)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(fw)); + ASSERT_OK_EQ(sd_future_result(fr), (int) ctx.total); +} + +static int loop_read_no_poll_nonblock_fiber(void *userdata) { + int fd = PTR_TO_INT(userdata); + char buf[64]; + + /* Empty non-blocking pipe + do_poll=false: on a fiber loop_read() must take the non-fiber + * path and return -EAGAIN immediately rather than suspending. */ + return (int) loop_read(fd, buf, sizeof(buf), /* do_poll= */ false); +} + +/* Test: do_poll == false on a non-blocking fd from a fiber preserves the "don't wait" semantic + * and returns -EAGAIN when no data is available, instead of suspending the fiber. */ +TEST(loop_read_no_poll_nonblock) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "loop-read-np-nb", loop_read_no_poll_nonblock_fiber, + INT_TO_PTR(pipefd[0]), /* destroy= */ NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_ERROR(sd_future_result(f), EAGAIN); +} + +typedef struct LoopReadNoPollBlockingContext { + int *pipefd; + const char *data; + size_t len; + int order; +} LoopReadNoPollBlockingContext; + +static int loop_read_no_poll_blocking_reader_fiber(void *userdata) { + LoopReadNoPollBlockingContext *ctx = ASSERT_PTR(userdata); + char buf[64]; + + ASSERT_EQ(ctx->order, 0); + ctx->order = 1; + + /* do_poll == false on a *blocking* fd from a fiber: the fast EAGAIN return isn't possible, + * so loop_read() takes the fiber path and suspends until the writer fiber feeds data. */ + ssize_t n = loop_read(ctx->pipefd[0], buf, sizeof(buf), /* do_poll= */ false); + + ASSERT_EQ(ctx->order, 2); + + if (n < 0) + return (int) n; + if ((size_t) n != ctx->len || memcmp(buf, ctx->data, ctx->len) != 0) + return -EIO; + + return (int) n; +} + +static int loop_read_no_poll_blocking_writer_fiber(void *userdata) { + LoopReadNoPollBlockingContext *ctx = ASSERT_PTR(userdata); + + ASSERT_EQ(ctx->order, 1); + ctx->order = 2; + + int r = loop_write(ctx->pipefd[1], ctx->data, ctx->len); + if (r < 0) + return r; + + ctx->pipefd[1] = safe_close(ctx->pipefd[1]); + return 0; +} + +/* Test: do_poll == false on a blocking fd from a fiber takes the fiber path (suspends until the + * peer feeds data) instead of blocking the entire thread. */ +TEST(loop_read_no_poll_blocking) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC)); + + static const char payload[] = "no-poll"; + LoopReadNoPollBlockingContext ctx = { + .pipefd = pipefd, + .data = payload, + .len = sizeof(payload) - 1, + }; + + _cleanup_(sd_future_unrefp) sd_future *fr = NULL, *fw = NULL; + ASSERT_OK(sd_fiber_new(e, "loop-read-np-blk", loop_read_no_poll_blocking_reader_fiber, + &ctx, /* destroy= */ NULL, &fr)); + ASSERT_OK(sd_future_set_priority(fr, 0)); + ASSERT_OK(sd_fiber_new(e, "loop-write-np-blk", loop_read_no_poll_blocking_writer_fiber, + &ctx, /* destroy= */ NULL, &fw)); + ASSERT_OK(sd_future_set_priority(fw, 1)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK_EQ(sd_future_result(fr), (int) ctx.len); + ASSERT_OK_ZERO(sd_future_result(fw)); +} + +/* Test: loop_*() helpers transparently fall back to blocking I/O when called outside any + * fiber context. */ +TEST(loop_read_write_fallback) { + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC)); + + ASSERT_OK(loop_write(pipefd[1], "fallback", STRLEN("fallback"))); + + char buf[16]; + ssize_t n = loop_read(pipefd[0], buf, STRLEN("fallback"), /* do_poll= */ true); + ASSERT_OK_EQ(n, (ssize_t) STRLEN("fallback")); + ASSERT_EQ(memcmp(buf, "fallback", STRLEN("fallback")), 0); +} + +DEFINE_TEST_MAIN(LOG_DEBUG);