]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
sd-event: suspend instead of blocking when sd_event_run() runs on a fiber
authorDaan De Meyer <daan@amutable.com>
Mon, 23 Mar 2026 09:15:27 +0000 (10:15 +0100)
committerDaan De Meyer <daan@amutable.com>
Thu, 21 May 2026 09:55:04 +0000 (09:55 +0000)
sd_event_run() blocks the calling thread on the event loop's epoll fd
until something happens. When the caller is a fiber, that's the wrong
behaviour: blocking the thread also stalls every other fiber and the
outer event loop driving them. The most common way to hit this is a
fiber that creates its own inner event loop (e.g. a server-style fiber
that wants to dispatch its own sources independently of whatever loop
the test or supervising fiber is running on) — with the existing
implementation the inner sd_event_run() would hold the thread while the
outer scheduler should be free to advance other fibers.

Add an event_run_suspend() variant in sd-event/event-future.c that
performs the same prepare/wait/dispatch dance, but when the fast path
finds nothing ready it (a) creates an IO future watching the inner
event loop's epoll fd on the *outer* event loop, (b) optionally creates
a time future for the timeout, and (c) suspends the fiber. When either
future fires the fiber is resumed and the prepare/wait/dispatch sequence
runs once more to actually dispatch what's pending. sd_event_run()
checks sd_fiber_is_running() and delegates to this variant when on a
fiber; profile_delays accounting is intentionally skipped on that path
since the underlying prepare/wait/dispatch primitives already account
for themselves.

src/libsystemd/meson.build
src/libsystemd/sd-event/event-future.c
src/libsystemd/sd-event/event-future.h
src/libsystemd/sd-event/event-util.h
src/libsystemd/sd-event/sd-event.c
src/libsystemd/sd-event/test-event-future.c [new file with mode: 0644]

index 91a78155d2d3b24e8501de90653fa894bedf0738..4e9fd28d44231a9181299a801030717d2c1c7e48 100644 (file)
@@ -190,6 +190,7 @@ simple_tests += files(
         'sd-bus/test-bus-vtable.c',
         'sd-device/test-device-util.c',
         'sd-device/test-sd-device-monitor.c',
+        'sd-event/test-event-future.c',
         'sd-future/test-fiber.c',
         'sd-future/test-fiber-io.c',
         'sd-future/test-fiber-ops.c',
index 8c9960fcdd9d0eff52b9ab1596a79e1301b8a98f..4e0a0a87fc2041267360aeb7c221cecdc5c77cad 100644 (file)
@@ -6,6 +6,7 @@
 #include "alloc-util.h"
 #include "errno-util.h"
 #include "event-future.h"
+#include "event-util.h"
 #include "fd-util.h"
 
 typedef struct IoFuture {
@@ -234,3 +235,74 @@ int future_new_time(sd_event *e, clockid_t clock, uint64_t usec, uint64_t accura
 int future_new_time_relative(sd_event *e, clockid_t clock, uint64_t usec, uint64_t accuracy, int result, sd_future **ret) {
         return future_new_time_internal(sd_event_add_time_relative, e, clock, usec, accuracy, result, ret);
 }
+
+int event_run_suspend(sd_event *e, uint64_t timeout) {
+        sd_event *outer = sd_fiber_get_event();
+        int r;
+
+        assert(e);
+        assert(sd_fiber_is_running());
+        assert(outer);
+        assert(e != outer);
+
+        /* Make sure that none of the preparation callbacks ends up freeing the event source under our feet */
+        PROTECT_EVENT(e);
+
+        r = sd_event_prepare(e);
+        if (r < 0)
+                return r;
+        if (r == 0) {
+                r = sd_event_wait(e, 0);
+                if (r < 0)
+                        return r;
+        }
+        if (r > 0) {
+                r = sd_event_dispatch(e);
+                if (r < 0)
+                        return r;
+
+                return 1;
+        }
+
+        if (timeout == 0)
+                return 0;
+
+        int fd = sd_event_get_fd(e);
+        if (fd < 0)
+                return fd;
+
+        _cleanup_(sd_future_cancel_wait_unrefp) sd_future *io = NULL;
+        r = future_new_io(outer, fd, EPOLLIN, &io);
+        if (r < 0)
+                return r;
+
+        _cleanup_(sd_future_cancel_wait_unrefp) sd_future *timer = NULL;
+        if (timeout != USEC_INFINITY) {
+                r = future_new_time_relative(
+                                outer,
+                                CLOCK_MONOTONIC,
+                                timeout,
+                                /* accuracy= */ 1,
+                                /* result= */ 0,
+                                &timer);
+                if (r < 0)
+                        return r;
+        }
+
+        r = sd_fiber_suspend();
+        if (r < 0)
+                return r;
+
+        r = sd_event_prepare(e);
+        if (r == 0)
+                r = sd_event_wait(e, 0);
+        if (r > 0) {
+                r = sd_event_dispatch(e);
+                if (r < 0)
+                        return r;
+
+                return 1;
+        }
+
+        return r;
+}
index 3bc275e7b7ac9b817cea158914b0bd3cd783c1d6..83d5939d6b02d03a47da98b10a5aeb079f0a9e25 100644 (file)
@@ -6,3 +6,5 @@
 int future_new_io(sd_event *e, int fd, uint32_t events, sd_future **ret);
 int future_new_time(sd_event *e, clockid_t clock, uint64_t usec, uint64_t accuracy, int result, sd_future **ret);
 int future_new_time_relative(sd_event *e, clockid_t clock, uint64_t usec, uint64_t accuracy, int result, sd_future **ret);
+
+int event_run_suspend(sd_event *e, uint64_t timeout);
index dc3b3ed70ff1280d7a35ec43358b8fe9ca1841d6..ce213b9c9e4d934ea0396ee55dce9ee67e55e646 100644 (file)
@@ -5,6 +5,9 @@
 
 #include "sd-forward.h"
 
+#define PROTECT_EVENT(e)                                                \
+        _unused_ _cleanup_(sd_event_unrefp) sd_event *_ref = sd_event_ref(e);
+
 extern const struct hash_ops event_source_hash_ops;
 
 int event_reset_time(
index 8c419fad306ea04736339fc006cb399cf240ef23..4935ae1b4aac28e253cab45c5650b34ecd262f45 100644 (file)
 
 #include "sd-daemon.h"
 #include "sd-event.h"
+#include "sd-future.h"
 #include "sd-id128.h"
 #include "sd-messages.h"
 
 #include "alloc-util.h"
 #include "errno-util.h"
+#include "event-future.h"
 #include "event-source.h"
+#include "event-util.h"
 #include "fd-util.h"
 #include "format-util.h"
 #include "glyph-util.h"
@@ -474,9 +477,6 @@ _public_ sd_event* sd_event_unref(sd_event *e) {
         return event_free(e);
 }
 
-#define PROTECT_EVENT(e)                                                \
-        _unused_ _cleanup_(sd_event_unrefp) sd_event *_ref = sd_event_ref(e);
-
 _public_ sd_event_source* sd_event_source_disable_unref(sd_event_source *s) {
         int r;
 
@@ -4932,6 +4932,13 @@ _public_ int sd_event_run(sd_event *e, uint64_t timeout) {
         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
         assert_return(e->state == SD_EVENT_INITIAL, -EBUSY);
 
+        /* When running on a fiber, delegate to the suspending implementation. Note that the
+         * profile_delays accounting below is intentionally skipped on that path: the suspending variant
+         * drives the event loop via sd_event_prepare()/sd_event_wait()/sd_event_dispatch() itself, which
+         * are the same primitives profile_delays tracks when called directly. */
+        if (sd_fiber_is_running())
+                return event_run_suspend(e, timeout);
+
         if (e->profile_delays && e->last_run_usec != 0) {
                 usec_t this_run;
                 unsigned l;
diff --git a/src/libsystemd/sd-event/test-event-future.c b/src/libsystemd/sd-event/test-event-future.c
new file mode 100644 (file)
index 0000000..754daf0
--- /dev/null
@@ -0,0 +1,358 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "sd-event.h"
+#include "sd-future.h"
+
+#include "fd-util.h"
+#include "tests.h"
+#include "time-util.h"
+
+static int timer_callback(sd_event_source *s, uint64_t usec, void *userdata) {
+        int *count = ASSERT_PTR(userdata);
+        int r;
+
+        (*count)++;
+
+        r = sd_event_source_set_time_relative(s, 5 * USEC_PER_MSEC);
+        if (r < 0)
+                return r;
+
+        if (sd_fiber_is_running() && *count >= 3)
+                return sd_event_exit(sd_event_source_get_event(s), 0);
+
+        return 0;
+}
+
+static int event_run_fiber_func(void *userdata) {
+        _cleanup_(sd_event_unrefp) sd_event *inner = NULL;
+        _cleanup_(sd_event_source_unrefp) sd_event_source *inner_timer = NULL;
+        int r;
+
+        /* Create inner event loop from within the fiber */
+        r = sd_event_new(&inner);
+        if (r < 0)
+                return r;
+
+        /* Add a timer to the inner event loop that fires every 5ms */
+        r = sd_event_add_time_relative(inner, &inner_timer, CLOCK_MONOTONIC,
+                                       5 * USEC_PER_MSEC, 0, timer_callback,
+                                       userdata);
+        if (r < 0)
+                return r;
+
+        r = sd_event_source_set_enabled(inner_timer, SD_EVENT_ON);
+        if (r < 0)
+                return r;
+
+        return sd_event_loop(inner);
+}
+
+TEST(sd_event_loop_fiber) {
+        /* Create outer event loop for the fiber scheduler */
+        _cleanup_(sd_event_unrefp) sd_event *outer = NULL;
+        ASSERT_OK(sd_event_new(&outer));
+        ASSERT_OK(sd_event_set_exit_on_idle(outer, true));
+
+        /* Add a timer to the outer event loop that fires every 5ms */
+        _cleanup_(sd_event_source_unrefp) sd_event_source *outer_timer = NULL;
+        int outer_timer_count = 0;
+        ASSERT_OK(sd_event_add_time_relative(outer, &outer_timer, CLOCK_MONOTONIC,
+                                             5 * USEC_PER_MSEC, 0, timer_callback,
+                                             &outer_timer_count));
+
+        /* Create a fiber that will create and run the inner event loop */
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        int inner_timer_count = 0;
+        ASSERT_OK(sd_fiber_new(outer, "event-runner", event_run_fiber_func, &inner_timer_count, /* destroy= */ NULL, &f));
+
+        /* Run the outer event loop */
+        ASSERT_OK(sd_event_loop(outer));
+
+        /* Fiber should have completed successfully */
+        ASSERT_OK(sd_future_result(f));
+
+        /* Both timers should have fired at least once */
+        ASSERT_EQ(inner_timer_count, 3);
+        ASSERT_GT(outer_timer_count, 0);
+}
+
+static int event_run_fiber_timeout_func(void *userdata) {
+        _cleanup_(sd_event_unrefp) sd_event *inner = NULL;
+        int r;
+
+        /* Create inner event loop from within the fiber */
+        r = sd_event_new(&inner);
+        if (r < 0)
+                return r;
+
+        /* Run with a short timeout - should timeout since there are no events */
+        return sd_event_run(inner, 10 * USEC_PER_MSEC);
+}
+
+TEST(sd_event_run_fiber_timeout) {
+        /* Create outer event loop for the fiber scheduler */
+        _cleanup_(sd_event_unrefp) sd_event *outer = NULL;
+        ASSERT_OK(sd_event_new(&outer));
+        ASSERT_OK(sd_event_set_exit_on_idle(outer, true));
+
+        /* Create a fiber that will run sd_event_run() with timeout */
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(outer, "event-timeout", event_run_fiber_timeout_func, NULL, /* destroy= */ NULL, &f));
+
+        /* Run the outer event loop */
+        ASSERT_OK(sd_event_loop(outer));
+
+        /* Fiber should have completed successfully (timeout returns 0) */
+        ASSERT_OK_ZERO(sd_future_result(f));
+}
+
+/* Test: sd_event_run() with zero timeout returns immediately */
+static int sd_event_run_zero_timeout_fiber(void *userdata) {
+        _cleanup_(sd_event_unrefp) sd_event *inner = NULL;
+        int r;
+
+        r = sd_event_new(&inner);
+        if (r < 0)
+                return r;
+
+        /* With zero timeout on an empty event loop, should return 0 immediately */
+        r = sd_event_run(inner, 0);
+        if (r != 0)
+                return r < 0 ? r : -EIO;
+
+        return 0;
+}
+
+TEST(sd_event_run_zero_timeout) {
+        _cleanup_(sd_event_unrefp) sd_event *outer = NULL;
+        ASSERT_OK(sd_event_new(&outer));
+        ASSERT_OK(sd_event_set_exit_on_idle(outer, true));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(outer, "run-suspend-zero", sd_event_run_zero_timeout_fiber, NULL, /* destroy= */ NULL, &f));
+
+        ASSERT_OK(sd_event_loop(outer));
+        ASSERT_OK_ZERO(sd_future_result(f));
+}
+
+/* Test: sd_event_run() dispatches immediately pending IO */
+static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
+        int *counter = ASSERT_PTR(userdata);
+        char buf[64];
+
+        (*counter)++;
+
+        /* Drain the fd */
+        (void) read(fd, buf, sizeof(buf));
+
+        return sd_event_exit(sd_event_source_get_event(s), 0);
+}
+
+static int sd_event_run_immediate_fiber(void *userdata) {
+        int *pipefd = ASSERT_PTR(userdata);
+        _cleanup_(sd_event_unrefp) sd_event *inner = NULL;
+        _cleanup_(sd_event_source_unrefp) sd_event_source *source = NULL;
+        int counter = 0, r;
+
+        r = sd_event_new(&inner);
+        if (r < 0)
+                return r;
+
+        /* Add IO source watching the read end of the pipe */
+        r = sd_event_add_io(inner, &source, pipefd[0], EPOLLIN, io_callback, &counter);
+        if (r < 0)
+                return r;
+
+        /* Data is already available on the pipe (written before fiber started), so
+         * sd_event_run() should dispatch immediately without suspending */
+        r = sd_event_run(inner, USEC_INFINITY);
+        if (r < 0)
+                return r;
+
+        /* The IO callback should have fired */
+        if (counter != 1)
+                return -EIO;
+
+        return 0;
+}
+
+TEST(sd_event_run_immediate) {
+        _cleanup_(sd_event_unrefp) sd_event *outer = NULL;
+        ASSERT_OK(sd_event_new(&outer));
+        ASSERT_OK(sd_event_set_exit_on_idle(outer, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        /* Write data before starting the fiber so it's immediately available */
+        ASSERT_OK_EQ_ERRNO(write(pipefd[1], "X", 1), 1);
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(outer, "run-suspend-immediate", sd_event_run_immediate_fiber, pipefd, /* destroy= */ NULL, &f));
+
+        ASSERT_OK(sd_event_loop(outer));
+        ASSERT_OK_ZERO(sd_future_result(f));
+}
+
+/* Test: sd_event_run() with IO arriving during suspension */
+static int sd_event_run_io_fiber(void *userdata) {
+        int *pipefd = ASSERT_PTR(userdata);
+        _cleanup_(sd_event_unrefp) sd_event *inner = NULL;
+        _cleanup_(sd_event_source_unrefp) sd_event_source *source = NULL;
+        int counter = 0, r;
+
+        r = sd_event_new(&inner);
+        if (r < 0)
+                return r;
+
+        r = sd_event_add_io(inner, &source, pipefd[0], EPOLLIN, io_callback, &counter);
+        if (r < 0)
+                return r;
+
+        /* No data available yet, so this will suspend the fiber until IO arrives */
+        r = sd_event_run(inner, USEC_INFINITY);
+        if (r < 0)
+                return r;
+
+        if (counter != 1)
+                return -EIO;
+
+        return 0;
+}
+
+TEST(sd_event_run_io) {
+        _cleanup_(sd_event_unrefp) sd_event *outer = NULL;
+        ASSERT_OK(sd_event_new(&outer));
+        ASSERT_OK(sd_event_set_exit_on_idle(outer, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(outer, "run-suspend-io", sd_event_run_io_fiber, pipefd, /* destroy= */ NULL, &f));
+
+        /* First iteration: fiber runs, adds IO source, suspends because no data */
+        ASSERT_OK_POSITIVE(sd_event_run(outer, 0));
+
+        /* Write data to the pipe to wake the inner event loop */
+        ASSERT_OK_EQ_ERRNO(write(pipefd[1], "Y", 1), 1);
+
+        /* Complete: fiber resumes, dispatches IO, finishes */
+        ASSERT_OK(sd_event_loop(outer));
+        ASSERT_OK_ZERO(sd_future_result(f));
+}
+
+/* Test: event_run called in a loop keeps event loop state consistent.
+ * This is a regression test for a bug where error paths after sd_event_prepare()
+ * could leave the inner event loop stuck in SD_EVENT_ARMED state. */
+static int sd_event_run_loop_fiber(void *userdata) {
+        int *pipefd = ASSERT_PTR(userdata);
+        _cleanup_(sd_event_unrefp) sd_event *inner = NULL;
+        _cleanup_(sd_event_source_unrefp) sd_event_source *source = NULL;
+        int counter = 0, r;
+
+        r = sd_event_new(&inner);
+        if (r < 0)
+                return r;
+
+        r = sd_event_add_io(inner, &source, pipefd[0], EPOLLIN, io_callback, &counter);
+        if (r < 0)
+                return r;
+
+        /* Call sd_event_run() multiple times with short timeouts.
+         * Each call should leave the inner event loop in a clean state for the next call. */
+        for (int i = 0; i < 5; i++) {
+                r = sd_event_run(inner, 10 * USEC_PER_MSEC);
+                if (r < 0)
+                        return r;
+                if (r > 0)
+                        break;
+        }
+
+        /* After multiple timeouts, the event loop should still be usable.
+         * Write data and do one more run to verify. */
+        if (counter == 0) {
+                /* Data wasn't written yet, do a final run with longer timeout */
+                r = sd_event_run(inner, USEC_INFINITY);
+                if (r < 0)
+                        return r;
+        }
+
+        if (counter != 1)
+                return -EIO;
+
+        return 0;
+}
+
+TEST(sd_event_run_loop) {
+        _cleanup_(sd_event_unrefp) sd_event *outer = NULL;
+        ASSERT_OK(sd_event_new(&outer));
+        ASSERT_OK(sd_event_set_exit_on_idle(outer, true));
+
+        _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
+        ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(outer, "run-suspend-loop", sd_event_run_loop_fiber, pipefd, /* destroy= */ NULL, &f));
+
+        /* Let the fiber run through a few timeout iterations */
+        for (int i = 0; i < 10; i++)
+                ASSERT_OK(sd_event_run(outer, 50 * USEC_PER_MSEC));
+
+        /* Write data to unblock the fiber */
+        ASSERT_OK_EQ_ERRNO(write(pipefd[1], "Z", 1), 1);
+
+        ASSERT_OK(sd_event_loop(outer));
+        ASSERT_OK_ZERO(sd_future_result(f));
+}
+
+/* Test: sd_event_run() with an inner timer that fires during suspension */
+static int inner_timer_handler(sd_event_source *s, uint64_t usec, void *userdata) {
+        int *counter = ASSERT_PTR(userdata);
+        (*counter)++;
+        return sd_event_exit(sd_event_source_get_event(s), 0);
+}
+
+static int sd_event_run_timer_fiber(void *userdata) {
+        _cleanup_(sd_event_unrefp) sd_event *inner = NULL;
+        _cleanup_(sd_event_source_unrefp) sd_event_source *source = NULL;
+        int counter = 0, r;
+
+        r = sd_event_new(&inner);
+        if (r < 0)
+                return r;
+
+        /* Add a timer that fires after 10ms */
+        r = sd_event_add_time_relative(inner, &source, CLOCK_MONOTONIC,
+                                       10 * USEC_PER_MSEC, 0, inner_timer_handler,
+                                       &counter);
+        if (r < 0)
+                return r;
+
+        /* Should suspend, then resume when the timer fires */
+        r = sd_event_run(inner, USEC_INFINITY);
+        if (r < 0)
+                return r;
+
+        if (counter != 1)
+                return -EIO;
+
+        return 0;
+}
+
+TEST(sd_event_run_timer) {
+        _cleanup_(sd_event_unrefp) sd_event *outer = NULL;
+        ASSERT_OK(sd_event_new(&outer));
+        ASSERT_OK(sd_event_set_exit_on_idle(outer, true));
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        ASSERT_OK(sd_fiber_new(outer, "run-suspend-timer", sd_event_run_timer_fiber, NULL, /* destroy= */ NULL, &f));
+
+        ASSERT_OK(sd_event_loop(outer));
+        ASSERT_OK_ZERO(sd_future_result(f));
+}
+
+DEFINE_TEST_MAIN(LOG_DEBUG);