From: Daan De Meyer Date: Sat, 25 Apr 2026 20:31:58 +0000 (+0200) Subject: sd-future: add fiber-aware non-blocking I/O wrappers X-Git-Tag: v261-rc1~40^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cf4c65afa86021a750de38bbed192eeb1c9fd425;p=thirdparty%2Fsystemd.git sd-future: add fiber-aware non-blocking I/O wrappers Add a family of sd_fiber_*() I/O wrappers that, when called from a fiber, behave like blocking I/O from the caller's perspective but yield to the event loop instead of blocking the thread: sd_fiber_read / sd_fiber_write sd_fiber_readv / sd_fiber_writev sd_fiber_recv / sd_fiber_send sd_fiber_connect sd_fiber_recvmsg / sd_fiber_sendmsg sd_fiber_recvfrom / sd_fiber_sendto sd_fiber_accept sd_fiber_ppoll Most of them share a single helper, fiber_io_operation(), which when invoked outside a fiber falls through to the underlying syscall directly, preserving the regular blocking behaviour. Inside a fiber the helper flips the fd to non-blocking (restoring its original mode on return), tries the syscall once on the fast path, and on EAGAIN/ EWOULDBLOCK creates an sd-event-backed IO future via future_new_io(), suspends the fiber, and retries the syscall once the event source fires. future_new_io() itself is added to sd-event/event-future.{c,h} as a new IoFuture kind. It wraps sd_event_add_io() into an sd_future: oneshot enable, EPOLLERR translated via SO_ERROR (suppressed for non-sockets), and the fd duplicated with F_DUPFD_CLOEXEC to avoid EEXIST when multiple sources watch the same descriptor. Together these let fiber-using code write straight-line socket and pipe I/O without bundling state into callbacks. --- diff --git a/src/basic/io-util.c b/src/basic/io-util.c index 103aa2a7cde..b4f643b5721 100644 --- a/src/basic/io-util.c +++ b/src/basic/io-util.c @@ -3,6 +3,7 @@ #include #include #include +#include /* IWYU pragma: keep */ #include #include @@ -10,6 +11,14 @@ #include "io-util.h" #include "time-util.h" +/* EPOLL_POLL_COMMON_MASK in io-util.h treats POLL* and EPOLL* as interchangeable; verify it. */ +assert_cc((uint32_t) POLLIN == EPOLLIN); +assert_cc((uint32_t) POLLOUT == EPOLLOUT); +assert_cc((uint32_t) POLLERR == EPOLLERR); +assert_cc((uint32_t) POLLHUP == EPOLLHUP); +assert_cc((uint32_t) POLLPRI == EPOLLPRI); +assert_cc((uint32_t) POLLRDHUP == EPOLLRDHUP); + int flush_fd(int fd) { int count = 0; diff --git a/src/basic/io-util.h b/src/basic/io-util.h index 918108c023b..ab52dc8db65 100644 --- a/src/basic/io-util.h +++ b/src/basic/io-util.h @@ -3,6 +3,12 @@ #include "basic-forward.h" +/* The intersection of poll() and epoll_wait() event masks. Linux defines POLL* and EPOLL* with the + * same numeric values for these — see the assert_cc()s in io-util.c — so this mask can be used + * interchangeably as a `revents` (poll) or `events` (epoll) bitset. */ +#define EPOLL_POLL_COMMON_MASK \ + (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLRDHUP) + int flush_fd(int fd); ssize_t loop_read(int fd, void *buf, size_t nbytes, bool do_poll); diff --git a/src/libsystemd/meson.build b/src/libsystemd/meson.build index 061ff6213f6..3c624051f0d 100644 --- a/src/libsystemd/meson.build +++ b/src/libsystemd/meson.build @@ -77,6 +77,7 @@ sd_device_sources = files( ############################################################ sd_future_sources = files( + 'sd-future/fiber-io.c', 'sd-future/fiber.c', 'sd-future/sd-future.c', ) @@ -190,6 +191,7 @@ simple_tests += files( 'sd-device/test-device-util.c', 'sd-device/test-sd-device-monitor.c', 'sd-future/test-fiber.c', + 'sd-future/test-fiber-io.c', 'sd-hwdb/test-sd-hwdb.c', 'sd-id128/test-id128.c', 'sd-journal/test-audit-type.c', diff --git a/src/libsystemd/sd-event/event-future.c b/src/libsystemd/sd-event/event-future.c index 4595a7a6fb0..8c9960fcdd9 100644 --- a/src/libsystemd/sd-event/event-future.c +++ b/src/libsystemd/sd-event/event-future.c @@ -6,6 +6,124 @@ #include "alloc-util.h" #include "errno-util.h" #include "event-future.h" +#include "fd-util.h" + +typedef struct IoFuture { + sd_event_source *source; +} IoFuture; + +static void* io_future_alloc(void) { + return new0(IoFuture, 1); +} + +static void io_future_free(sd_future *f) { + IoFuture *iof = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f))); + sd_event_source_unref(iof->source); + free(iof); +} + +static int io_future_cancel(sd_future *f) { + IoFuture *iof = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f))); + int r = 0; + + RET_GATHER(r, sd_event_source_set_enabled(iof->source, SD_EVENT_OFF)); + RET_GATHER(r, sd_future_resolve(f, -ECANCELED)); + return r; +} + +static int io_future_set_priority(sd_future *f, int64_t priority) { + IoFuture *iof = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f))); + return sd_event_source_set_priority(iof->source, priority); +} + +static const sd_future_ops io_future_ops = { + .size = sizeof(sd_future_ops), + .alloc = io_future_alloc, + .free = io_future_free, + .cancel = io_future_cancel, + .set_priority = io_future_set_priority, +}; + +static int io_handler(sd_event_source *s, int fd, uint32_t revents, void *userdata) { + sd_future *f = ASSERT_PTR(userdata); + + /* Resolve with the revents mask on success (matching io_uring poll_add's CQE convention) so + * callers can read it directly off the future result. EPOLLERR is the one exception: surface + * the actual socket error via SO_ERROR so callers like sd_fiber_connect() can return -errno + * directly without re-querying. */ + if (FLAGS_SET(revents, EPOLLERR)) { + int error = 0; + socklen_t len = sizeof(error); + + int r = RET_NERRNO(getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len)); + if (r == -ENOTSOCK) + return sd_future_resolve(f, (int) revents); + if (r >= 0 && error != 0) + return sd_future_resolve(f, -error); + if (r >= 0) + /* EPOLLERR was reported but SO_ERROR returned no pending error (e.g. + * already consumed elsewhere). Surface the revents mask so the caller + * still sees the error condition rather than mistaking it for success. */ + return sd_future_resolve(f, (int) revents); + /* On any other getsockopt() error fall through and resolve the future with that + * error so the waiting fiber wakes up rather than hanging forever. */ + return sd_future_resolve(f, r); + } + + return sd_future_resolve(f, (int) revents); +} + +int future_new_io(sd_event *e, int fd, uint32_t events, sd_future **ret) { + int r; + + assert(e); + assert(fd >= 0); + assert(ret); + + if (IN_SET(sd_event_get_state(e), SD_EVENT_EXITING, SD_EVENT_FINISHED)) + return -ECANCELED; + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + r = sd_future_new(&io_future_ops, &f); + if (r < 0) + return r; + + IoFuture *iof = sd_future_get_private(f); + + /* Duplicate fd to avoid EEXIST from epoll when adding the same fd multiple times */ + _cleanup_close_ int fd_copy = fcntl(fd, F_DUPFD_CLOEXEC, 3); + if (fd_copy < 0) + return -errno; + + r = sd_event_add_io(e, &iof->source, fd_copy, events, io_handler, f); + if (r < 0) + return r; + + r = sd_event_source_set_io_fd_own(iof->source, true); + if (r < 0) + return r; + + TAKE_FD(fd_copy); + + r = sd_event_source_set_enabled(iof->source, SD_EVENT_ONESHOT); + if (r < 0) + return r; + + if (sd_fiber_is_running()) { + int64_t priority; + + r = sd_fiber_get_priority(&priority); + if (r < 0) + return r; + + r = sd_event_source_set_priority(iof->source, priority); + if (r < 0) + return r; + } + + *ret = TAKE_PTR(f); + return 0; +} typedef struct TimeFuture { sd_event_source *source; diff --git a/src/libsystemd/sd-event/event-future.h b/src/libsystemd/sd-event/event-future.h index 7e956906ebf..3bc275e7b7a 100644 --- a/src/libsystemd/sd-event/event-future.h +++ b/src/libsystemd/sd-event/event-future.h @@ -3,5 +3,6 @@ #include "sd-forward.h" +int future_new_io(sd_event *e, int fd, uint32_t events, sd_future **ret); int future_new_time(sd_event *e, clockid_t clock, uint64_t usec, uint64_t accuracy, int result, sd_future **ret); int future_new_time_relative(sd_event *e, clockid_t clock, uint64_t usec, uint64_t accuracy, int result, sd_future **ret); diff --git a/src/libsystemd/sd-future/fiber-io.c b/src/libsystemd/sd-future/fiber-io.c new file mode 100644 index 00000000000..823e8907185 --- /dev/null +++ b/src/libsystemd/sd-future/fiber-io.c @@ -0,0 +1,471 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include +#include /* IWYU pragma: keep */ +#include +#include +#include +#include + +#include "sd-event.h" +#include "sd-future.h" + +#include "alloc-util.h" +#include "errno-util.h" +#include "event-future.h" +#include "fd-util.h" +#include "io-util.h" +#include "time-util.h" + +typedef ssize_t (*FiberIOFunc)(int fd, void *args); + +static ssize_t fiber_io_operation( + int fd, + uint32_t events, + FiberIOFunc func, + void *args) { + _cleanup_(nonblock_resetp) int reset_fd = -EBADF; + int r; + + assert(fd >= 0); + assert(func); + + if (!sd_fiber_is_running()) + return func(fd, args); + + sd_event *e = sd_fiber_get_event(); + assert(e); + + r = fd_nonblock(fd, true); + if (r < 0) + return r; + if (r > 0) + reset_fd = fd; + + ssize_t n = func(fd, args); + if (n >= 0 || !ERRNO_IS_NEG_TRANSIENT(n)) + return n; + + _cleanup_(sd_future_cancel_wait_unrefp) sd_future *io = NULL; + r = future_new_io(e, fd, events, &io); + if (r < 0) + return r; + + r = sd_fiber_suspend(); + if (r < 0) + return r; + + return func(fd, args); +} + +typedef struct ReadArgs { + void *buf; + size_t count; +} ReadArgs; + +static ssize_t read_callback(int fd, void *args) { + ReadArgs *a = ASSERT_PTR(args); + ssize_t n; + + n = read(fd, a->buf, a->count); + return n >= 0 ? n : -errno; +} + +ssize_t sd_fiber_read(int fd, void *buf, size_t count) { + assert_return(fd >= 0, -EBADF); + assert_return(buf || count == 0, -EINVAL); + + return fiber_io_operation(fd, EPOLLIN, read_callback, &(ReadArgs) { + .buf = buf, + .count = count, + }); +} + +typedef struct WriteArgs { + const void *buf; + size_t count; +} WriteArgs; + +static ssize_t write_callback(int fd, void *args) { + WriteArgs *a = ASSERT_PTR(args); + ssize_t n; + + n = write(fd, a->buf, a->count); + return n >= 0 ? n : -errno; +} + +ssize_t sd_fiber_write(int fd, const void *buf, size_t count) { + assert_return(fd >= 0, -EBADF); + assert_return(buf || count == 0, -EINVAL); + + return fiber_io_operation(fd, EPOLLOUT, write_callback, &(WriteArgs) { + .buf = buf, + .count = count, + }); +} + +typedef struct ReadvArgs { + const struct iovec *iov; + int iovcnt; +} ReadvArgs; + +static ssize_t readv_callback(int fd, void *args) { + ReadvArgs *a = ASSERT_PTR(args); + ssize_t n; + + n = readv(fd, a->iov, a->iovcnt); + return n >= 0 ? n : -errno; +} + +ssize_t sd_fiber_readv(int fd, const struct iovec *iov, int iovcnt) { + assert_return(fd >= 0, -EBADF); + assert_return(iov || iovcnt == 0, -EINVAL); + + return fiber_io_operation(fd, EPOLLIN, readv_callback, &(ReadvArgs) { + .iov = iov, + .iovcnt = iovcnt, + }); +} + +typedef struct WritevArgs { + const struct iovec *iov; + int iovcnt; +} WritevArgs; + +static ssize_t writev_callback(int fd, void *args) { + WritevArgs *a = ASSERT_PTR(args); + ssize_t n; + + n = writev(fd, a->iov, a->iovcnt); + return n >= 0 ? n : -errno; +} + +ssize_t sd_fiber_writev(int fd, const struct iovec *iov, int iovcnt) { + assert_return(fd >= 0, -EBADF); + assert_return(iov || iovcnt == 0, -EINVAL); + + return fiber_io_operation(fd, EPOLLOUT, writev_callback, &(WritevArgs) { + .iov = iov, + .iovcnt = iovcnt, + }); +} + +typedef struct RecvArgs { + void *buf; + size_t len; + int flags; +} RecvArgs; + +static ssize_t recv_callback(int fd, void *args) { + RecvArgs *a = ASSERT_PTR(args); + ssize_t n; + + n = recv(fd, a->buf, a->len, a->flags); + return n >= 0 ? n : -errno; +} + +ssize_t sd_fiber_recv(int sockfd, void *buf, size_t len, int flags) { + assert_return(sockfd >= 0, -EBADF); + assert_return(buf || len == 0, -EINVAL); + + return fiber_io_operation(sockfd, EPOLLIN, recv_callback, &(RecvArgs) { + .buf = buf, + .len = len, + .flags = flags, + }); +} + +typedef struct SendArgs { + const void *buf; + size_t len; + int flags; +} SendArgs; + +static ssize_t send_callback(int fd, void *args) { + SendArgs *a = ASSERT_PTR(args); + ssize_t n; + + n = send(fd, a->buf, a->len, a->flags); + return n >= 0 ? n : -errno; +} + +ssize_t sd_fiber_send(int sockfd, const void *buf, size_t len, int flags) { + assert_return(sockfd >= 0, -EBADF); + assert_return(buf || len == 0, -EINVAL); + + return fiber_io_operation(sockfd, EPOLLOUT, send_callback, &(SendArgs) { + .buf = buf, + .len = len, + .flags = flags, + }); +} + +int sd_fiber_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) { + _cleanup_(nonblock_resetp) int reset_fd = -EBADF; + int r; + + assert_return(sockfd >= 0, -EBADF); + assert_return(addr, -EINVAL); + + if (!sd_fiber_is_running()) + return RET_NERRNO(connect(sockfd, addr, addrlen)); + + sd_event *e = sd_fiber_get_event(); + assert(e); + + r = fd_nonblock(sockfd, true); + if (r < 0) + return r; + if (r > 0) + reset_fd = sockfd; + + r = RET_NERRNO(connect(sockfd, addr, addrlen)); + if (r != -EINPROGRESS) + return r; + + _cleanup_(sd_future_cancel_wait_unrefp) sd_future *io = NULL; + r = future_new_io(e, sockfd, EPOLLOUT, &io); + if (r < 0) + return r; + + /* future_new_io resolves with the revents mask on success; translate any positive value + * (e.g. POLLOUT) back to the connect(2) success status. */ + r = sd_fiber_suspend(); + return r > 0 ? 0 : r; +} + +typedef struct RecvmsgArgs { + struct msghdr *msg; + int flags; +} RecvmsgArgs; + +static ssize_t recvmsg_callback(int fd, void *args) { + RecvmsgArgs *a = ASSERT_PTR(args); + ssize_t n; + + n = recvmsg(fd, a->msg, a->flags); + return n >= 0 ? n : -errno; +} + +ssize_t sd_fiber_recvmsg(int sockfd, struct msghdr *msg, int flags) { + assert_return(sockfd >= 0, -EBADF); + assert_return(msg, -EINVAL); + + return fiber_io_operation(sockfd, EPOLLIN, recvmsg_callback, &(RecvmsgArgs) { + .msg = msg, + .flags = flags, + }); +} + +typedef struct SendmsgArgs { + const struct msghdr *msg; + int flags; +} SendmsgArgs; + +static ssize_t sendmsg_callback(int fd, void *args) { + SendmsgArgs *a = ASSERT_PTR(args); + ssize_t n; + + n = sendmsg(fd, a->msg, a->flags); + return n >= 0 ? n : -errno; +} + +ssize_t sd_fiber_sendmsg(int sockfd, const struct msghdr *msg, int flags) { + assert_return(sockfd >= 0, -EBADF); + assert_return(msg, -EINVAL); + + return fiber_io_operation(sockfd, EPOLLOUT, sendmsg_callback, &(SendmsgArgs) { + .msg = msg, + .flags = flags, + }); +} + +static ssize_t recvfrom_callback(int fd, void *args) { + RecvmsgArgs *a = ASSERT_PTR(args); + ssize_t n; + + n = recvmsg(fd, a->msg, a->flags); + return n >= 0 ? n : -errno; +} + +ssize_t sd_fiber_recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen) { + ssize_t n; + + assert_return(sockfd >= 0, -EBADF); + assert_return(buf || len == 0, -EINVAL); + assert_return(!src_addr || addrlen, -EINVAL); + + /* io_uring has no direct recvfrom prep helper, so emulate via recvmsg with a single-iovec + * msghdr. The kernel updates msg_namelen in place; we copy it back to *addrlen below. */ + struct iovec iov = { .iov_base = buf, .iov_len = len }; + struct msghdr msg = { + .msg_name = src_addr, + .msg_namelen = src_addr ? *addrlen : 0, + .msg_iov = &iov, + .msg_iovlen = 1, + }; + + n = fiber_io_operation(sockfd, EPOLLIN, recvfrom_callback, &(RecvmsgArgs) { + .msg = &msg, + .flags = flags, + }); + if (n < 0) + return n; + + if (addrlen) + *addrlen = msg.msg_namelen; + + return n; +} + +static ssize_t sendto_callback(int fd, void *args) { + SendmsgArgs *a = ASSERT_PTR(args); + ssize_t n; + + n = sendmsg(fd, a->msg, a->flags); + return n >= 0 ? n : -errno; +} + +ssize_t sd_fiber_sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen) { + assert_return(sockfd >= 0, -EBADF); + assert_return(buf || len == 0, -EINVAL); + + struct iovec iov = { .iov_base = (void *) buf, .iov_len = len }; + struct msghdr msg = { + .msg_name = (void *) dest_addr, + .msg_namelen = dest_addr ? addrlen : 0, + .msg_iov = &iov, + .msg_iovlen = 1, + }; + + return fiber_io_operation(sockfd, EPOLLOUT, sendto_callback, &(SendmsgArgs) { + .msg = &msg, + .flags = flags, + }); +} + +typedef struct AcceptArgs { + struct sockaddr *addr; + socklen_t *addrlen; + int flags; +} AcceptArgs; + +static ssize_t accept_callback(int fd, void *args) { + AcceptArgs *a = ASSERT_PTR(args); + + return RET_NERRNO(accept4(fd, a->addr, a->addrlen, a->flags)); +} + +int sd_fiber_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) { + assert_return(sockfd >= 0, -EBADF); + + return fiber_io_operation(sockfd, EPOLLIN, accept_callback, &(AcceptArgs) { + .addr = addr, + .addrlen = addrlen, + .flags = flags, + }); +} + +int sd_fiber_ppoll(struct pollfd *fds, size_t n_fds, const struct timespec *timeout, const sigset_t *sigmask) { + int r; + + assert_return(fds || n_fds == 0, -EINVAL); + + if (!sd_fiber_is_running()) + return RET_NERRNO(ppoll(fds, n_fds, timeout, sigmask)); + + /* When on a fiber signals are handled via sd-event hence we should never mess around with the + * signal mask when running on a fiber. */ + assert_return(!sigmask, -EOPNOTSUPP); + + sd_event *e = sd_fiber_get_event(); + assert(e); + + /* No fds to wait on and no timeout means there's nothing that could ever wake the fiber up, + * since unlike raw ppoll() we cannot use signal delivery as a wakeup. Signals received while + * the fiber is suspended are handled by sd-event via signalfd, in which case the signal handler + * is expected to cancel the fiber via sd_future_cancel() if a wakeup is desired. */ + if (n_fds == 0 && !timeout) + return -EINVAL; + + bool zero_timeout = timeout && timeout->tv_sec == 0 && timeout->tv_nsec == 0; + + /* Try polling with zero timeout first to see if any are immediately ready. */ + r = RET_NERRNO(ppoll(fds, n_fds, &(const struct timespec) {}, /* sigmask= */ NULL)); + if (zero_timeout || r != 0) /* Either error or some fds are ready */ + return r; + + sd_future **futures = NULL; + CLEANUP_ARRAY(futures, n_fds, sd_future_cancel_wait_unref_array); + + futures = new0(sd_future*, n_fds); + if (!futures) + return -ENOMEM; + + /* Set up I/O event sources for all valid fds. POLL* and EPOLL* share their bit values (see + * EPOLL_POLL_COMMON_MASK in io-util.h), so we can pass the user-supplied event mask through + * to either backend without translation. */ + size_t n_io_futures = 0; + for (size_t i = 0; i < n_fds; i++) { + if (fds[i].fd < 0) + continue; + + uint32_t events = fds[i].events & EPOLL_POLL_COMMON_MASK; + if (events == 0) + continue; + + r = future_new_io(e, fds[i].fd, events, &futures[i]); + if (r < 0) + return r; + + n_io_futures++; + } + + /* A timeout that overflows usec_t saturates to USEC_INFINITY in timespec_load(); treat that + * like "no timeout" (matches sd_fiber_sleep(USEC_INFINITY)) rather than letting + * sd_event_add_time_relative() reject it with -EOVERFLOW — standard ppoll() would just + * wait a very long time. */ + usec_t usec = timeout ? timespec_load(timeout) : USEC_INFINITY; + + /* If every fd was skipped (negative or empty event mask) and we'd have no timer, there's + * nothing that could ever wake the fiber up — same situation as n_fds == 0 && !timeout, + * just not detectable upfront. Refuse rather than suspend forever. */ + if (n_io_futures == 0 && usec == USEC_INFINITY) + return -EINVAL; + + _cleanup_(sd_future_cancel_wait_unrefp) sd_future *timer = NULL; + if (usec != USEC_INFINITY) { + r = future_new_time_relative( + e, + CLOCK_MONOTONIC, + usec, + /* accuracy= */ 1, + /* result= */ 0, + &timer); + if (r < 0) + return r; + } + + r = sd_fiber_suspend(); + if (r < 0 && r != -ETIME) + return r; + + /* Always sweep fds with a non-blocking ppoll(): the timer and an fd readiness can resolve in + * the same event-loop tick (or the fd can become ready between the timer firing and us being + * scheduled), and ppoll() semantics give events precedence over the timeout in that case. */ + int n = RET_NERRNO(ppoll(fds, n_fds, &(const struct timespec) {}, /* sigmask= */ NULL)); + if (n != 0) + return n; + + /* No fds ready: distinguish our own timer from an external -ETIME. */ + if (timer && sd_future_state(timer) == SD_FUTURE_RESOLVED) + return 0; + + /* An IO future resolved with a revents mask (r > 0) but the readiness was already consumed + * by the time we swept — report 0 rather than leaking the bitmask as a (bogus) ppoll fd + * count to the caller. */ + if (r > 0) + return 0; + + return r; +} diff --git a/src/libsystemd/sd-future/test-fiber-io.c b/src/libsystemd/sd-future/test-fiber-io.c new file mode 100644 index 00000000000..aef38950b63 --- /dev/null +++ b/src/libsystemd/sd-future/test-fiber-io.c @@ -0,0 +1,1388 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include +#include +#include +#include +#include + +#include "sd-event.h" +#include "sd-future.h" + +#include "fd-util.h" +#include "tests.h" +#include "time-util.h" + +/* Test: Basic pipe I/O with sd-event */ + +typedef struct PipeIOContext { + int *pipefd; + int order; +} PipeIOContext; + +static int pipe_read_fiber(void *userdata) { + PipeIOContext *ctx = ASSERT_PTR(userdata); + char buf[64]; + ssize_t n; + + n = sd_fiber_read(ctx->pipefd[0], buf, sizeof(buf)); + if (n < 0) + return (int) n; + + /* Verify we read "hello" */ + if (n != 5 || memcmp(buf, "hello", 5) != 0) + return -EIO; + + return (int) n; +} + +TEST(fiber_io_basic) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + + PipeIOContext ctx = { .pipefd = pipefd }; + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "pipe-read", pipe_read_fiber, &ctx, NULL, &f)); + + /* Write data to the pipe */ + ASSERT_OK_EQ_ERRNO(write(pipefd[1], "hello", 5), 5); + + /* Run the scheduler - should process the I/O */ + ASSERT_OK(sd_event_loop(e)); + + /* Verify fiber read the data */ + ASSERT_OK_EQ(sd_future_result(f), 5); +} + +static int pipe_read_order_fiber(void *userdata) { + PipeIOContext *ctx = ASSERT_PTR(userdata); + char buf[64]; + ssize_t n; + + /* Record that the read fiber started before attempting the blocking read */ + ASSERT_EQ(ctx->order, 0); + ctx->order = 1; + + n = sd_fiber_read(ctx->pipefd[0], buf, sizeof(buf)); + if (n < 0) + return (int) n; + + /* After resuming, verify the write fiber ran while we were suspended */ + ASSERT_EQ(ctx->order, 2); + + /* Verify we read "hello" */ + if (n != 5 || memcmp(buf, "hello", 5) != 0) + return -EIO; + + return (int) n; +} + +static int pipe_write_order_fiber(void *userdata) { + PipeIOContext *ctx = ASSERT_PTR(userdata); + + /* Verify the read fiber already ran and suspended before we started */ + ASSERT_EQ(ctx->order, 1); + ctx->order = 2; + + return sd_fiber_write(ctx->pipefd[1], "hello", STRLEN("hello")); +} + +TEST(fiber_io_read_write) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + + PipeIOContext ctx = { .pipefd = pipefd }; + + /* Higher priority for the read fiber, which will run first and then suspend because no data is + * available. The write fiber will run second, write data to the pipe, causing the read fiber to get + * resumed. */ + _cleanup_(sd_future_unrefp) sd_future *fr = NULL, *fw = NULL; + ASSERT_OK(sd_fiber_new(e, "pipe-read", pipe_read_order_fiber, &ctx, NULL, &fr)); + ASSERT_OK(sd_future_set_priority(fr, 0)); + ASSERT_OK(sd_fiber_new(e, "pipe-write", pipe_write_order_fiber, &ctx, NULL, &fw)); + ASSERT_OK(sd_future_set_priority(fw, 1)); + + /* Run the scheduler - should process the I/O */ + ASSERT_OK(sd_event_loop(e)); + + /* Verify both fibers completed and the full read->suspend->write->resume sequence occurred */ + ASSERT_OK_EQ(sd_future_result(fr), 5); + ASSERT_OK_EQ(sd_future_result(fw), 5); +} + +/* Test: Multiple concurrent reads */ +static int concurrent_read_fiber(void *userdata) { + int *args = userdata; + int fd = args[0]; + int expected = args[1]; + char buf[64]; + ssize_t n; + + n = sd_fiber_read(fd, buf, sizeof buf); + if (n < 0) + return (int) n; + + if (n != 1 || buf[0] != (char) expected) + return -EIO; + + return 0; +} + +TEST(fiber_io_concurrent) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + sd_future *fibers[3] = {}; + CLEANUP_ELEMENTS(fibers, sd_future_unref_array_clear); + + /* Create 3 pipes and 3 fibers */ + int pipes[3][2]; + int args[3][2]; + for (size_t i = 0; i < ELEMENTSOF(fibers); i++) { + ASSERT_OK_ERRNO(pipe2(pipes[i], O_CLOEXEC | O_NONBLOCK)); + args[i][0] = pipes[i][0]; + args[i][1] = 'A' + i; + ASSERT_OK(sd_fiber_new(e, "concurrent-read", concurrent_read_fiber, args[i], NULL, &fibers[i])); + } + + /* Write data in reverse order */ + ASSERT_EQ(write(pipes[2][1], "C", 1), 1); + ASSERT_EQ(write(pipes[1][1], "B", 1), 1); + ASSERT_EQ(write(pipes[0][1], "A", 1), 1); + + /* Run until all complete */ + ASSERT_OK(sd_event_loop(e)); + + /* All should complete successfully */ + for (size_t i = 0; i < ELEMENTSOF(fibers); i++) { + ASSERT_OK(sd_future_result(fibers[i])); + safe_close_pair(pipes[i]); + } +} + +/* Test: Cancel fiber during I/O */ +static int blocking_read_fiber(void *userdata) { + int fd = PTR_TO_INT(userdata); + char buf[64]; + ssize_t n; + + n = sd_fiber_read(fd, buf, sizeof(buf)); + return (int) n; +} + +TEST(fiber_io_cancel) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "blocking-read", blocking_read_fiber, INT_TO_PTR(pipefd[0]), NULL, &f)); + + /* Run once - fiber will suspend on read */ + ASSERT_OK_POSITIVE(sd_event_run(e, 0)); + + /* Fiber should be suspended now - add explicit check via state tracking */ + + /* Cancel the fiber */ + ASSERT_OK(sd_future_cancel(f)); + + /* Run to completion */ + ASSERT_OK(sd_event_loop(e)); + + /* Should be cancelled */ + ASSERT_ERROR(sd_future_result(f), ECANCELED); +} + +TEST(fiber_io_fallback) { + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC)); /* Note: blocking pipe */ + + char buf[STRLEN("fallback")] = {}; + ASSERT_OK_EQ(sd_fiber_write(pipefd[1], "fallback", sizeof(buf)), (ssize_t) sizeof(buf)); + ASSERT_OK_EQ(sd_fiber_read(pipefd[0], buf, sizeof(buf)), (ssize_t) sizeof(buf)); +} + +static int pipe_readv_order_fiber(void *userdata) { + PipeIOContext *ctx = ASSERT_PTR(userdata); + char buf1[5], buf2[5]; + struct iovec iov[] = { + { .iov_base = buf1, .iov_len = sizeof(buf1) }, + { .iov_base = buf2, .iov_len = sizeof(buf2) }, + }; + ssize_t n; + + /* Record that the read fiber started before attempting the blocking read */ + ASSERT_EQ(ctx->order, 0); + ctx->order = 1; + + /* This will initially block since no data is available */ + n = sd_fiber_readv(ctx->pipefd[0], iov, ELEMENTSOF(iov)); + if (n < 0) + return (int) n; + + /* After resuming, verify the write fiber ran while we were suspended */ + ASSERT_EQ(ctx->order, 2); + + if (n != 10 || memcmp(buf1, "fiber", 5) != 0 || memcmp(buf2, "readv", 5) != 0) + return -EIO; + + return (int) n; +} + +static int pipe_writev_order_fiber(void *userdata) { + PipeIOContext *ctx = ASSERT_PTR(userdata); + const char *part1 = "fiber"; + const char *part2 = "readv"; + struct iovec iov[] = { + { .iov_base = (void*) part1, .iov_len = 5 }, + { .iov_base = (void*) part2, .iov_len = 5 }, + }; + + /* Verify the read fiber already ran and suspended before we started */ + ASSERT_EQ(ctx->order, 1); + ctx->order = 2; + + return sd_fiber_writev(ctx->pipefd[1], iov, ELEMENTSOF(iov)); +} + +TEST(fiber_io_readv_writev) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + + PipeIOContext ctx = { .pipefd = pipefd }; + + /* Higher priority for the read fiber, which will run first and then suspend because no data is + * available. The write fiber will run second, write data to the pipe, causing the read fiber to get + * resumed. */ + _cleanup_(sd_future_unrefp) sd_future *fr = NULL, *fw = NULL; + ASSERT_OK(sd_fiber_new(e, "pipe-readv", pipe_readv_order_fiber, &ctx, NULL, &fr)); + ASSERT_OK(sd_future_set_priority(fr, 0)); + ASSERT_OK(sd_fiber_new(e, "pipe-writev", pipe_writev_order_fiber, &ctx, NULL, &fw)); + ASSERT_OK(sd_future_set_priority(fw, 1)); + + /* Run the scheduler - should process the I/O */ + ASSERT_OK(sd_event_loop(e)); + + /* Verify both fibers completed and the full read->suspend->write->resume sequence occurred */ + ASSERT_OK_EQ(sd_future_result(fr), 10); + ASSERT_OK_EQ(sd_future_result(fw), 10); +} + +static int concurrent_readv_fiber(void *userdata) { + int *args = userdata; + int fd = args[0]; + int expected1 = args[1]; + int expected2 = args[2]; + char buf1[1], buf2[1]; + struct iovec iov[] = { + { .iov_base = buf1, .iov_len = sizeof(buf1) }, + { .iov_base = buf2, .iov_len = sizeof(buf2) }, + }; + ssize_t n; + + n = sd_fiber_readv(fd, iov, ELEMENTSOF(iov)); + if (n < 0) + return (int) n; + + if (n != 2 || buf1[0] != (char) expected1 || buf2[0] != (char) expected2) + return -EIO; + + return 0; +} + +TEST(fiber_io_readv_concurrent) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + sd_future *fibers[3] = {}; + CLEANUP_ELEMENTS(fibers, sd_future_unref_array_clear); + + /* Create 3 pipes and 3 fibers */ + int pipes[3][2]; + int args[3][3]; + for (size_t i = 0; i < ELEMENTSOF(fibers); i++) { + ASSERT_OK_ERRNO(pipe2(pipes[i], O_CLOEXEC | O_NONBLOCK)); + args[i][0] = pipes[i][0]; + args[i][1] = 'A' + i; + args[i][2] = 'a' + i; + ASSERT_OK(sd_fiber_new(e, "concurrent-readv", concurrent_readv_fiber, args[i], NULL, &fibers[i])); + } + + /* Write data in reverse order */ + ASSERT_EQ(write(pipes[2][1], "Cc", 2), 2); + ASSERT_EQ(write(pipes[1][1], "Bb", 2), 2); + ASSERT_EQ(write(pipes[0][1], "Aa", 2), 2); + + /* Run until all complete */ + ASSERT_OK(sd_event_loop(e)); + + /* All should complete successfully */ + for (size_t i = 0; i < ELEMENTSOF(fibers); i++) { + ASSERT_OK(sd_future_result(fibers[i])); + safe_close_pair(pipes[i]); + } +} + +typedef struct SocketIOContext { + int *sockfd; + int order; +} SocketIOContext; + +static int socket_send_order_fiber(void *userdata) { + SocketIOContext *ctx = ASSERT_PTR(userdata); + + /* Verify the recv fiber already ran and suspended before we started */ + ASSERT_EQ(ctx->order, 1); + ctx->order = 2; + + return sd_fiber_send(ctx->sockfd[0], "socket", STRLEN("socket"), 0); +} + +static int socket_recv_order_fiber(void *userdata) { + SocketIOContext *ctx = ASSERT_PTR(userdata); + char buf[64]; + ssize_t n; + + /* Record that the recv fiber started before attempting the blocking recv */ + ASSERT_EQ(ctx->order, 0); + ctx->order = 1; + + n = sd_fiber_recv(ctx->sockfd[1], buf, sizeof(buf), 0); + if (n < 0) + return (int) n; + + /* After resuming, verify the send fiber ran while we were suspended */ + ASSERT_EQ(ctx->order, 2); + + /* Verify we received "socket" */ + if (n != 6 || memcmp(buf, "socket", 6) != 0) + return -EIO; + + return (int) n; +} + +TEST(fiber_io_recv_send) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, sockfd)); + + SocketIOContext ctx = { .sockfd = sockfd }; + + /* Higher priority for the recv fiber, which will run first and suspend */ + _cleanup_(sd_future_unrefp) sd_future *fs = NULL, *fr = NULL; + ASSERT_OK(sd_fiber_new(e, "socket-recv", socket_recv_order_fiber, &ctx, NULL, &fr)); + ASSERT_OK(sd_future_set_priority(fr, 0)); + ASSERT_OK(sd_fiber_new(e, "socket-send", socket_send_order_fiber, &ctx, NULL, &fs)); + ASSERT_OK(sd_future_set_priority(fs, 1)); + + ASSERT_OK(sd_event_loop(e)); + + /* Verify both fibers completed and the full recv->suspend->send->resume sequence occurred */ + ASSERT_OK_EQ(sd_future_result(fr), 6); + ASSERT_OK_EQ(sd_future_result(fs), 6); +} + +static int socket_recv_peek_fiber(void *userdata) { + int sockfd = PTR_TO_INT(userdata); + char buf1[64], buf2[64]; + ssize_t n1, n2; + + /* First peek at the data */ + n1 = sd_fiber_recv(sockfd, buf1, sizeof(buf1), MSG_PEEK); + if (n1 < 0) + return (int) n1; + + /* Then actually read it */ + n2 = sd_fiber_recv(sockfd, buf2, sizeof(buf2), 0); + if (n2 < 0) + return (int) n2; + + /* Both should have read the same data */ + if (n1 != n2 || memcmp(buf1, buf2, n1) != 0) + return -EIO; + + if (n1 != 4 || memcmp(buf1, "peek", 4) != 0) + return -EIO; + + return 0; +} + +TEST(fiber_io_recv_peek) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, sockfd)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "socket-recv-peek", socket_recv_peek_fiber, INT_TO_PTR(sockfd[1]), NULL, &f)); + + /* Write data to the socket */ + ASSERT_OK_EQ_ERRNO(write(sockfd[0], "peek", 4), 4); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); +} + +static int socket_connect_fiber(void *userdata) { + struct sockaddr_un *addr = userdata; + _cleanup_close_ int sockfd = -EBADF; + + sockfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0); + if (sockfd < 0) + return -errno; + + return sd_fiber_connect(sockfd, (struct sockaddr*) addr, sizeof(*addr)); +} + +TEST(fiber_io_connect) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + /* Create listening socket with abstract namespace */ + _cleanup_close_ int listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0); + ASSERT_OK(listen_fd); + + /* Use abstract socket (starts with null byte) */ + struct sockaddr_un addr = { + .sun_family = AF_UNIX, + }; + addr.sun_path[0] = '\0'; + snprintf(addr.sun_path + 1, sizeof(addr.sun_path) - 1, "test-fiber-connect-%d", getpid()); + + ASSERT_OK(bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr))); + ASSERT_OK(listen(listen_fd, 1)); + + /* Create fiber to connect */ + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "socket-connect", socket_connect_fiber, &addr, NULL, &f)); + + /* Run the event loop - connection should complete */ + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); +} + +static int socket_sendmsg_fiber(void *userdata) { + int sockfd = PTR_TO_INT(userdata); + struct iovec iov = { + .iov_base = (void*) "message", + .iov_len = STRLEN("message"), + }; + struct msghdr msg = { + .msg_iov = &iov, + .msg_iovlen = 1, + }; + + return sd_fiber_sendmsg(sockfd, &msg, 0); +} + +static int socket_recvmsg_fiber(void *userdata) { + int sockfd = PTR_TO_INT(userdata); + char buf[64]; + struct iovec iov = { + .iov_base = buf, + .iov_len = sizeof(buf), + }; + struct msghdr msg = { + .msg_iov = &iov, + .msg_iovlen = 1, + }; + ssize_t n; + + n = sd_fiber_recvmsg(sockfd, &msg, 0); + if (n < 0) + return (int) n; + + if (n != 7 || memcmp(buf, "message", 7) != 0) + return -EIO; + + return (int) n; +} + +TEST(fiber_io_recvmsg_sendmsg) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, sockfd)); + + _cleanup_(sd_future_unrefp) sd_future *fs = NULL, *fr = NULL; + ASSERT_OK(sd_fiber_new(e, "socket-recvmsg", socket_recvmsg_fiber, INT_TO_PTR(sockfd[1]), NULL, &fr)); + ASSERT_OK(sd_future_set_priority(fr, 1)); + ASSERT_OK(sd_fiber_new(e, "socket-sendmsg", socket_sendmsg_fiber, INT_TO_PTR(sockfd[0]), NULL, &fs)); + ASSERT_OK(sd_future_set_priority(fs, 0)); + + ASSERT_OK(sd_event_loop(e)); + + ASSERT_OK_EQ(sd_future_result(fr), 7); + ASSERT_OK_EQ(sd_future_result(fs), 7); +} + +static int socket_sendto_fiber(void *userdata) { + int sockfd = PTR_TO_INT(userdata); + + /* For socketpair dgram sockets, we can use NULL address since they're connected */ + return sd_fiber_sendto(sockfd, "datagram", STRLEN("datagram"), 0, NULL, 0); +} + +static int socket_recvfrom_fiber(void *userdata) { + int sockfd = PTR_TO_INT(userdata); + char buf[64]; + struct sockaddr_un addr; + socklen_t addr_len = sizeof(addr); + ssize_t n; + + n = sd_fiber_recvfrom(sockfd, buf, sizeof(buf), 0, + (struct sockaddr*) &addr, &addr_len); + if (n < 0) + return (int) n; + + if (n != 8 || memcmp(buf, "datagram", 8) != 0) + return -EIO; + + return (int) n; +} + +TEST(fiber_io_recvfrom_sendto) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, sockfd)); + + _cleanup_(sd_future_unrefp) sd_future *fs = NULL, *fr = NULL; + ASSERT_OK(sd_fiber_new(e, "socket-recvfrom", socket_recvfrom_fiber, INT_TO_PTR(sockfd[1]), NULL, &fr)); + ASSERT_OK(sd_future_set_priority(fr, 1)); + ASSERT_OK(sd_fiber_new(e, "socket-sendto", socket_sendto_fiber, INT_TO_PTR(sockfd[0]), NULL, &fs)); + ASSERT_OK(sd_future_set_priority(fs, 0)); + + ASSERT_OK(sd_event_loop(e)); + + ASSERT_OK_EQ(sd_future_result(fr), 8); + ASSERT_OK_EQ(sd_future_result(fs), 8); +} + +static int socket_sendmsg_fd_fiber(void *userdata) { + int *args = userdata; + int sockfd = args[0]; + int fd_to_send = args[1]; + struct iovec iov = { + .iov_base = (void*) "X", + .iov_len = 1, + }; + union { + struct cmsghdr cmsghdr; + uint8_t buf[CMSG_SPACE(sizeof(int))]; + } control = {}; + struct msghdr msg = { + .msg_iov = &iov, + .msg_iovlen = 1, + .msg_control = &control, + .msg_controllen = sizeof(control), + }; + struct cmsghdr *cmsg; + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(int)); + memcpy(CMSG_DATA(cmsg), &fd_to_send, sizeof(int)); + + return sd_fiber_sendmsg(sockfd, &msg, 0); +} + +static int socket_recvmsg_fd_fiber(void *userdata) { + int sockfd = PTR_TO_INT(userdata); + char buf[1]; + struct iovec iov = { + .iov_base = buf, + .iov_len = sizeof(buf), + }; + union { + struct cmsghdr cmsghdr; + uint8_t buf[CMSG_SPACE(sizeof(int))]; + } control = {}; + struct msghdr msg = { + .msg_iov = &iov, + .msg_iovlen = 1, + .msg_control = &control, + .msg_controllen = sizeof(control), + }; + struct cmsghdr *cmsg; + int received_fd; + ssize_t n; + + n = sd_fiber_recvmsg(sockfd, &msg, 0); + if (n < 0) + return (int) n; + + if (n != 1 || buf[0] != 'X') + return -EIO; + + /* Extract the file descriptor */ + cmsg = CMSG_FIRSTHDR(&msg); + if (!cmsg || cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS) + return -EIO; + + memcpy(&received_fd, CMSG_DATA(cmsg), sizeof(int)); + + /* Verify we can use the fd */ + if (fcntl(received_fd, F_GETFD) < 0) + return -errno; + + close(received_fd); + return 0; +} + +TEST(fiber_io_sendmsg_recvmsg_fd) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, sockfd)); + + /* Create a test file descriptor to send */ + _cleanup_close_ int test_fd = open("/dev/null", O_RDONLY | O_CLOEXEC); + ASSERT_OK_ERRNO(test_fd); + + _cleanup_(sd_future_unrefp) sd_future *fs = NULL, *fr = NULL; + int args[2] = { sockfd[0], test_fd }; + ASSERT_OK(sd_fiber_new(e, "socket-recvmsg-fd", socket_recvmsg_fd_fiber, INT_TO_PTR(sockfd[1]), NULL, &fr)); + ASSERT_OK(sd_future_set_priority(fr, 1)); + ASSERT_OK(sd_fiber_new(e, "socket-sendmsg-fd", socket_sendmsg_fd_fiber, args, NULL, &fs)); + ASSERT_OK(sd_future_set_priority(fs, 0)); + + ASSERT_OK(sd_event_loop(e)); + + ASSERT_OK(sd_future_result(fr)); + ASSERT_OK_EQ(sd_future_result(fs), 1); +} + +TEST(fiber_io_socket_fallback) { + _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR; + char buf[STRLEN("fallback")] = {}; + + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, sockfd)); + + /* Test send/recv without fiber context */ + ASSERT_OK_EQ(sd_fiber_send(sockfd[0], "fallback", sizeof(buf), 0), (ssize_t) sizeof(buf)); + ASSERT_OK_EQ(sd_fiber_recv(sockfd[1], buf, sizeof(buf), 0), (ssize_t) sizeof(buf)); + + /* Test sendto/recvfrom without fiber context */ + ASSERT_OK_EQ(sd_fiber_sendto(sockfd[0], "fallback", sizeof(buf), 0, NULL, 0), (ssize_t) sizeof(buf)); + ASSERT_OK_EQ(sd_fiber_recvfrom(sockfd[1], buf, sizeof(buf), 0, NULL, NULL), (ssize_t) sizeof(buf)); +} + +static int blocking_recv_fiber(void *userdata) { + int sockfd = PTR_TO_INT(userdata); + char buf[64]; + + return sd_fiber_recv(sockfd, buf, sizeof(buf), 0); +} + +TEST(fiber_io_socket_cancel) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int sockfd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, sockfd)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "blocking-recv", blocking_recv_fiber, INT_TO_PTR(sockfd[0]), NULL, &f)); + + /* Run once - fiber will suspend on recv */ + ASSERT_OK_POSITIVE(sd_event_run(e, 0)); + + /* Cancel the fiber */ + ASSERT_OK(sd_future_cancel(f)); + + /* Run to completion */ + ASSERT_OK(sd_event_loop(e)); + + /* Should be cancelled */ + ASSERT_ERROR(sd_future_result(f), ECANCELED); +} + +/* Test: Basic accept operation */ +static int accept_fiber(void *userdata) { + int listen_fd = PTR_TO_INT(userdata); + struct sockaddr_un addr; + socklen_t addr_len = sizeof(addr); + int client_fd; + + client_fd = sd_fiber_accept(listen_fd, (struct sockaddr*) &addr, &addr_len, SOCK_CLOEXEC); + if (client_fd < 0) + return client_fd; + + close(client_fd); + return 0; +} + +TEST(fiber_io_accept_basic) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + /* Create listening socket with abstract namespace */ + _cleanup_close_ int listen_fd = -EBADF; + ASSERT_OK_ERRNO(listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)); + + struct sockaddr_un addr = { + .sun_family = AF_UNIX, + }; + addr.sun_path[0] = '\0'; + snprintf(addr.sun_path + 1, sizeof(addr.sun_path) - 1, "test-fiber-accept-%d", getpid()); + + ASSERT_OK_ERRNO(bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr))); + ASSERT_OK_ERRNO(listen(listen_fd, 1)); + + /* Create fiber to accept connection */ + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "accept", accept_fiber, INT_TO_PTR(listen_fd), NULL, &f)); + + /* Connect from outside fiber context */ + _cleanup_close_ int connect_fd = -EBADF; + ASSERT_OK(connect_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)); + ASSERT_OK(connect(connect_fd, (struct sockaddr*) &addr, sizeof(addr))); + + /* Run the event loop - accept should complete */ + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); +} + +/* Test: Multiple sequential accepts */ +static int accept_multiple_fiber(void *userdata) { + int listen_fd = PTR_TO_INT(userdata); + struct sockaddr_un addr; + socklen_t addr_len; + int count = 0; + + for (int i = 0; i < 3; i++) { + _cleanup_close_ int client_fd = -EBADF; + + addr_len = sizeof(addr); + client_fd = sd_fiber_accept(listen_fd, (struct sockaddr*) &addr, &addr_len, SOCK_CLOEXEC); + if (client_fd < 0) + return client_fd; + + count++; + } + + return count; +} + +TEST(fiber_io_accept_multiple) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + /* Create listening socket */ + _cleanup_close_ int listen_fd = -EBADF; + ASSERT_OK(listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)); + + struct sockaddr_un addr = { + .sun_family = AF_UNIX, + }; + addr.sun_path[0] = '\0'; + snprintf(addr.sun_path + 1, sizeof(addr.sun_path) - 1, "test-fiber-accept-multi-%d", getpid()); + + ASSERT_OK(bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr))); + ASSERT_OK(listen(listen_fd, 5)); + + /* Create fiber to accept multiple connections */ + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "accept-multiple", accept_multiple_fiber, INT_TO_PTR(listen_fd), NULL, &f)); + + /* Connect multiple times */ + int connect_fds[3] = { -EBADF, -EBADF, -EBADF }; + for (size_t i = 0; i < 3; i++) { + connect_fds[i] = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0); + ASSERT_OK(connect_fds[i]); + ASSERT_OK(connect(connect_fds[i], (struct sockaddr*) &addr, sizeof(addr))); + } + + /* Run the event loop */ + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK_EQ(sd_future_result(f), 3); + + /* Clean up connection fds */ + for (size_t i = 0; i < 3; i++) + safe_close(connect_fds[i]); +} + +/* Test: Accept and exchange data */ +static int accept_and_read_fiber(void *userdata) { + int listen_fd = PTR_TO_INT(userdata); + _cleanup_close_ int client_fd = -EBADF; + char buf[64]; + ssize_t n; + + client_fd = sd_fiber_accept(listen_fd, NULL, NULL, SOCK_CLOEXEC); + if (client_fd < 0) + return client_fd; + + n = sd_fiber_read(client_fd, buf, sizeof(buf)); + if (n < 0) + return (int) n; + + if (n != 5 || memcmp(buf, "hello", 5) != 0) + return -EIO; + + return 0; +} + +TEST(fiber_io_accept_and_read) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + /* Create listening socket */ + _cleanup_close_ int listen_fd = -EBADF; + ASSERT_OK(listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)); + + struct sockaddr_un addr = { + .sun_family = AF_UNIX, + }; + addr.sun_path[0] = '\0'; + snprintf(addr.sun_path + 1, sizeof(addr.sun_path) - 1, "test-fiber-accept-read-%d", getpid()); + + ASSERT_OK(bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr))); + ASSERT_OK(listen(listen_fd, 1)); + + /* Create fiber to accept and read */ + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "accept-and-read", accept_and_read_fiber, INT_TO_PTR(listen_fd), NULL, &f)); + + /* Connect and send data */ + _cleanup_close_ int connect_fd = -EBADF; + ASSERT_OK(connect_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)); + ASSERT_OK(connect(connect_fd, (struct sockaddr*) &addr, sizeof(addr))); + ASSERT_OK_EQ_ERRNO(write(connect_fd, "hello", 5), 5); + + /* Run the event loop */ + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); +} + +/* Test: poll with single fd ready immediately */ +static int poll_immediate_fiber(void *userdata) { + int *pipefd = userdata; + struct pollfd fds[] = { + { .fd = pipefd[0], .events = POLLIN }, + }; + int r; + + r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), NULL, NULL); + if (r < 0) + return r; + + /* Should have one fd ready */ + if (r != 1) + return -EIO; + + if (!(fds[0].revents & POLLIN)) + return -EIO; + + return 0; +} + +TEST(fiber_poll_immediate) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + + /* Write data before creating fiber */ + ASSERT_OK_EQ_ERRNO(write(pipefd[1], "X", 1), 1); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "poll-immediate", poll_immediate_fiber, pipefd, NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); +} + +/* Test: poll with fd that becomes ready after suspension */ +static int poll_fiber(void *userdata) { + int *pipefd = userdata; + struct pollfd fds[] = { + { .fd = pipefd[0], .events = POLLIN }, + }; + int r; + + r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), NULL, NULL); + if (r < 0) + return r; + + if (r != 1 || !(fds[0].revents & POLLIN)) + return -EIO; + + /* Read the data */ + char buf[1]; + if (read(pipefd[0], buf, 1) != 1 || buf[0] != 'Y') + return -EIO; + + return 0; +} + +TEST(fiber_poll) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "poll-suspend", poll_fiber, pipefd, NULL, &f)); + + /* Run once - fiber will suspend on poll */ + ASSERT_OK_POSITIVE(sd_event_run(e, 0)); + + /* Write data to wake it up */ + ASSERT_OK_EQ_ERRNO(write(pipefd[1], "Y", 1), 1); + + /* Complete execution */ + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); +} + +/* Test: poll with multiple fds */ +static int poll_multiple_fiber(void *userdata) { + int (*pipes)[2] = userdata; + struct pollfd fds[] = { + { .fd = pipes[0][0], .events = POLLIN }, + { .fd = pipes[1][0], .events = POLLIN }, + { .fd = pipes[2][0], .events = POLLIN }, + }; + int r; + + r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), NULL, NULL); + if (r < 0) + return r; + + /* Should have all three ready */ + if (r != 3) + return -EIO; + + for (size_t i = 0; i < 3; i++) { + if (!(fds[i].revents & POLLIN)) + return -EIO; + + char buf[1]; + if (read(fds[i].fd, buf, 1) != 1 || buf[0] != (char) ('A' + i)) + return -EIO; + } + + return 0; +} + +TEST(fiber_poll_multiple) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + /* Create three pipes */ + int pipes[3][2]; + for (size_t i = 0; i < 3; i++) + ASSERT_OK_ERRNO(pipe2(pipes[i], O_CLOEXEC | O_NONBLOCK)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "poll-multiple", poll_multiple_fiber, pipes, NULL, &f)); + + /* Run once - fiber will suspend waiting for data */ + ASSERT_OK_POSITIVE(sd_event_run(e, 0)); + + /* Write to all three pipes in different order */ + ASSERT_OK_EQ_ERRNO(write(pipes[2][1], "C", 1), 1); + ASSERT_OK_EQ_ERRNO(write(pipes[0][1], "A", 1), 1); + ASSERT_OK_EQ_ERRNO(write(pipes[1][1], "B", 1), 1); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); + + for (size_t i = 0; i < 3; i++) + safe_close_pair(pipes[i]); +} + +/* Test: poll with POLLOUT (write readiness) */ +static int poll_pollout_fiber(void *userdata) { + int *pipefd = userdata; + struct pollfd fds[] = { + { .fd = pipefd[1], .events = POLLOUT }, + }; + int r; + + r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), NULL, NULL); + if (r < 0) + return r; + + if (r != 1 || !(fds[0].revents & POLLOUT)) + return -EIO; + + /* Pipe should be writable */ + if (write(pipefd[1], "Z", 1) != 1) + return -errno; + + return 0; +} + +TEST(fiber_poll_pollout) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "poll-pollout", poll_pollout_fiber, pipefd, NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); + + /* Verify data was written */ + char buf[1]; + ASSERT_OK_EQ_ERRNO(read(pipefd[0], buf, 1), 1); + ASSERT_EQ(buf[0], 'Z'); +} + +/* Test: poll with timeout that expires */ +static int poll_timeout_fiber(void *userdata) { + int *pipefd = userdata; + struct pollfd fds[] = { + { .fd = pipefd[0], .events = POLLIN }, + }; + int r; + + /* Poll with 100ms timeout - no data will arrive */ + r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), &(struct timespec) { .tv_nsec = 100 * NSEC_PER_MSEC }, NULL); + if (r < 0) + return r; + + /* Should timeout with no fds ready */ + if (r != 0) + return -EIO; + + return 0; +} + +TEST(fiber_poll_timeout) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "poll-timeout", poll_timeout_fiber, pipefd, NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); +} + +/* Test: poll with zero timeout (should not block) */ +static int poll_zero_timeout_fiber(void *userdata) { + int *pipefd = userdata; + struct pollfd fds[] = { + { .fd = pipefd[0], .events = POLLIN }, + }; + int r; + + /* Poll with zero timeout - should return immediately */ + r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), &(struct timespec) {}, NULL); + if (r < 0) + return r; + + /* No data available, so should return 0 */ + if (r != 0) + return -EIO; + + /* Now write data */ + if (write(pipefd[1], "Q", 1) != 1) + return -errno; + + /* Poll again with zero timeout - should see data */ + r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), NULL, NULL); + if (r < 0) + return r; + + if (r != 1 || !(fds[0].revents & POLLIN)) + return -EIO; + + return 0; +} + +TEST(fiber_poll_zero_timeout) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "poll-zero-timeout", poll_zero_timeout_fiber, pipefd, NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); +} + +/* Test: poll with zero fds and zero timeout (should return immediately) */ +static int poll_zero_fds_fiber(void *userdata) { + return sd_fiber_ppoll(NULL, 0, &(struct timespec) {}, NULL); +} + +TEST(fiber_poll_zero_fds) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "poll-zero-fds", poll_zero_fds_fiber, NULL, NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK_EQ(sd_future_result(f), 0); +} + +/* Test: poll with zero fds and no timeout has no possible wakeup, must reject with -EINVAL */ +static int poll_zero_fds_no_timeout_fiber(void *userdata) { + return sd_fiber_ppoll(NULL, 0, NULL, NULL); +} + +TEST(fiber_poll_zero_fds_no_timeout) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "poll-zero-fds-no-timeout", poll_zero_fds_no_timeout_fiber, NULL, NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_ERROR(sd_future_result(f), EINVAL); +} + +/* Test: poll with negative fd (should be ignored) */ +static int poll_negative_fd_fiber(void *userdata) { + int *pipefd = userdata; + struct pollfd fds[] = { + { .fd = -1, .events = POLLIN }, + { .fd = pipefd[0], .events = POLLIN }, + }; + int r; + + r = sd_fiber_ppoll(fds, ELEMENTSOF(fds), NULL, NULL); + if (r < 0) + return r; + + /* Only the second fd should be ready */ + if (r != 1 || !(fds[1].revents & POLLIN)) + return -EIO; + + /* First fd should have no events */ + if (fds[0].revents != 0) + return -EIO; + + return 0; +} + +TEST(fiber_poll_negative_fd) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + + /* Write data before creating fiber */ + ASSERT_OK_EQ_ERRNO(write(pipefd[1], "N", 1), 1); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "poll-negative-fd", poll_negative_fd_fiber, pipefd, NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); +} + +/* Test: Multiple fibers waiting on the same fd */ +typedef struct SharedFdArgs { + int pipefd; + int *counter; +} SharedFdArgs; + +static int shared_fd_read_fiber(void *userdata) { + SharedFdArgs *args = ASSERT_PTR(userdata); + char buf[1]; + ssize_t n; + + n = sd_fiber_read(args->pipefd, buf, sizeof(buf)); + if (n < 0) + return (int) n; + + if (n != 1) + return -EIO; + + /* Increment counter to track successful reads */ + (*args->counter)++; + + return 0; +} + +TEST(fiber_io_same_fd_multiple_fibers) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK)); + + /* Create 3 fibers all waiting on the same pipe read end */ + sd_future *fibers[3] = {}; + CLEANUP_ELEMENTS(fibers, sd_future_unref_array_clear); + SharedFdArgs args[3]; + int counter = 0; + + for (size_t i = 0; i < ELEMENTSOF(fibers); i++) { + args[i].pipefd = pipefd[0]; + args[i].counter = &counter; + ASSERT_OK(sd_fiber_new(e, "shared-fd-read", shared_fd_read_fiber, &args[i], NULL, &fibers[i])); + } + + /* All fibers should suspend waiting for data */ + for (size_t i = 0; i < ELEMENTSOF(fibers); i++) + ASSERT_OK_POSITIVE(sd_event_run(e, 0)); + + /* Write 3 bytes - each byte will wake one fiber */ + ASSERT_OK_EQ_ERRNO(write(pipefd[1], "ABC", 3), 3); + + /* Run until all fibers complete */ + ASSERT_OK(sd_event_loop(e)); + + /* All should complete successfully and each should have read one byte */ + for (size_t i = 0; i < ELEMENTSOF(fibers); i++) + ASSERT_OK(sd_future_result(fibers[i])); + + ASSERT_EQ(counter, 3); +} + +static int blocking_fd_preserve_fiber(void *userdata) { + int *pipefd = ASSERT_PTR(userdata); + char buf[8] = {}; + ssize_t n; + + /* The pipe has data pre-filled, so this should succeed immediately on the fast path. + * This exercises the fd blocking state restore: fiber_io_operation() temporarily sets the fd + * to nonblocking, and must restore it to blocking on the success path. */ + n = sd_fiber_read(pipefd[0], buf, sizeof(buf)); + if (n < 0) + return (int) n; + + if ((size_t) n != sizeof(buf) || memcmp(buf, "blocking", sizeof(buf)) != 0) + return -EIO; + + return 0; +} + +TEST(fiber_io_blocking_fd_preserved) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + /* Create a blocking pipe (no O_NONBLOCK) */ + _cleanup_close_pair_ int pipefd[2] = EBADF_PAIR; + ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC)); + + /* Pre-fill the pipe so the read will succeed immediately */ + ASSERT_OK_EQ_ERRNO(write(pipefd[1], "blocking", 8), 8); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "blocking-fd-preserve", blocking_fd_preserve_fiber, pipefd, NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); + + /* Verify the read end is still in blocking mode after the fiber completed */ + ASSERT_OK_ZERO(fd_nonblock(pipefd[0], false)); +} + +static int socket_connect_blocking_fiber(void *userdata) { + struct sockaddr_un *addr = userdata; + _cleanup_close_ int sockfd = -EBADF; + + /* Use a blocking socket (no SOCK_NONBLOCK). sd_fiber_connect() should temporarily set it + * to nonblocking, handle the EINPROGRESS path with getsockopt(SO_ERROR), and restore + * the blocking state. */ + sockfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0); + if (sockfd < 0) + return -errno; + + int r = sd_fiber_connect(sockfd, (struct sockaddr*) addr, sizeof(*addr)); + if (r < 0) + return r; + + /* Verify the socket is back in blocking mode */ + r = fd_nonblock(sockfd, false); + if (r < 0) + return r; + if (r > 0) + return -EBUSY; /* fd was nonblocking, but should have been restored to blocking */ + + return 0; +} + +TEST(fiber_io_connect_blocking) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + /* Create listening socket */ + _cleanup_close_ int listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0); + ASSERT_OK(listen_fd); + + struct sockaddr_un addr = { + .sun_family = AF_UNIX, + }; + addr.sun_path[0] = '\0'; + snprintf(addr.sun_path + 1, sizeof(addr.sun_path) - 1, "test-fiber-connect-blocking-%d", getpid()); + + ASSERT_OK(bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr))); + ASSERT_OK(listen(listen_fd, 1)); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + ASSERT_OK(sd_fiber_new(e, "connect-blocking", socket_connect_blocking_fiber, &addr, NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + ASSERT_OK(sd_future_result(f)); +} + +DEFINE_TEST_MAIN(LOG_DEBUG); diff --git a/src/systemd/sd-future.h b/src/systemd/sd-future.h index 9d0d03acf48..5e0fa225256 100644 --- a/src/systemd/sd-future.h +++ b/src/systemd/sd-future.h @@ -17,10 +17,18 @@ along with systemd; If not, see . ***/ +#include + #include "_sd-common.h" _SD_BEGIN_DECLARATIONS; +struct iovec; +struct pollfd; +struct sockaddr; +struct msghdr; +struct timespec; + typedef struct sd_event sd_event; typedef struct sd_future sd_future; typedef struct sd_future_ops sd_future_ops; @@ -96,6 +104,23 @@ sd_future* sd_fiber_timeout(uint64_t timeout); _SD_CONCATENATE(_sd_fto_b_, uniq); \ _SD_CONCATENATE(_sd_fto_b_, uniq) = NULL) +/* Fiber I/O operations - use sd-event for non-blocking I/O when in fiber context */ +ssize_t sd_fiber_read(int fd, void *buf, size_t count); +ssize_t sd_fiber_write(int fd, const void *buf, size_t count); +ssize_t sd_fiber_readv(int fd, const struct iovec *iov, int iovcnt); +ssize_t sd_fiber_writev(int fd, const struct iovec *iov, int iovcnt); +ssize_t sd_fiber_recv(int sockfd, void *buf, size_t len, int flags); +ssize_t sd_fiber_send(int sockfd, const void *buf, size_t len, int flags); +int sd_fiber_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen); +ssize_t sd_fiber_recvmsg(int sockfd, struct msghdr *msg, int flags); +ssize_t sd_fiber_sendmsg(int sockfd, const struct msghdr *msg, int flags); +ssize_t sd_fiber_recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen); +ssize_t sd_fiber_sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen); +int sd_fiber_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags); +#ifndef __STRICT_ANSI__ +int sd_fiber_ppoll(struct pollfd *fds, size_t n_fds, const struct timespec *timeout, const sigset_t *sigmask); +#endif + _SD_END_DECLARATIONS; #endif