]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
sd-bus: make sd-bus fiber-aware
authorDaan De Meyer <daan@amutable.com>
Mon, 11 May 2026 14:27:34 +0000 (16:27 +0200)
committerDaan De Meyer <daan@amutable.com>
Thu, 21 May 2026 09:55:04 +0000 (09:55 +0000)
Two changes to teach sd-bus how to behave when called from a fiber, in
order of increasing depth:

2. sd_bus_call() now redirects to a new bus_call_suspend() helper when
   the caller is a fiber whose event loop is the same one the bus is
   attached to. The plain bus_poll() path serializes all bus traffic on
   the slot's reply (only one method call can be in flight per
   sd_bus*), which would defeat the point of running multiple fibers
   against one bus. bus_call_suspend() builds on the async sd-bus API:
   it wraps the call in a new BusFuture (sd-bus/bus-future.{c,h}) that
   resolves when the reply or method-error arrives, lets the fiber
   await that future, and surfaces the reply to the caller via
   future_get_bus_reply(). Because the futures live on the event loop
   rather than a per-bus slot, multiple fibers can drive concurrent
   method calls against the same bus.

3. A new private SD_BUS_VTABLE_METHOD_FIBER flag dispatches a vtable
   method handler on its own fiber, so handlers are free to use
   sd_bus_call() against the same bus, sd_fiber_sleep(), loop_read(),
   etc. without stalling the event loop for other connections or
   handlers. The flag stays out of sd-bus-vtable.h (its bit value is
   reserved there to prevent collisions) — the fiber runtime is a
   systemd-internal implementation detail.

Lifecycle of fiber-dispatched handlers is tracked on the bus itself: a
new bus->fiber_futures set holds a ref to each in-flight handler.
bus_enter_closing() cancels every entry and process_closing() returns
with the bus still in CLOSING state until the set drains, so we can be
sure no fiber handler outlives the bus. bus_fiber_resolved() removes
the entry on completion. bus_free()'s assert(set_isempty()) makes the
invariant load-bearing.

Note that plain sd_bus_call() already works correctly on a fiber as it
calls ppoll_usec() which has already been modified to suspend when
running on a fiber.

To exercise these changes the existing thread-based client/server
sd-bus tests (test-bus-chat, test-bus-objects, test-bus-peersockaddr,
test-bus-server, test-bus-watch-bind) are migrated to fibers, and a
new test-bus-fiber is added that covers SD_BUS_VTABLE_METHOD_FIBER —
including handlers that issue nested sd_bus_call() on the same bus, the
cancel-on-close path, and concurrent dispatches across multiple fibers.

13 files changed:
src/libsystemd/meson.build
src/libsystemd/sd-bus/bus-future.c [new file with mode: 0644]
src/libsystemd/sd-bus/bus-future.h [new file with mode: 0644]
src/libsystemd/sd-bus/bus-internal.h
src/libsystemd/sd-bus/bus-objects.c
src/libsystemd/sd-bus/sd-bus.c
src/libsystemd/sd-bus/test-bus-chat.c
src/libsystemd/sd-bus/test-bus-fiber.c [new file with mode: 0644]
src/libsystemd/sd-bus/test-bus-objects.c
src/libsystemd/sd-bus/test-bus-peersockaddr.c
src/libsystemd/sd-bus/test-bus-server.c
src/libsystemd/sd-bus/test-bus-watch-bind.c
src/systemd/sd-bus-vtable.h

index 4e9fd28d44231a9181299a801030717d2c1c7e48..0cecea3b0d194e5cd9329eb0b9757abcf01ee91a 100644 (file)
@@ -49,6 +49,7 @@ sd_bus_sources = files(
         'sd-bus/bus-dump.c',
         'sd-bus/bus-dump-json.c',
         'sd-bus/bus-error.c',
+        'sd-bus/bus-future.c',
         'sd-bus/bus-internal.c',
         'sd-bus/bus-introspect.c',
         'sd-bus/bus-kernel.c',
@@ -185,6 +186,7 @@ libsystemd_pc = custom_target(
 
 simple_tests += files(
         'sd-bus/test-bus-creds.c',
+        'sd-bus/test-bus-fiber.c',
         'sd-bus/test-bus-introspect.c',
         'sd-bus/test-bus-match.c',
         'sd-bus/test-bus-vtable.c',
diff --git a/src/libsystemd/sd-bus/bus-future.c b/src/libsystemd/sd-bus/bus-future.c
new file mode 100644 (file)
index 0000000..d7037b3
--- /dev/null
@@ -0,0 +1,135 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+
+#include "sd-bus.h"
+#include "sd-future.h"
+
+#include "alloc-util.h"
+#include "bus-future.h"
+#include "bus-internal.h"
+#include "bus-message.h"
+
+typedef struct BusFuture {
+        sd_bus_slot *slot;
+        sd_bus_message *reply;
+} BusFuture;
+
+static void* bus_future_alloc(void) {
+        return new0(BusFuture, 1);
+}
+
+static void bus_future_free(sd_future *f) {
+        BusFuture *bf = ASSERT_PTR(sd_future_get_private(f));
+        sd_bus_slot_unref(bf->slot);
+        sd_bus_message_unref(bf->reply);
+        free(bf);
+}
+
+static int bus_future_cancel(sd_future *f) {
+        BusFuture *bf = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f)));
+
+        bf->slot = sd_bus_slot_unref(bf->slot);
+        return sd_future_resolve(f, -ECANCELED);
+}
+
+static const sd_future_ops bus_future_ops = {
+        .size = sizeof(sd_future_ops),
+        .alloc = bus_future_alloc,
+        .free = bus_future_free,
+        .cancel = bus_future_cancel,
+};
+
+static int bus_future_handler(sd_bus_message *m, void *userdata, sd_bus_error *reterr_error) {
+        sd_future *f = ASSERT_PTR(userdata);
+        BusFuture *bf = ASSERT_PTR(sd_future_get_private(f));
+        int r;
+
+        /* Resolve with 0 on a success reply and -errno (derived from the D-Bus error name) on a
+         * method error reply, so a caller awaiting the future learns about call failures from the
+         * resolution value alone. The reply itself is always stashed in bf->reply so
+         * future_get_bus_reply() can hand back the detailed sd_bus_error (name + message) on
+         * top of the bare errno. Cancellation surfaces as -ECANCELED via bus_future_cancel(),
+         * with bf->reply left NULL — callers can distinguish "got an error reply" from "no reply
+         * will arrive" by whether future_get_bus_reply() can produce a message. */
+        bf->slot = sd_bus_slot_unref(bf->slot);
+        bf->reply = sd_bus_message_ref(m);
+
+        r = sd_bus_message_is_method_error(m, NULL) ? -sd_bus_message_get_errno(m) : 0;
+        return sd_future_resolve(f, r);
+}
+
+int bus_call_future(sd_bus *bus, sd_bus_message *m, uint64_t usec, sd_future **ret) {
+        int r;
+
+        assert(bus);
+        assert(m);
+        assert(ret);
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        r = sd_future_new(&bus_future_ops, &f);
+        if (r < 0)
+                return r;
+
+        BusFuture *bf = sd_future_get_private(f);
+
+        r = sd_bus_call_async(bus, &bf->slot, m, bus_future_handler, f, usec);
+        if (r < 0)
+                return r;
+
+        *ret = TAKE_PTR(f);
+        return 0;
+}
+
+int future_get_bus_reply(sd_future *f, sd_bus_error *reterr_error, sd_bus_message **ret_reply) {
+        BusFuture *bf = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f)));
+        sd_bus_message *reply = ASSERT_PTR(bf->reply);
+
+        assert(sd_future_get_ops(f) == &bus_future_ops);
+        assert(sd_future_state(f) == SD_FUTURE_RESOLVED);
+
+        if (sd_bus_message_is_method_error(reply, NULL)) {
+                if (reterr_error)
+                        return sd_bus_error_copy(reterr_error, sd_bus_message_get_error(reply));
+                return -sd_bus_message_get_errno(reply);
+        }
+
+        if (reply->n_fds > 0 && !sd_bus_message_get_bus(reply)->accept_fd)
+                return sd_bus_error_set(reterr_error, SD_BUS_ERROR_INCONSISTENT_MESSAGE,
+                                        "Reply message contained file descriptors which I couldn't accept. Sorry.");
+
+        if (reterr_error)
+                *reterr_error = SD_BUS_ERROR_NULL;
+        if (ret_reply)
+                *ret_reply = sd_bus_message_ref(reply);
+
+        return 1;
+}
+
+int bus_call_suspend(
+                sd_bus *bus,
+                sd_bus_message *m,
+                uint64_t usec,
+                sd_bus_error *reterr_error,
+                sd_bus_message **ret_reply) {
+
+        int r;
+
+        assert(bus);
+        assert(m);
+        assert(sd_fiber_is_running());
+
+        _cleanup_(sd_future_cancel_wait_unrefp) sd_future *f = NULL;
+        r = bus_call_future(bus, m, usec, &f);
+        if (r < 0)
+                return sd_bus_error_set_errno(reterr_error, r);
+
+        r = sd_fiber_suspend();
+
+        /* If the future isn't resolved, the suspend was interrupted before a reply arrived (fiber
+         * cancelled, fiber-wide SD_FIBER_TIMEOUT scope expired, …). There's no reply to extract,
+         * so surface the resume error directly. When the future is resolved, future_get_bus_reply()
+         * recovers either the reply or the detailed sd_bus_error from the error reply. */
+        if (sd_future_state(f) != SD_FUTURE_RESOLVED)
+                return sd_bus_error_set_errno(reterr_error, r);
+
+        return future_get_bus_reply(f, reterr_error, ret_reply);
+}
diff --git a/src/libsystemd/sd-bus/bus-future.h b/src/libsystemd/sd-bus/bus-future.h
new file mode 100644 (file)
index 0000000..ec9bd80
--- /dev/null
@@ -0,0 +1,14 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+#pragma once
+
+#include "sd-forward.h"
+
+int bus_call_future(sd_bus *bus, sd_bus_message *m, uint64_t usec, sd_future **ret);
+int future_get_bus_reply(sd_future *f, sd_bus_error *reterr_error, sd_bus_message **ret_reply);
+
+int bus_call_suspend(
+                sd_bus *bus,
+                sd_bus_message *m,
+                uint64_t usec,
+                sd_bus_error *reterr_error,
+                sd_bus_message **ret_reply);
index 19a3b67d12f6aeb69b9b6976d568c27668e99491..3a52f738d6bd7149953bba15915526119ae3cb07 100644 (file)
 #define DEFAULT_SYSTEM_BUS_ADDRESS "unix:path=/run/dbus/system_bus_socket"
 #define DEFAULT_USER_BUS_ADDRESS_FMT "unix:path=%s/bus"
 
+/* Private vtable flag: dispatch the method handler on its own fiber, so it can use suspending
+ * primitives (sd_bus_call() on a fiber, sd_fiber_sleep(), loop_read_suspend(), ...) without
+ * blocking the event loop for other connections or method calls. Kept out of the public
+ * sd-bus-vtable.h so the fiber runtime stays an implementation detail of systemd. The bit value is
+ * reserved in sd-bus-vtable.h to make sure it never collides with a future public flag. */
+#define SD_BUS_VTABLE_METHOD_FIBER (UINT64_C(1) << 10)
+
 typedef struct BusReplyCallback {
         sd_bus_message_handler_t callback;
         usec_t timeout_usec; /* this is a relative timeout until we reach the BUS_HELLO state, and an absolute one right after */
@@ -222,6 +229,12 @@ typedef struct sd_bus {
         Set *vtable_methods;
         Set *vtable_properties;
 
+        /* Futures for outstanding SD_BUS_VTABLE_METHOD_FIBER dispatches. Entries are added as the
+         * dispatcher spawns each fiber and removed when the fiber resolves. On bus_enter_closing()
+         * we cancel everything in here and then wait in process_closing() until the set drains,
+         * before tearing down the rest of the bus. */
+        Set *fiber_futures;
+
         union sockaddr_union sockaddr;
         socklen_t sockaddr_size;
 
index 83ba3a523992b8699d5dac1289ffea98ea9e4b1d..795cc688376557adfa45862c0a7991e12afb4e8a 100644 (file)
@@ -3,6 +3,7 @@
 #include <linux/capability.h>
 
 #include "sd-bus.h"
+#include "sd-future.h"
 
 #include "alloc-util.h"
 #include "bus-internal.h"
@@ -337,6 +338,67 @@ static int check_access(sd_bus *bus, sd_bus_message *m, BusVTableMember *c, sd_b
         return sd_bus_error_setf(reterr_error, SD_BUS_ERROR_ACCESS_DENIED, "Access to %s.%s() not permitted.", c->interface, c->member);
 }
 
+typedef struct BusFiberData {
+        sd_bus *bus;
+        sd_bus_message *message;
+        sd_bus_slot *slot;
+        sd_bus_message_handler_t handler;
+        void *userdata;
+} BusFiberData;
+
+static BusFiberData* bus_fiber_data_free(BusFiberData *d) {
+        if (!d)
+                return NULL;
+
+        sd_bus_slot_unref(d->slot);
+        sd_bus_message_unref(d->message);
+        sd_bus_unref(d->bus);
+        return mfree(d);
+}
+
+DEFINE_TRIVIAL_CLEANUP_FUNC(BusFiberData*, bus_fiber_data_free);
+
+static void bus_fiber_data_destroy(void *userdata) {
+        bus_fiber_data_free(userdata);
+}
+
+static void bus_fiber_future_unref(void *p) {
+        sd_future_unref(p);
+}
+
+DEFINE_PRIVATE_HASH_OPS_WITH_KEY_DESTRUCTOR(
+                bus_fiber_future_hash_ops,
+                void,
+                trivial_hash_func,
+                trivial_compare_func,
+                bus_fiber_future_unref);
+
+static int bus_fiber_resolved(sd_future *f) {
+        sd_bus *bus = ASSERT_PTR(sd_future_get_userdata(f));
+
+        assert_se(set_remove(bus->fiber_futures, f) == f);
+        sd_future_unref(f);
+        return 0;
+}
+
+static int bus_fiber_entry(void *userdata) {
+        BusFiberData *d = ASSERT_PTR(userdata);
+        _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
+        int r;
+
+        /* Note: unlike the synchronous dispatch path, we deliberately do NOT set
+         * bus->current_slot/handler/userdata around the callback. Those fields track the slot of the
+         * message currently being dispatched inline and must be NULL at each entry into
+         * bus_process_internal(). Because a fiber handler can yield and let the event loop dispatch
+         * other messages before it resumes, leaving current_slot non-NULL across yields would trip
+         * that invariant. Fiber handlers receive their slot's userdata via the handler argument, so
+         * sd_bus_get_current_slot()/handler()/userdata() simply aren't meaningful inside them — the
+         * handler should use the message/userdata parameters directly instead. */
+        r = d->handler(d->message, d->userdata, &error);
+
+        return bus_maybe_reply_error(d->message, r, &error);
+}
+
 static int method_callbacks_run(
                 sd_bus *bus,
                 sd_bus_message *m,
@@ -407,6 +469,53 @@ static int method_callbacks_run(
 
                 slot = container_of(c->parent, sd_bus_slot, node_vtable);
 
+                if (FLAGS_SET(c->vtable->flags, SD_BUS_VTABLE_METHOD_FIBER)) {
+                        /* A fiber-dispatched method requires an event loop to spawn the fiber on.
+                         * By the time a method call actually arrives the bus is running, so the
+                         * event loop should already be attached — if not, the caller set up the bus
+                         * wrong and there's no meaningful recovery. */
+                        assert(bus->event);
+
+                        _cleanup_(bus_fiber_data_freep) BusFiberData *d = new(BusFiberData, 1);
+                        if (!d)
+                                return -ENOMEM;
+
+                        *d = (BusFiberData) {
+                                .bus = sd_bus_ref(bus),
+                                .message = sd_bus_message_ref(m),
+                                .slot = sd_bus_slot_ref(slot),
+                                .handler = c->vtable->x.method.handler,
+                                .userdata = u,
+                        };
+
+                        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+                        r = sd_fiber_new(bus->event, c->member, bus_fiber_entry, d, bus_fiber_data_destroy, &f);
+                        if (r < 0)
+                                return bus_maybe_reply_error(m, r, NULL);
+
+                        /* The fiber now owns d via bus_fiber_data_destroy. Drop our cleanup before any
+                         * further fallible calls, so a later failure unwinding f doesn't double-free d. */
+                        TAKE_PTR(d);
+
+                        r = set_ensure_put(&bus->fiber_futures, &bus_fiber_future_hash_ops, f);
+                        if (r < 0)
+                                return bus_maybe_reply_error(m, r, NULL);
+                        assert(r > 0);
+
+                        /* Track the future on the bus so shutdown can cancel it and wait for it. */
+                        r = sd_future_set_callback(f, bus_fiber_resolved, bus);
+                        if (r < 0) {
+                                /* TAKE_PTR(f) hasn't run yet, so our cleanup attribute still owns the
+                                 * ref; set_remove() returns the raw pointer without firing the hash_ops
+                                 * destructor, and the cleanup will unref f on return. */
+                                assert_se(set_remove(bus->fiber_futures, f) == f);
+                                return bus_maybe_reply_error(m, r, NULL);
+                        }
+
+                        TAKE_PTR(f);
+                        return 1;
+                }
+
                 bus->current_slot = sd_bus_slot_ref(slot);
                 bus->current_handler = c->vtable->x.method.handler;
                 bus->current_userdata = u;
index 27f788d99557642b0834d1bb653febf08bd4289e..e44c439fad862aa4e262b437edf6d6363a395e26 100644 (file)
 
 #include "sd-bus.h"
 #include "sd-event.h"
+#include "sd-future.h"
 
 #include "af-list.h"
 #include "alloc-util.h"
 #include "bus-container.h"
 #include "bus-control.h"
 #include "bus-error.h"
+#include "bus-future.h"
 #include "bus-internal.h"
 #include "bus-kernel.h"
 #include "bus-label.h"
@@ -222,6 +224,12 @@ static sd_bus* bus_free(sd_bus *b) {
         ordered_hashmap_free(b->reply_callbacks);
         prioq_free(b->reply_callbacks_prioq);
 
+        /* Outstanding fiber handlers pin the bus via their BusFiberData ref, so by the time refcount
+         * reaches zero and bus_free() runs, every fiber has already resolved and removed itself from
+         * this set. */
+        assert(set_isempty(b->fiber_futures));
+        set_free(b->fiber_futures);
+
         assert(b->match_callbacks.type == BUS_MATCH_ROOT);
         bus_match_free(&b->match_callbacks);
 
@@ -1809,6 +1817,9 @@ _public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) {
 }
 
 void bus_enter_closing(sd_bus *bus, int exit_code) {
+        sd_future *f;
+        int r;
+
         assert(bus);
 
         if (!IN_SET(bus->state, BUS_WATCH_BIND, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING))
@@ -1816,6 +1827,19 @@ void bus_enter_closing(sd_bus *bus, int exit_code) {
 
         bus_set_state(bus, BUS_CLOSING);
         bus->exit_code = exit_code;
+
+        /* Cancel all outstanding fiber-dispatched method handlers. Most cancellations are scheduled
+         * asynchronously (fibers resolve with -ECANCELED the next time they run), but a fiber still
+         * in FIBER_STATE_INITIAL resolves synchronously, which fires bus_fiber_resolved() and
+         * removes f from this set mid-iteration. That's safe because SET_FOREACH permits removal of
+         * exactly the current entry — see the assertion in hashmap_iterate_entry(). Either way this
+         * doesn't block here: process_closing() waits for the fiber_futures set to drain before it
+         * continues tearing down the rest of the bus. */
+        SET_FOREACH(f, bus->fiber_futures) {
+                r = sd_future_cancel(f);
+                if (r < 0)
+                        log_debug_errno(r, "Failed to cancel outstanding fiber method handler, ignoring: %m");
+        }
 }
 
 /* Define manually so we can add the PID check */
@@ -2388,23 +2412,30 @@ _public_ int sd_bus_call(
                 sd_bus_error *reterr_error,
                 sd_bus_message **ret_reply) {
 
-        _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = sd_bus_message_ref(_m);
         usec_t timeout;
         uint64_t cookie;
         size_t i;
         int r;
 
-        bus_assert_return(m, -EINVAL, reterr_error);
-        bus_assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL, reterr_error);
-        bus_assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL, reterr_error);
+        bus_assert_return(_m, -EINVAL, reterr_error);
+        bus_assert_return(_m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL, reterr_error);
+        bus_assert_return(!(_m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL, reterr_error);
         bus_assert_return(!bus_error_is_dirty(reterr_error), -EINVAL, reterr_error);
 
         if (bus)
                 assert_return(bus = bus_resolve(bus), -ENOPKG);
         else
-                assert_return(bus = m->bus, -ENOTCONN);
+                assert_return(bus = _m->bus, -ENOTCONN);
         bus_assert_return(!bus_origin_changed(bus), -ECHILD, reterr_error);
 
+        /* If the current fiber and the bus share their event loop, we can use sd_bus_call_suspend()
+         * instead which does an async method call. This allows multiple invocations of sd_bus_call() to
+         * happen across multiple fibers at once. */
+        if (sd_fiber_is_running() && bus->event == sd_fiber_get_event())
+                return bus_call_suspend(bus, _m, usec, reterr_error, ret_reply);
+
+        _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = sd_bus_message_ref(_m);
+
         if (!BUS_IS_OPEN(bus->state)) {
                 r = -ENOTCONN;
                 goto fail;
@@ -3177,7 +3208,14 @@ static int process_closing(sd_bus *bus, sd_bus_message **ret) {
         assert(bus);
         assert(bus->state == BUS_CLOSING);
 
-        /* First, fail all outstanding method calls */
+        /* Wait for any still-running fiber method handlers to finish unwinding their cancellation
+         * before tearing down the rest of the bus. bus_enter_closing() scheduled the cancel; each
+         * fiber resolves asynchronously and bus_fiber_resolved() removes it from the set. Returning
+         * 1 here keeps the bus in CLOSING state so the event loop drives the fibers to completion. */
+        if (!set_isempty(bus->fiber_futures))
+                return 1;
+
+        /* Then, fail all outstanding method calls */
         c = ordered_hashmap_first(bus->reply_callbacks);
         if (c)
                 return process_closing_reply_callback(bus, c);
index 1f358ccd3396e9f87d7432e7e211b64838dc41d1..d6f4860c41511e04817127c8fd87c6629a3d2666 100644 (file)
@@ -1,11 +1,11 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 
 #include <fcntl.h>
-#include <pthread.h>
 #include <sys/resource.h>
 #include <unistd.h>
 
 #include "sd-bus.h"
+#include "sd-future.h"
 
 #include "alloc-util.h"
 #include "bus-error.h"
@@ -102,7 +102,8 @@ static int server_init(sd_bus **ret) {
         return 0;
 }
 
-static int server(sd_bus *bus) {
+static int server(void *userdata) {
+        sd_bus *bus = ASSERT_PTR(userdata);
         bool client1_gone = false, client2_gone = false;
         int r;
 
@@ -178,7 +179,9 @@ static int server(sd_bus *bus) {
                         client2_gone = true;
                 } else if (sd_bus_message_is_method_call(m, "org.freedesktop.systemd.test", "Slow")) {
 
-                        sleep(1);
+                        r = sd_fiber_sleep(1 * USEC_PER_SEC);
+                        if (r < 0)
+                                return r;
 
                         r = sd_bus_reply_method_return(m, NULL);
                         if (r < 0)
@@ -194,10 +197,10 @@ static int server(sd_bus *bus) {
 
                         log_info("Received fd=%d", fd);
 
-                        if (write(fd, &x, 1) < 0) {
-                                r = log_error_errno(errno, "Failed to write to fd: %m");
+                        ssize_t n = sd_fiber_write(fd, &x, 1);
+                        if (n < 0) {
                                 safe_close(fd);
-                                return r;
+                                return log_error_errno(n, "Failed to write to fd: %m");
                         }
 
                         r = sd_bus_reply_method_return(m, NULL);
@@ -217,7 +220,7 @@ static int server(sd_bus *bus) {
         return 0;
 }
 
-static void* client1(void *p) {
+static int client1(void *userdata) {
         _cleanup_(sd_bus_message_unrefp) sd_bus_message *reply = NULL;
         _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
         _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
@@ -277,9 +280,9 @@ static void* client1(void *p) {
                 goto finish;
         }
 
-        errno = 0;
-        if (read(pp[0], &x, 1) <= 0) {
-                log_error("Failed to read from pipe: %s", STRERROR_OR_EOF(errno));
+        ssize_t n = sd_fiber_read(pp[0], &x, 1);
+        if (n <= 0) {
+                log_error("Failed to read from pipe: %s", STRERROR_OR_EOF(n));
                 goto finish;
         }
 
@@ -303,7 +306,7 @@ finish:
 
         }
 
-        return INT_TO_PTR(r);
+        return r;
 }
 
 static int quit_callback(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
@@ -315,7 +318,7 @@ static int quit_callback(sd_bus_message *m, void *userdata, sd_bus_error *ret_er
         return 1;
 }
 
-static void* client2(void *p) {
+static int client2(void *userdata) {
         _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = NULL, *reply = NULL;
         _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
         _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
@@ -494,7 +497,7 @@ finish:
                 (void) sd_bus_send(bus, q, NULL);
         }
 
-        return INT_TO_PTR(r);
+        return r;
 }
 
 static ino_t get_inode(int fd) {
@@ -626,9 +629,9 @@ TEST(ctrunc) {
 }
 
 TEST(chat) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client1 = NULL, *f_client2 = NULL;
         _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
-        pthread_t c1, c2;
-        void *p;
         int r;
 
         test_setup_logging(LOG_INFO);
@@ -639,16 +642,18 @@ TEST(chat) {
 
         log_info("Initialized...");
 
-        ASSERT_OK(-pthread_create(&c1, NULL, client1, NULL));
-        ASSERT_OK(-pthread_create(&c2, NULL, client2, NULL));
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        ASSERT_OK(sd_fiber_new(e, "client-1", client1, NULL, /* destroy= */ NULL, &f_client1));
+        ASSERT_OK(sd_fiber_new(e, "client-2", client2, NULL, /* destroy= */ NULL, &f_client2));
+        ASSERT_OK(sd_fiber_new(e, "server", server, bus, /* destroy= */ NULL, &f_server));
 
-        r = server(bus);
+        ASSERT_OK(sd_event_loop(e));
 
-        ASSERT_OK(-pthread_join(c1, &p));
-        ASSERT_OK(PTR_TO_INT(p));
-        ASSERT_OK(-pthread_join(c2, &p));
-        ASSERT_OK(PTR_TO_INT(p));
-        ASSERT_OK(r);
+        ASSERT_OK(sd_future_result(f_client1));
+        ASSERT_OK(sd_future_result(f_client2));
+        ASSERT_OK(sd_future_result(f_server));
 }
 
 DEFINE_TEST_MAIN(LOG_INFO);
diff --git a/src/libsystemd/sd-bus/test-bus-fiber.c b/src/libsystemd/sd-bus/test-bus-fiber.c
new file mode 100644 (file)
index 0000000..2cadf1d
--- /dev/null
@@ -0,0 +1,207 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+
+#include <sys/socket.h>
+
+#include "sd-bus.h"
+#include "sd-event.h"
+#include "sd-future.h"
+
+#include "bus-internal.h"
+#include "tests.h"
+#include "time-util.h"
+
+typedef struct Context {
+        /* Counters for the concurrency check: every Concurrent invocation bumps in_flight on entry
+         * and drops it on exit, and tracks the maximum observed concurrency. If fiber dispatch
+         * works, two overlapping client calls must both be inside the handler at the same time,
+         * giving a max of at least 2. */
+        int in_flight;
+        int max_in_flight;
+        sd_future *waiter;
+} Context;
+
+static int method_concurrent(sd_bus_message *m, void *userdata, sd_bus_error *reterr_error) {
+        Context *c = ASSERT_PTR(userdata);
+
+        ASSERT_OK_POSITIVE(sd_fiber_is_running());
+
+        c->in_flight++;
+        if (c->in_flight > c->max_in_flight)
+                c->max_in_flight = c->in_flight;
+
+        /* Synchronize on the peer instead of sleeping: the first handler to enter stashes its
+         * fiber and suspends; the second resumes it and then yields, so the first runs to
+         * completion (sending its reply) before the second proceeds. Both are therefore in
+         * flight at the same time regardless of how long any individual dispatch takes, and
+         * the reply order matches the request order. */
+        if (c->in_flight < 2) {
+                assert(!c->waiter);
+                c->waiter = sd_fiber_get_current();
+                ASSERT_OK(sd_fiber_suspend());
+        } else {
+                ASSERT_OK(sd_fiber_resume(TAKE_PTR(c->waiter), 0));
+                ASSERT_OK(sd_fiber_yield());
+        }
+
+        c->in_flight--;
+
+        return sd_bus_reply_method_return(m, NULL);
+}
+
+static int method_fail_errno(sd_bus_message *m, void *userdata, sd_bus_error *reterr_error) {
+        ASSERT_OK_POSITIVE(sd_fiber_is_running());
+
+        /* Yielding first exercises the deferred-error path in the fiber entry: the handler returns
+         * a negative errno after suspending, and bus_maybe_reply_error() must still turn that into
+         * a matching sd_bus error reply. */
+        ASSERT_OK(sd_fiber_sleep(1 * USEC_PER_MSEC));
+
+        return -EACCES;
+}
+
+static int method_fail_error(sd_bus_message *m, void *userdata, sd_bus_error *reterr_error) {
+        ASSERT_OK_POSITIVE(sd_fiber_is_running());
+
+        ASSERT_OK(sd_fiber_sleep(1 * USEC_PER_MSEC));
+
+        return sd_bus_error_set(reterr_error, SD_BUS_ERROR_INVALID_ARGS, "bad arguments from fiber");
+}
+
+static const sd_bus_vtable vtable[] = {
+        SD_BUS_VTABLE_START(0),
+        SD_BUS_METHOD("Concurrent", NULL, NULL, method_concurrent,
+                      SD_BUS_VTABLE_UNPRIVILEGED|SD_BUS_VTABLE_METHOD_FIBER),
+        SD_BUS_METHOD("FailErrno", NULL, NULL, method_fail_errno,
+                      SD_BUS_VTABLE_UNPRIVILEGED|SD_BUS_VTABLE_METHOD_FIBER),
+        SD_BUS_METHOD("FailError", NULL, NULL, method_fail_error,
+                      SD_BUS_VTABLE_UNPRIVILEGED|SD_BUS_VTABLE_METHOD_FIBER),
+        SD_BUS_VTABLE_END,
+};
+
+typedef struct Setup {
+        int fds[2];
+        Context *c;
+} Setup;
+
+static int attach_pair(Setup *s, sd_bus **ret_server, sd_bus **ret_client) {
+        _cleanup_(sd_bus_flush_close_unrefp) sd_bus *server = NULL, *client = NULL;
+        sd_id128_t id;
+
+        assert(ret_server);
+        assert(ret_client);
+
+        ASSERT_OK(sd_id128_randomize(&id));
+        ASSERT_OK(sd_bus_new(&server));
+        ASSERT_OK(sd_bus_set_description(server, "server"));
+        ASSERT_OK(sd_bus_set_fd(server, s->fds[0], s->fds[0]));
+        ASSERT_OK(sd_bus_set_server(server, true, id));
+        ASSERT_OK(sd_bus_attach_event(server, sd_fiber_get_event(), 0));
+        ASSERT_OK(sd_bus_add_object_vtable(server, NULL, "/test", "test.Fiber", vtable, s->c));
+        ASSERT_OK(sd_bus_start(server));
+
+        ASSERT_OK(sd_bus_new(&client));
+        ASSERT_OK(sd_bus_set_description(client, "client"));
+        ASSERT_OK(sd_bus_set_fd(client, s->fds[1], s->fds[1]));
+        ASSERT_OK(sd_bus_attach_event(client, sd_fiber_get_event(), 0));
+        ASSERT_OK(sd_bus_start(client));
+
+        *ret_server = TAKE_PTR(server);
+        *ret_client = TAKE_PTR(client);
+        return 0;
+}
+
+static int call_concurrent_fiber(void *userdata) {
+        sd_bus *client = ASSERT_PTR(userdata);
+
+        /* A plain suspending sd_bus_call() — on a fiber this goes through sd_bus_call_suspend()
+         * which multiplexes onto the single client connection, so multiple caller fibers can have
+         * calls in flight at the same time. */
+        return sd_bus_call_method(client, NULL, "/test", "test.Fiber", "Concurrent",
+                                  NULL, NULL, NULL);
+}
+
+static int concurrency_fiber(void *userdata) {
+        Setup *s = ASSERT_PTR(userdata);
+        _cleanup_(sd_bus_flush_close_unrefp) sd_bus *server = NULL, *client = NULL;
+        _cleanup_(sd_future_cancel_wait_unrefp) sd_future *f_a = NULL, *f_b = NULL;
+
+        ASSERT_OK(attach_pair(s, &server, &client));
+
+        /* Two concurrent calls on the shared client bus. Each lands in method_concurrent which
+         * blocks on the peer; if fiber dispatch works the second is entered while the first is
+         * suspended, so max_in_flight on the context reaches 2. */
+        ASSERT_OK(sd_fiber_new(sd_fiber_get_event(), "call-a", call_concurrent_fiber, client,
+                               /* destroy= */ NULL, &f_a));
+        ASSERT_OK(sd_fiber_new(sd_fiber_get_event(), "call-b", call_concurrent_fiber, client,
+                               /* destroy= */ NULL, &f_b));
+
+        ASSERT_OK(sd_fiber_await(f_a));
+        ASSERT_OK(sd_fiber_await(f_b));
+
+        ASSERT_OK(sd_future_result(f_a));
+        ASSERT_OK(sd_future_result(f_b));
+        return 0;
+}
+
+TEST(fiber_method_concurrency) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        Context c = {};
+        Setup s = { .c = &c };
+
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM, 0, s.fds));
+
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        ASSERT_OK(sd_fiber_new(e, "concurrency", concurrency_fiber, &s, /* destroy= */ NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+
+        ASSERT_OK(sd_future_result(f));
+        ASSERT_GE(c.max_in_flight, 2);
+}
+
+static int errors_fiber(void *userdata) {
+        Setup *s = ASSERT_PTR(userdata);
+        _cleanup_(sd_bus_flush_close_unrefp) sd_bus *server = NULL, *client = NULL;
+
+        ASSERT_OK(attach_pair(s, &server, &client));
+
+        /* A fiber handler that returns a negative errno gets turned into a matching sd_bus error
+         * reply (bus_maybe_reply_error → sd_bus_reply_method_errno). */
+        _cleanup_(sd_bus_error_free) sd_bus_error e1 = SD_BUS_ERROR_NULL;
+        ASSERT_ERROR(sd_bus_call_method(client, NULL, "/test", "test.Fiber", "FailErrno",
+                                        &e1, NULL, NULL),
+                     EACCES);
+        ASSERT_TRUE(sd_bus_error_has_name(&e1, SD_BUS_ERROR_ACCESS_DENIED));
+
+        /* A fiber handler that populates sd_bus_error directly propagates both name and message. */
+        _cleanup_(sd_bus_error_free) sd_bus_error e2 = SD_BUS_ERROR_NULL;
+        ASSERT_FAIL(sd_bus_call_method(client, NULL, "/test", "test.Fiber", "FailError",
+                                       &e2, NULL, NULL));
+        ASSERT_TRUE(sd_bus_error_has_name(&e2, SD_BUS_ERROR_INVALID_ARGS));
+        ASSERT_STREQ(e2.message, "bad arguments from fiber");
+
+        return 0;
+}
+
+TEST(fiber_method_errors) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        Context c = {};
+        Setup s = { .c = &c };
+
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM, 0, s.fds));
+
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        ASSERT_OK(sd_fiber_new(e, "errors", errors_fiber, &s, /* destroy= */ NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+
+        ASSERT_OK(sd_future_result(f));
+}
+
+DEFINE_TEST_MAIN(LOG_DEBUG);
index 4ad60f0d582252c906f2bca04b94e08f042376e8..ac33086a6f374d340ccf72aa55ea2044c09a3842 100644 (file)
@@ -1,8 +1,7 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 
-#include <pthread.h>
-
 #include "sd-bus.h"
+#include "sd-future.h"
 
 #include "alloc-util.h"
 #include "bus-internal.h"
@@ -211,9 +210,9 @@ static int enumerator3_callback(sd_bus *bus, const char *path, void *userdata, c
         return 1;
 }
 
-static void* server(void *p) {
-        struct context *c = p;
-        sd_bus *bus = NULL;
+static int server(void *userdata) {
+        struct context *c = ASSERT_PTR(userdata);
+        _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
         sd_id128_t id;
         int r;
 
@@ -242,36 +241,25 @@ static void* server(void *p) {
                 log_error("Loop!");
 
                 r = sd_bus_process(bus, NULL);
-                if (r < 0) {
-                        log_error_errno(r, "Failed to process requests: %m");
-                        goto fail;
-                }
+                if (r < 0)
+                        return log_error_errno(r, "Failed to process requests: %m");
 
                 if (r == 0) {
                         r = sd_bus_wait(bus, UINT64_MAX);
-                        if (r < 0) {
-                                log_error_errno(r, "Failed to wait: %m");
-                                goto fail;
-                        }
+                        if (r < 0)
+                                return log_error_errno(r, "Failed to wait: %m");
 
                         continue;
                 }
         }
 
-        r = 0;
-
-fail:
-        if (bus) {
-                sd_bus_flush(bus);
-                sd_bus_unref(bus);
-        }
-
-        return INT_TO_PTR(r);
+        return 0;
 }
 
-static int client(struct context *c) {
+static int client(void *p) {
+        struct context *c = ASSERT_PTR(p);
         _cleanup_(sd_bus_message_unrefp) sd_bus_message *reply = NULL;
-        _cleanup_(sd_bus_unrefp) sd_bus *bus = NULL;
+        _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
         _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
         _cleanup_strv_free_ char **lines = NULL;
         const char *s;
@@ -575,16 +563,13 @@ static int client(struct context *c) {
 
         ASSERT_OK(sd_bus_call_method(bus, "org.freedesktop.systemd.test", "/foo", "org.freedesktop.systemd.test", "Exit", &error, NULL, NULL));
 
-        sd_bus_flush(bus);
-
         return 0;
 }
 
 int main(int argc, char *argv[]) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client = NULL;
         struct context c = {};
-        pthread_t s;
-        void *p;
-        int r, q;
 
         test_setup_logging(LOG_DEBUG);
 
@@ -593,21 +578,16 @@ int main(int argc, char *argv[]) {
 
         ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM, 0, c.fds));
 
-        r = pthread_create(&s, NULL, server, &c);
-        if (r != 0)
-                return -r;
-
-        r = client(&c);
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
 
-        q = pthread_join(s, &p);
-        if (q != 0)
-                return -q;
+        ASSERT_OK(sd_fiber_new(e, "server", server, &c, /* destroy= */ NULL, &f_server));
+        ASSERT_OK(sd_fiber_new(e, "client", client, &c, /* destroy= */ NULL, &f_client));
 
-        if (r < 0)
-                return r;
+        ASSERT_OK(sd_event_loop(e));
 
-        if (PTR_TO_INT(p) < 0)
-                return PTR_TO_INT(p);
+        ASSERT_OK(sd_future_result(f_server));
+        ASSERT_OK(sd_future_result(f_client));
 
         free(c.something);
         free(c.automatic_string_property);
index 2cac35dde4033493e2e103b9ebc55d9be89c0433..bee76c9b10ca76e1c1b1cae542e5f48a2460076a 100644 (file)
@@ -1,9 +1,9 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 
-#include <pthread.h>
 #include <unistd.h>
 
 #include "sd-bus.h"
+#include "sd-future.h"
 
 #include "bus-dump.h"
 #include "fd-util.h"
@@ -38,9 +38,9 @@ static bool gid_list_same(const gid_t *a, size_t n, const gid_t *b, size_t m) {
                 gid_list_contained(b, m, a, n);
 }
 
-static void* server(void *p) {
+static int server(void *userdata) {
         _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
-        _cleanup_close_ int listen_fd = PTR_TO_INT(p), fd = -EBADF;
+        _cleanup_close_ int listen_fd = PTR_TO_INT(userdata), fd = -EBADF;
         _cleanup_(sd_bus_creds_unrefp) sd_bus_creds *c = NULL;
         _cleanup_free_ char *our_comm = NULL;
         sd_id128_t id;
@@ -48,7 +48,7 @@ static void* server(void *p) {
 
         ASSERT_OK(sd_id128_randomize(&id));
 
-        ASSERT_OK_ERRNO(fd = accept4(listen_fd, NULL, NULL, SOCK_CLOEXEC|SOCK_NONBLOCK));
+        ASSERT_OK(fd = sd_fiber_accept(listen_fd, NULL, NULL, SOCK_CLOEXEC|SOCK_NONBLOCK));
 
         ASSERT_OK(sd_bus_new(&bus));
         ASSERT_OK(sd_bus_set_fd(bus, fd, fd));
@@ -114,17 +114,18 @@ static void* server(void *p) {
                 }
         }
 
-        return NULL;
+        return 0;
 }
 
-static void* client(void *p) {
+static int client(void *userdata) {
         _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
         _cleanup_(sd_bus_message_unrefp) sd_bus_message *reply = NULL;
         const char *z;
 
         ASSERT_OK(sd_bus_new(&bus));
         ASSERT_OK(sd_bus_set_description(bus, "wuffwuff"));
-        ASSERT_OK(sd_bus_set_address(bus, p));
+        ASSERT_OK(sd_bus_set_address(bus, userdata));
+        ASSERT_OK(sd_bus_attach_event(bus, sd_fiber_get_event(), 0));
         ASSERT_OK(sd_bus_start(bus));
 
         ASSERT_OK(sd_bus_call_method(bus, "foo.foo", "/foo", "foo.foo", "Foo", NULL, &reply, "s", "foo"));
@@ -132,17 +133,18 @@ static void* client(void *p) {
         ASSERT_OK(sd_bus_message_read(reply, "s", &z));
         ASSERT_STREQ(z, "bar");
 
-        return NULL;
+        return 0;
 }
 
 TEST(description) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client = NULL;
         _cleanup_free_ char *a = NULL;
         _cleanup_close_ int fd = -EBADF;
         union sockaddr_union sa = {
                 .un.sun_family = AF_UNIX,
         };
         socklen_t salen;
-        pthread_t s, c;
 
         ASSERT_OK_ERRNO(fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0));
         ASSERT_OK_ERRNO(bind(fd, &sa.sa, offsetof(struct sockaddr_un, sun_path))); /* force auto-bind */
@@ -155,13 +157,18 @@ TEST(description) {
 
         ASSERT_OK(asprintf(&a, "unix:abstract=%s", sa.un.sun_path + 1));
 
-        ASSERT_OK(-pthread_create(&s, NULL, server, INT_TO_PTR(fd)));
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+        ASSERT_OK(sd_fiber_new(e, "server", server, INT_TO_PTR(fd), /* destroy= */ NULL, &f_server));
         TAKE_FD(fd);
 
-        ASSERT_OK(-pthread_create(&c, NULL, client, a));
+        ASSERT_OK(sd_fiber_new(e, "client", client, a, /* destroy= */ NULL, &f_client));
+
+        ASSERT_OK(sd_event_loop(e));
 
-        ASSERT_OK(-pthread_join(s, NULL));
-        ASSERT_OK(-pthread_join(c, NULL));
+        ASSERT_OK(sd_future_result(f_server));
+        ASSERT_OK(sd_future_result(f_client));
 }
 
 DEFINE_TEST_MAIN(LOG_INFO);
index 989d2bf10dcaab740698bef67902f944e5993d69..1edcec858f2ac9a4eac5f8f975959b1a92c4888c 100644 (file)
@@ -1,10 +1,12 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 
-#include <pthread.h>
 #include <sys/socket.h>
 
 #include "sd-bus.h"
+#include "sd-event.h"
+#include "sd-future.h"
 
+#include "errno-util.h"
 #include "log.h"
 #include "memory-util.h"
 #include "string-util.h"
@@ -20,7 +22,8 @@ struct context {
         bool server_anonymous_auth;
 };
 
-static int _server(struct context *c) {
+static int server(void *userdata) {
+        struct context *c = ASSERT_PTR(userdata);
         _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
         sd_id128_t id;
         bool quit = false;
@@ -29,6 +32,7 @@ static int _server(struct context *c) {
         ASSERT_OK(sd_id128_randomize(&id));
 
         ASSERT_OK(sd_bus_new(&bus));
+        ASSERT_OK(sd_bus_set_description(bus, "server"));
         ASSERT_OK(sd_bus_set_fd(bus, c->fds[0], c->fds[0]));
         ASSERT_OK(sd_bus_set_server(bus, 1, id));
         ASSERT_OK(sd_bus_set_anonymous(bus, c->server_anonymous_auth));
@@ -74,17 +78,16 @@ static int _server(struct context *c) {
         return 0;
 }
 
-static void* server(void *p) {
-        return INT_TO_PTR(_server(p));
-}
-
-static int client(struct context *c) {
+static int client(void *userdata) {
+        struct context *c = ASSERT_PTR(userdata);
         _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = NULL, *reply = NULL;
-        _cleanup_(sd_bus_unrefp) sd_bus *bus = NULL;
+        _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
         _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
 
         ASSERT_OK(sd_bus_new(&bus));
+        ASSERT_OK(sd_bus_set_description(bus, "client"));
         ASSERT_OK(sd_bus_set_fd(bus, c->fds[1], c->fds[1]));
+        ASSERT_OK(sd_bus_attach_event(bus, sd_fiber_get_event(), 0));
         ASSERT_OK(sd_bus_negotiate_fds(bus, c->client_negotiate_unix_fds));
         ASSERT_OK(sd_bus_set_anonymous(bus, c->client_anonymous_auth));
         ASSERT_OK(sd_bus_start(bus));
@@ -103,10 +106,10 @@ static int client(struct context *c) {
 static int test_one(bool client_negotiate_unix_fds, bool server_negotiate_unix_fds,
                     bool client_anonymous_auth, bool server_anonymous_auth) {
 
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client = NULL;
         struct context c;
-        pthread_t s;
-        void *p;
-        int r, q;
+        int r = 0;
 
         zero(c);
 
@@ -117,23 +120,18 @@ static int test_one(bool client_negotiate_unix_fds, bool server_negotiate_unix_f
         c.client_anonymous_auth = client_anonymous_auth;
         c.server_anonymous_auth = server_anonymous_auth;
 
-        r = pthread_create(&s, NULL, server, &c);
-        if (r != 0)
-                return -r;
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
 
-        r = client(&c);
+        ASSERT_OK(sd_fiber_new(e, "server", server, &c, /* destroy= */ NULL, &f_server));
+        ASSERT_OK(sd_fiber_new(e, "client", client, &c, /* destroy= */ NULL, &f_client));
 
-        q = pthread_join(s, &p);
-        if (q != 0)
-                return -q;
+        ASSERT_OK(sd_event_loop(e));
 
-        if (r < 0)
-                return r;
+        RET_GATHER(r, sd_future_result(f_client));
+        RET_GATHER(r, sd_future_result(f_server));
 
-        if (PTR_TO_INT(p) < 0)
-                return PTR_TO_INT(p);
-
-        return 0;
+        return r;
 }
 
 int main(int argc, char *argv[]) {
@@ -145,7 +143,7 @@ int main(int argc, char *argv[]) {
         ASSERT_OK(test_one(false, false, false, false));
         ASSERT_OK(test_one(true, true, true, true));
         ASSERT_OK(test_one(true, true, false, true));
-        ASSERT_ERROR(test_one(true, true, true, false), EPERM);
+        ASSERT_ERROR(test_one(true, true, true, false), EACCES);
 
         return EXIT_SUCCESS;
 }
index 1bf4ee7017119f70f119885b82ed744563ceccc8..6561633b8a82358ce6838e9a7fb1aba53560e6e9 100644 (file)
@@ -1,10 +1,8 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 
-#include <pthread.h>
-#include <unistd.h>
-
 #include "sd-bus.h"
 #include "sd-event.h"
+#include "sd-future.h"
 #include "sd-id128.h"
 
 #include "alloc-util.h"
@@ -44,33 +42,33 @@ static const sd_bus_vtable vtable[] = {
         SD_BUS_VTABLE_END,
 };
 
-static void* thread_server(void *p) {
+static int server(void *userdata) {
         _cleanup_free_ char *suffixed = NULL, *suffixed_basename = NULL, *suffixed2 = NULL, *d = NULL;
         _cleanup_close_ int fd = -EBADF;
         union sockaddr_union u;
-        const char *path = p;
+        const char *path = ASSERT_PTR(userdata);
         int r;
 
         log_debug("Initializing server");
 
         /* Let's play some games, by slowly creating the socket directory, and renaming it in the middle */
-        usleep_safe(100 * USEC_PER_MSEC);
+        ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
 
         ASSERT_OK(mkdir_parents(path, 0755));
-        usleep_safe(100 * USEC_PER_MSEC);
+        ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
 
         ASSERT_OK(path_extract_directory(path, &d));
         ASSERT_OK(asprintf(&suffixed, "%s.%" PRIx64, d, random_u64()));
         ASSERT_OK_ERRNO(rename(d, suffixed));
-        usleep_safe(100 * USEC_PER_MSEC);
+        ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
 
         ASSERT_OK(asprintf(&suffixed2, "%s.%" PRIx64, d, random_u64()));
         ASSERT_OK_ERRNO(symlink(suffixed2, d));
-        usleep_safe(100 * USEC_PER_MSEC);
+        ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
 
         ASSERT_OK(path_extract_filename(suffixed, &suffixed_basename));
         ASSERT_OK_ERRNO(symlink(suffixed_basename, suffixed2));
-        usleep_safe(100 * USEC_PER_MSEC);
+        ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
 
         socklen_t sa_len;
         r = sockaddr_un_set_path(&u.un, path);
@@ -81,13 +79,13 @@ static void* thread_server(void *p) {
         ASSERT_OK_ERRNO(fd);
 
         ASSERT_OK_ERRNO(bind(fd, &u.sa, sa_len));
-        usleep_safe(100 * USEC_PER_MSEC);
+        ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
 
         ASSERT_OK_ERRNO(listen(fd, SOMAXCONN_DELUXE));
-        usleep_safe(100 * USEC_PER_MSEC);
+        ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
 
         ASSERT_OK(touch(path));
-        usleep_safe(100 * USEC_PER_MSEC);
+        ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
 
         log_debug("Initialized server");
 
@@ -101,8 +99,7 @@ static void* thread_server(void *p) {
 
                 ASSERT_OK(sd_event_new(&event));
 
-                bus_fd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
-                ASSERT_OK_ERRNO(bus_fd);
+                ASSERT_OK(bus_fd = sd_fiber_accept(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC));
 
                 log_debug("Accepted server connection");
 
@@ -129,13 +126,13 @@ static void* thread_server(void *p) {
 
         log_debug("Server done");
 
-        return NULL;
+        return 0;
 }
 
-static void* thread_client1(void *p) {
+static int client1(void *userdata) {
         _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
         _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
-        const char *path = p, *t;
+        const char *path = ASSERT_PTR(userdata), *t;
 
         log_debug("Initializing client1");
 
@@ -151,59 +148,65 @@ static void* thread_client1(void *p) {
 
         log_debug("Client1 done");
 
-        return NULL;
-}
-
-static int client2_callback(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
-        ASSERT_OK_ZERO(sd_bus_message_is_method_error(m, NULL));
-        ASSERT_OK(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 0));
         return 0;
 }
 
-static void* thread_client2(void *p) {
+static int client2(void *userdata) {
         _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
-        _cleanup_(sd_event_unrefp) sd_event *event = NULL;
-        const char *path = p, *t;
+        const char *path = ASSERT_PTR(userdata), *t;
 
         log_debug("Initializing client2");
 
-        ASSERT_OK(sd_event_new(&event));
         ASSERT_OK(sd_bus_new(&bus));
         ASSERT_OK(sd_bus_set_description(bus, "client2"));
 
         t = strjoina("unix:path=", path);
         ASSERT_OK(sd_bus_set_address(bus, t));
         ASSERT_OK(sd_bus_set_watch_bind(bus, true));
-        ASSERT_OK(sd_bus_attach_event(bus, event, 0));
+        ASSERT_OK(sd_bus_attach_event(bus, sd_fiber_get_event(), 0));
         ASSERT_OK(sd_bus_start(bus));
 
-        ASSERT_OK(sd_bus_call_method_async(bus, NULL, "foo.bar", "/foo", "foo.TestInterface", "Foobar", client2_callback, NULL, NULL));
-
-        ASSERT_OK(sd_event_loop(event));
+        _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = NULL;
+        ASSERT_OK(sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Foobar", NULL, &m, NULL));
 
+        ASSERT_OK_ZERO(sd_bus_message_is_method_error(m, NULL));
         log_debug("Client2 done");
 
-        return NULL;
+        return 0;
 }
 
-static void request_exit(const char *path) {
+typedef struct RequestExitArgs {
+        const char *path;
+        sd_future *client1;
+        sd_future *client2;
+} RequestExitArgs;
+
+static int request_exit(void *userdata) {
         _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
+        RequestExitArgs *args = ASSERT_PTR(userdata);
         const char *t;
 
+        /* Wait for all client fibers to complete before requesting exit */
+        ASSERT_OK(sd_fiber_await(args->client1));
+        ASSERT_OK(sd_fiber_await(args->client2));
+
         ASSERT_OK(sd_bus_new(&bus));
 
-        t = strjoina("unix:path=", path);
+        t = strjoina("unix:path=", args->path);
         ASSERT_OK(sd_bus_set_address(bus, t));
         ASSERT_OK(sd_bus_set_watch_bind(bus, true));
         ASSERT_OK(sd_bus_set_description(bus, "request-exit"));
         ASSERT_OK(sd_bus_start(bus));
 
         ASSERT_OK(sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Exit", NULL, NULL, NULL));
+
+        return 0;
 }
 
 int main(int argc, char *argv[]) {
         _cleanup_(rm_rf_physical_and_freep) char *d = NULL;
-        pthread_t server, client1, client2;
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client1 = NULL, *f_client2 = NULL, *f_exit = NULL;
         char *path;
 
         test_setup_logging(LOG_DEBUG);
@@ -214,16 +217,27 @@ int main(int argc, char *argv[]) {
 
         path = strjoina(d, "/this/is/a/socket");
 
-        ASSERT_OK(-pthread_create(&server, NULL, thread_server, path));
-        ASSERT_OK(-pthread_create(&client1, NULL, thread_client1, path));
-        ASSERT_OK(-pthread_create(&client2, NULL, thread_client2, path));
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
 
-        ASSERT_OK(-pthread_join(client1, NULL));
-        ASSERT_OK(-pthread_join(client2, NULL));
+        ASSERT_OK(sd_fiber_new(e, "server", server, path, /* destroy= */ NULL, &f_server));
 
-        request_exit(path);
+        ASSERT_OK(sd_fiber_new(e, "client-1", client1, path, /* destroy= */ NULL, &f_client1));
+        ASSERT_OK(sd_fiber_new(e, "client-2", client2, path, /* destroy= */ NULL, &f_client2));
 
-        ASSERT_OK(-pthread_join(server, NULL));
+        RequestExitArgs args = {
+                .path = path,
+                .client1 = f_client1,
+                .client2 = f_client2,
+        };
+        ASSERT_OK(sd_fiber_new(e, "request-exit", request_exit, &args, /* destroy= */ NULL, &f_exit));
 
-        return 0;
+        ASSERT_OK(sd_event_loop(e));
+
+        ASSERT_OK(sd_future_result(f_client1));
+        ASSERT_OK(sd_future_result(f_client2));
+        ASSERT_OK(sd_future_result(f_exit));
+        ASSERT_OK(sd_future_result(f_server));
+
+        return EXIT_SUCCESS;
 }
index 5c11ca8ae5b7193768322f00e532305b74783da4..036bda3fe47e9b07c92805a06ebffab4d1055be0 100644 (file)
@@ -44,6 +44,7 @@ __extension__ enum {
         SD_BUS_VTABLE_PROPERTY_EXPLICIT            = 1ULL << 7,
         SD_BUS_VTABLE_SENSITIVE                    = 1ULL << 8, /* covers both directions: method call + reply */
         SD_BUS_VTABLE_ABSOLUTE_OFFSET              = 1ULL << 9,
+        /* Bit 10 is reserved for the private SD_BUS_VTABLE_METHOD_FIBER flag (see bus-internal.h). */
         _SD_BUS_VTABLE_CAPABILITY_MASK             = 0xFFFFULL << 40
 };