From: Daan De Meyer Date: Mon, 11 May 2026 14:27:34 +0000 (+0200) Subject: sd-bus: make sd-bus fiber-aware X-Git-Tag: v261-rc1~40^2~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9727f6536c5890c3a20e45de1355791e8ed523d4;p=thirdparty%2Fsystemd.git sd-bus: make sd-bus fiber-aware Two changes to teach sd-bus how to behave when called from a fiber, in order of increasing depth: 2. sd_bus_call() now redirects to a new bus_call_suspend() helper when the caller is a fiber whose event loop is the same one the bus is attached to. The plain bus_poll() path serializes all bus traffic on the slot's reply (only one method call can be in flight per sd_bus*), which would defeat the point of running multiple fibers against one bus. bus_call_suspend() builds on the async sd-bus API: it wraps the call in a new BusFuture (sd-bus/bus-future.{c,h}) that resolves when the reply or method-error arrives, lets the fiber await that future, and surfaces the reply to the caller via future_get_bus_reply(). Because the futures live on the event loop rather than a per-bus slot, multiple fibers can drive concurrent method calls against the same bus. 3. A new private SD_BUS_VTABLE_METHOD_FIBER flag dispatches a vtable method handler on its own fiber, so handlers are free to use sd_bus_call() against the same bus, sd_fiber_sleep(), loop_read(), etc. without stalling the event loop for other connections or handlers. The flag stays out of sd-bus-vtable.h (its bit value is reserved there to prevent collisions) — the fiber runtime is a systemd-internal implementation detail. Lifecycle of fiber-dispatched handlers is tracked on the bus itself: a new bus->fiber_futures set holds a ref to each in-flight handler. bus_enter_closing() cancels every entry and process_closing() returns with the bus still in CLOSING state until the set drains, so we can be sure no fiber handler outlives the bus. bus_fiber_resolved() removes the entry on completion. bus_free()'s assert(set_isempty()) makes the invariant load-bearing. Note that plain sd_bus_call() already works correctly on a fiber as it calls ppoll_usec() which has already been modified to suspend when running on a fiber. To exercise these changes the existing thread-based client/server sd-bus tests (test-bus-chat, test-bus-objects, test-bus-peersockaddr, test-bus-server, test-bus-watch-bind) are migrated to fibers, and a new test-bus-fiber is added that covers SD_BUS_VTABLE_METHOD_FIBER — including handlers that issue nested sd_bus_call() on the same bus, the cancel-on-close path, and concurrent dispatches across multiple fibers. --- diff --git a/src/libsystemd/meson.build b/src/libsystemd/meson.build index 4e9fd28d442..0cecea3b0d1 100644 --- a/src/libsystemd/meson.build +++ b/src/libsystemd/meson.build @@ -49,6 +49,7 @@ sd_bus_sources = files( 'sd-bus/bus-dump.c', 'sd-bus/bus-dump-json.c', 'sd-bus/bus-error.c', + 'sd-bus/bus-future.c', 'sd-bus/bus-internal.c', 'sd-bus/bus-introspect.c', 'sd-bus/bus-kernel.c', @@ -185,6 +186,7 @@ libsystemd_pc = custom_target( simple_tests += files( 'sd-bus/test-bus-creds.c', + 'sd-bus/test-bus-fiber.c', 'sd-bus/test-bus-introspect.c', 'sd-bus/test-bus-match.c', 'sd-bus/test-bus-vtable.c', diff --git a/src/libsystemd/sd-bus/bus-future.c b/src/libsystemd/sd-bus/bus-future.c new file mode 100644 index 00000000000..d7037b3f15b --- /dev/null +++ b/src/libsystemd/sd-bus/bus-future.c @@ -0,0 +1,135 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include "sd-bus.h" +#include "sd-future.h" + +#include "alloc-util.h" +#include "bus-future.h" +#include "bus-internal.h" +#include "bus-message.h" + +typedef struct BusFuture { + sd_bus_slot *slot; + sd_bus_message *reply; +} BusFuture; + +static void* bus_future_alloc(void) { + return new0(BusFuture, 1); +} + +static void bus_future_free(sd_future *f) { + BusFuture *bf = ASSERT_PTR(sd_future_get_private(f)); + sd_bus_slot_unref(bf->slot); + sd_bus_message_unref(bf->reply); + free(bf); +} + +static int bus_future_cancel(sd_future *f) { + BusFuture *bf = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f))); + + bf->slot = sd_bus_slot_unref(bf->slot); + return sd_future_resolve(f, -ECANCELED); +} + +static const sd_future_ops bus_future_ops = { + .size = sizeof(sd_future_ops), + .alloc = bus_future_alloc, + .free = bus_future_free, + .cancel = bus_future_cancel, +}; + +static int bus_future_handler(sd_bus_message *m, void *userdata, sd_bus_error *reterr_error) { + sd_future *f = ASSERT_PTR(userdata); + BusFuture *bf = ASSERT_PTR(sd_future_get_private(f)); + int r; + + /* Resolve with 0 on a success reply and -errno (derived from the D-Bus error name) on a + * method error reply, so a caller awaiting the future learns about call failures from the + * resolution value alone. The reply itself is always stashed in bf->reply so + * future_get_bus_reply() can hand back the detailed sd_bus_error (name + message) on + * top of the bare errno. Cancellation surfaces as -ECANCELED via bus_future_cancel(), + * with bf->reply left NULL — callers can distinguish "got an error reply" from "no reply + * will arrive" by whether future_get_bus_reply() can produce a message. */ + bf->slot = sd_bus_slot_unref(bf->slot); + bf->reply = sd_bus_message_ref(m); + + r = sd_bus_message_is_method_error(m, NULL) ? -sd_bus_message_get_errno(m) : 0; + return sd_future_resolve(f, r); +} + +int bus_call_future(sd_bus *bus, sd_bus_message *m, uint64_t usec, sd_future **ret) { + int r; + + assert(bus); + assert(m); + assert(ret); + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + r = sd_future_new(&bus_future_ops, &f); + if (r < 0) + return r; + + BusFuture *bf = sd_future_get_private(f); + + r = sd_bus_call_async(bus, &bf->slot, m, bus_future_handler, f, usec); + if (r < 0) + return r; + + *ret = TAKE_PTR(f); + return 0; +} + +int future_get_bus_reply(sd_future *f, sd_bus_error *reterr_error, sd_bus_message **ret_reply) { + BusFuture *bf = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f))); + sd_bus_message *reply = ASSERT_PTR(bf->reply); + + assert(sd_future_get_ops(f) == &bus_future_ops); + assert(sd_future_state(f) == SD_FUTURE_RESOLVED); + + if (sd_bus_message_is_method_error(reply, NULL)) { + if (reterr_error) + return sd_bus_error_copy(reterr_error, sd_bus_message_get_error(reply)); + return -sd_bus_message_get_errno(reply); + } + + if (reply->n_fds > 0 && !sd_bus_message_get_bus(reply)->accept_fd) + return sd_bus_error_set(reterr_error, SD_BUS_ERROR_INCONSISTENT_MESSAGE, + "Reply message contained file descriptors which I couldn't accept. Sorry."); + + if (reterr_error) + *reterr_error = SD_BUS_ERROR_NULL; + if (ret_reply) + *ret_reply = sd_bus_message_ref(reply); + + return 1; +} + +int bus_call_suspend( + sd_bus *bus, + sd_bus_message *m, + uint64_t usec, + sd_bus_error *reterr_error, + sd_bus_message **ret_reply) { + + int r; + + assert(bus); + assert(m); + assert(sd_fiber_is_running()); + + _cleanup_(sd_future_cancel_wait_unrefp) sd_future *f = NULL; + r = bus_call_future(bus, m, usec, &f); + if (r < 0) + return sd_bus_error_set_errno(reterr_error, r); + + r = sd_fiber_suspend(); + + /* If the future isn't resolved, the suspend was interrupted before a reply arrived (fiber + * cancelled, fiber-wide SD_FIBER_TIMEOUT scope expired, …). There's no reply to extract, + * so surface the resume error directly. When the future is resolved, future_get_bus_reply() + * recovers either the reply or the detailed sd_bus_error from the error reply. */ + if (sd_future_state(f) != SD_FUTURE_RESOLVED) + return sd_bus_error_set_errno(reterr_error, r); + + return future_get_bus_reply(f, reterr_error, ret_reply); +} diff --git a/src/libsystemd/sd-bus/bus-future.h b/src/libsystemd/sd-bus/bus-future.h new file mode 100644 index 00000000000..ec9bd80b159 --- /dev/null +++ b/src/libsystemd/sd-bus/bus-future.h @@ -0,0 +1,14 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ +#pragma once + +#include "sd-forward.h" + +int bus_call_future(sd_bus *bus, sd_bus_message *m, uint64_t usec, sd_future **ret); +int future_get_bus_reply(sd_future *f, sd_bus_error *reterr_error, sd_bus_message **ret_reply); + +int bus_call_suspend( + sd_bus *bus, + sd_bus_message *m, + uint64_t usec, + sd_bus_error *reterr_error, + sd_bus_message **ret_reply); diff --git a/src/libsystemd/sd-bus/bus-internal.h b/src/libsystemd/sd-bus/bus-internal.h index 19a3b67d12f..3a52f738d6b 100644 --- a/src/libsystemd/sd-bus/bus-internal.h +++ b/src/libsystemd/sd-bus/bus-internal.h @@ -17,6 +17,13 @@ #define DEFAULT_SYSTEM_BUS_ADDRESS "unix:path=/run/dbus/system_bus_socket" #define DEFAULT_USER_BUS_ADDRESS_FMT "unix:path=%s/bus" +/* Private vtable flag: dispatch the method handler on its own fiber, so it can use suspending + * primitives (sd_bus_call() on a fiber, sd_fiber_sleep(), loop_read_suspend(), ...) without + * blocking the event loop for other connections or method calls. Kept out of the public + * sd-bus-vtable.h so the fiber runtime stays an implementation detail of systemd. The bit value is + * reserved in sd-bus-vtable.h to make sure it never collides with a future public flag. */ +#define SD_BUS_VTABLE_METHOD_FIBER (UINT64_C(1) << 10) + typedef struct BusReplyCallback { sd_bus_message_handler_t callback; usec_t timeout_usec; /* this is a relative timeout until we reach the BUS_HELLO state, and an absolute one right after */ @@ -222,6 +229,12 @@ typedef struct sd_bus { Set *vtable_methods; Set *vtable_properties; + /* Futures for outstanding SD_BUS_VTABLE_METHOD_FIBER dispatches. Entries are added as the + * dispatcher spawns each fiber and removed when the fiber resolves. On bus_enter_closing() + * we cancel everything in here and then wait in process_closing() until the set drains, + * before tearing down the rest of the bus. */ + Set *fiber_futures; + union sockaddr_union sockaddr; socklen_t sockaddr_size; diff --git a/src/libsystemd/sd-bus/bus-objects.c b/src/libsystemd/sd-bus/bus-objects.c index 83ba3a52399..795cc688376 100644 --- a/src/libsystemd/sd-bus/bus-objects.c +++ b/src/libsystemd/sd-bus/bus-objects.c @@ -3,6 +3,7 @@ #include #include "sd-bus.h" +#include "sd-future.h" #include "alloc-util.h" #include "bus-internal.h" @@ -337,6 +338,67 @@ static int check_access(sd_bus *bus, sd_bus_message *m, BusVTableMember *c, sd_b return sd_bus_error_setf(reterr_error, SD_BUS_ERROR_ACCESS_DENIED, "Access to %s.%s() not permitted.", c->interface, c->member); } +typedef struct BusFiberData { + sd_bus *bus; + sd_bus_message *message; + sd_bus_slot *slot; + sd_bus_message_handler_t handler; + void *userdata; +} BusFiberData; + +static BusFiberData* bus_fiber_data_free(BusFiberData *d) { + if (!d) + return NULL; + + sd_bus_slot_unref(d->slot); + sd_bus_message_unref(d->message); + sd_bus_unref(d->bus); + return mfree(d); +} + +DEFINE_TRIVIAL_CLEANUP_FUNC(BusFiberData*, bus_fiber_data_free); + +static void bus_fiber_data_destroy(void *userdata) { + bus_fiber_data_free(userdata); +} + +static void bus_fiber_future_unref(void *p) { + sd_future_unref(p); +} + +DEFINE_PRIVATE_HASH_OPS_WITH_KEY_DESTRUCTOR( + bus_fiber_future_hash_ops, + void, + trivial_hash_func, + trivial_compare_func, + bus_fiber_future_unref); + +static int bus_fiber_resolved(sd_future *f) { + sd_bus *bus = ASSERT_PTR(sd_future_get_userdata(f)); + + assert_se(set_remove(bus->fiber_futures, f) == f); + sd_future_unref(f); + return 0; +} + +static int bus_fiber_entry(void *userdata) { + BusFiberData *d = ASSERT_PTR(userdata); + _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL; + int r; + + /* Note: unlike the synchronous dispatch path, we deliberately do NOT set + * bus->current_slot/handler/userdata around the callback. Those fields track the slot of the + * message currently being dispatched inline and must be NULL at each entry into + * bus_process_internal(). Because a fiber handler can yield and let the event loop dispatch + * other messages before it resumes, leaving current_slot non-NULL across yields would trip + * that invariant. Fiber handlers receive their slot's userdata via the handler argument, so + * sd_bus_get_current_slot()/handler()/userdata() simply aren't meaningful inside them — the + * handler should use the message/userdata parameters directly instead. */ + r = d->handler(d->message, d->userdata, &error); + + return bus_maybe_reply_error(d->message, r, &error); +} + static int method_callbacks_run( sd_bus *bus, sd_bus_message *m, @@ -407,6 +469,53 @@ static int method_callbacks_run( slot = container_of(c->parent, sd_bus_slot, node_vtable); + if (FLAGS_SET(c->vtable->flags, SD_BUS_VTABLE_METHOD_FIBER)) { + /* A fiber-dispatched method requires an event loop to spawn the fiber on. + * By the time a method call actually arrives the bus is running, so the + * event loop should already be attached — if not, the caller set up the bus + * wrong and there's no meaningful recovery. */ + assert(bus->event); + + _cleanup_(bus_fiber_data_freep) BusFiberData *d = new(BusFiberData, 1); + if (!d) + return -ENOMEM; + + *d = (BusFiberData) { + .bus = sd_bus_ref(bus), + .message = sd_bus_message_ref(m), + .slot = sd_bus_slot_ref(slot), + .handler = c->vtable->x.method.handler, + .userdata = u, + }; + + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + r = sd_fiber_new(bus->event, c->member, bus_fiber_entry, d, bus_fiber_data_destroy, &f); + if (r < 0) + return bus_maybe_reply_error(m, r, NULL); + + /* The fiber now owns d via bus_fiber_data_destroy. Drop our cleanup before any + * further fallible calls, so a later failure unwinding f doesn't double-free d. */ + TAKE_PTR(d); + + r = set_ensure_put(&bus->fiber_futures, &bus_fiber_future_hash_ops, f); + if (r < 0) + return bus_maybe_reply_error(m, r, NULL); + assert(r > 0); + + /* Track the future on the bus so shutdown can cancel it and wait for it. */ + r = sd_future_set_callback(f, bus_fiber_resolved, bus); + if (r < 0) { + /* TAKE_PTR(f) hasn't run yet, so our cleanup attribute still owns the + * ref; set_remove() returns the raw pointer without firing the hash_ops + * destructor, and the cleanup will unref f on return. */ + assert_se(set_remove(bus->fiber_futures, f) == f); + return bus_maybe_reply_error(m, r, NULL); + } + + TAKE_PTR(f); + return 1; + } + bus->current_slot = sd_bus_slot_ref(slot); bus->current_handler = c->vtable->x.method.handler; bus->current_userdata = u; diff --git a/src/libsystemd/sd-bus/sd-bus.c b/src/libsystemd/sd-bus/sd-bus.c index 27f788d9955..e44c439fad8 100644 --- a/src/libsystemd/sd-bus/sd-bus.c +++ b/src/libsystemd/sd-bus/sd-bus.c @@ -10,12 +10,14 @@ #include "sd-bus.h" #include "sd-event.h" +#include "sd-future.h" #include "af-list.h" #include "alloc-util.h" #include "bus-container.h" #include "bus-control.h" #include "bus-error.h" +#include "bus-future.h" #include "bus-internal.h" #include "bus-kernel.h" #include "bus-label.h" @@ -222,6 +224,12 @@ static sd_bus* bus_free(sd_bus *b) { ordered_hashmap_free(b->reply_callbacks); prioq_free(b->reply_callbacks_prioq); + /* Outstanding fiber handlers pin the bus via their BusFiberData ref, so by the time refcount + * reaches zero and bus_free() runs, every fiber has already resolved and removed itself from + * this set. */ + assert(set_isempty(b->fiber_futures)); + set_free(b->fiber_futures); + assert(b->match_callbacks.type == BUS_MATCH_ROOT); bus_match_free(&b->match_callbacks); @@ -1809,6 +1817,9 @@ _public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) { } void bus_enter_closing(sd_bus *bus, int exit_code) { + sd_future *f; + int r; + assert(bus); if (!IN_SET(bus->state, BUS_WATCH_BIND, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING)) @@ -1816,6 +1827,19 @@ void bus_enter_closing(sd_bus *bus, int exit_code) { bus_set_state(bus, BUS_CLOSING); bus->exit_code = exit_code; + + /* Cancel all outstanding fiber-dispatched method handlers. Most cancellations are scheduled + * asynchronously (fibers resolve with -ECANCELED the next time they run), but a fiber still + * in FIBER_STATE_INITIAL resolves synchronously, which fires bus_fiber_resolved() and + * removes f from this set mid-iteration. That's safe because SET_FOREACH permits removal of + * exactly the current entry — see the assertion in hashmap_iterate_entry(). Either way this + * doesn't block here: process_closing() waits for the fiber_futures set to drain before it + * continues tearing down the rest of the bus. */ + SET_FOREACH(f, bus->fiber_futures) { + r = sd_future_cancel(f); + if (r < 0) + log_debug_errno(r, "Failed to cancel outstanding fiber method handler, ignoring: %m"); + } } /* Define manually so we can add the PID check */ @@ -2388,23 +2412,30 @@ _public_ int sd_bus_call( sd_bus_error *reterr_error, sd_bus_message **ret_reply) { - _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = sd_bus_message_ref(_m); usec_t timeout; uint64_t cookie; size_t i; int r; - bus_assert_return(m, -EINVAL, reterr_error); - bus_assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL, reterr_error); - bus_assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL, reterr_error); + bus_assert_return(_m, -EINVAL, reterr_error); + bus_assert_return(_m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL, reterr_error); + bus_assert_return(!(_m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL, reterr_error); bus_assert_return(!bus_error_is_dirty(reterr_error), -EINVAL, reterr_error); if (bus) assert_return(bus = bus_resolve(bus), -ENOPKG); else - assert_return(bus = m->bus, -ENOTCONN); + assert_return(bus = _m->bus, -ENOTCONN); bus_assert_return(!bus_origin_changed(bus), -ECHILD, reterr_error); + /* If the current fiber and the bus share their event loop, we can use sd_bus_call_suspend() + * instead which does an async method call. This allows multiple invocations of sd_bus_call() to + * happen across multiple fibers at once. */ + if (sd_fiber_is_running() && bus->event == sd_fiber_get_event()) + return bus_call_suspend(bus, _m, usec, reterr_error, ret_reply); + + _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = sd_bus_message_ref(_m); + if (!BUS_IS_OPEN(bus->state)) { r = -ENOTCONN; goto fail; @@ -3177,7 +3208,14 @@ static int process_closing(sd_bus *bus, sd_bus_message **ret) { assert(bus); assert(bus->state == BUS_CLOSING); - /* First, fail all outstanding method calls */ + /* Wait for any still-running fiber method handlers to finish unwinding their cancellation + * before tearing down the rest of the bus. bus_enter_closing() scheduled the cancel; each + * fiber resolves asynchronously and bus_fiber_resolved() removes it from the set. Returning + * 1 here keeps the bus in CLOSING state so the event loop drives the fibers to completion. */ + if (!set_isempty(bus->fiber_futures)) + return 1; + + /* Then, fail all outstanding method calls */ c = ordered_hashmap_first(bus->reply_callbacks); if (c) return process_closing_reply_callback(bus, c); diff --git a/src/libsystemd/sd-bus/test-bus-chat.c b/src/libsystemd/sd-bus/test-bus-chat.c index 1f358ccd339..d6f4860c415 100644 --- a/src/libsystemd/sd-bus/test-bus-chat.c +++ b/src/libsystemd/sd-bus/test-bus-chat.c @@ -1,11 +1,11 @@ /* SPDX-License-Identifier: LGPL-2.1-or-later */ #include -#include #include #include #include "sd-bus.h" +#include "sd-future.h" #include "alloc-util.h" #include "bus-error.h" @@ -102,7 +102,8 @@ static int server_init(sd_bus **ret) { return 0; } -static int server(sd_bus *bus) { +static int server(void *userdata) { + sd_bus *bus = ASSERT_PTR(userdata); bool client1_gone = false, client2_gone = false; int r; @@ -178,7 +179,9 @@ static int server(sd_bus *bus) { client2_gone = true; } else if (sd_bus_message_is_method_call(m, "org.freedesktop.systemd.test", "Slow")) { - sleep(1); + r = sd_fiber_sleep(1 * USEC_PER_SEC); + if (r < 0) + return r; r = sd_bus_reply_method_return(m, NULL); if (r < 0) @@ -194,10 +197,10 @@ static int server(sd_bus *bus) { log_info("Received fd=%d", fd); - if (write(fd, &x, 1) < 0) { - r = log_error_errno(errno, "Failed to write to fd: %m"); + ssize_t n = sd_fiber_write(fd, &x, 1); + if (n < 0) { safe_close(fd); - return r; + return log_error_errno(n, "Failed to write to fd: %m"); } r = sd_bus_reply_method_return(m, NULL); @@ -217,7 +220,7 @@ static int server(sd_bus *bus) { return 0; } -static void* client1(void *p) { +static int client1(void *userdata) { _cleanup_(sd_bus_message_unrefp) sd_bus_message *reply = NULL; _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL; @@ -277,9 +280,9 @@ static void* client1(void *p) { goto finish; } - errno = 0; - if (read(pp[0], &x, 1) <= 0) { - log_error("Failed to read from pipe: %s", STRERROR_OR_EOF(errno)); + ssize_t n = sd_fiber_read(pp[0], &x, 1); + if (n <= 0) { + log_error("Failed to read from pipe: %s", STRERROR_OR_EOF(n)); goto finish; } @@ -303,7 +306,7 @@ finish: } - return INT_TO_PTR(r); + return r; } static int quit_callback(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) { @@ -315,7 +318,7 @@ static int quit_callback(sd_bus_message *m, void *userdata, sd_bus_error *ret_er return 1; } -static void* client2(void *p) { +static int client2(void *userdata) { _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = NULL, *reply = NULL; _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL; @@ -494,7 +497,7 @@ finish: (void) sd_bus_send(bus, q, NULL); } - return INT_TO_PTR(r); + return r; } static ino_t get_inode(int fd) { @@ -626,9 +629,9 @@ TEST(ctrunc) { } TEST(chat) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client1 = NULL, *f_client2 = NULL; _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; - pthread_t c1, c2; - void *p; int r; test_setup_logging(LOG_INFO); @@ -639,16 +642,18 @@ TEST(chat) { log_info("Initialized..."); - ASSERT_OK(-pthread_create(&c1, NULL, client1, NULL)); - ASSERT_OK(-pthread_create(&c2, NULL, client2, NULL)); + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + ASSERT_OK(sd_fiber_new(e, "client-1", client1, NULL, /* destroy= */ NULL, &f_client1)); + ASSERT_OK(sd_fiber_new(e, "client-2", client2, NULL, /* destroy= */ NULL, &f_client2)); + ASSERT_OK(sd_fiber_new(e, "server", server, bus, /* destroy= */ NULL, &f_server)); - r = server(bus); + ASSERT_OK(sd_event_loop(e)); - ASSERT_OK(-pthread_join(c1, &p)); - ASSERT_OK(PTR_TO_INT(p)); - ASSERT_OK(-pthread_join(c2, &p)); - ASSERT_OK(PTR_TO_INT(p)); - ASSERT_OK(r); + ASSERT_OK(sd_future_result(f_client1)); + ASSERT_OK(sd_future_result(f_client2)); + ASSERT_OK(sd_future_result(f_server)); } DEFINE_TEST_MAIN(LOG_INFO); diff --git a/src/libsystemd/sd-bus/test-bus-fiber.c b/src/libsystemd/sd-bus/test-bus-fiber.c new file mode 100644 index 00000000000..2cadf1dc175 --- /dev/null +++ b/src/libsystemd/sd-bus/test-bus-fiber.c @@ -0,0 +1,207 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include + +#include "sd-bus.h" +#include "sd-event.h" +#include "sd-future.h" + +#include "bus-internal.h" +#include "tests.h" +#include "time-util.h" + +typedef struct Context { + /* Counters for the concurrency check: every Concurrent invocation bumps in_flight on entry + * and drops it on exit, and tracks the maximum observed concurrency. If fiber dispatch + * works, two overlapping client calls must both be inside the handler at the same time, + * giving a max of at least 2. */ + int in_flight; + int max_in_flight; + sd_future *waiter; +} Context; + +static int method_concurrent(sd_bus_message *m, void *userdata, sd_bus_error *reterr_error) { + Context *c = ASSERT_PTR(userdata); + + ASSERT_OK_POSITIVE(sd_fiber_is_running()); + + c->in_flight++; + if (c->in_flight > c->max_in_flight) + c->max_in_flight = c->in_flight; + + /* Synchronize on the peer instead of sleeping: the first handler to enter stashes its + * fiber and suspends; the second resumes it and then yields, so the first runs to + * completion (sending its reply) before the second proceeds. Both are therefore in + * flight at the same time regardless of how long any individual dispatch takes, and + * the reply order matches the request order. */ + if (c->in_flight < 2) { + assert(!c->waiter); + c->waiter = sd_fiber_get_current(); + ASSERT_OK(sd_fiber_suspend()); + } else { + ASSERT_OK(sd_fiber_resume(TAKE_PTR(c->waiter), 0)); + ASSERT_OK(sd_fiber_yield()); + } + + c->in_flight--; + + return sd_bus_reply_method_return(m, NULL); +} + +static int method_fail_errno(sd_bus_message *m, void *userdata, sd_bus_error *reterr_error) { + ASSERT_OK_POSITIVE(sd_fiber_is_running()); + + /* Yielding first exercises the deferred-error path in the fiber entry: the handler returns + * a negative errno after suspending, and bus_maybe_reply_error() must still turn that into + * a matching sd_bus error reply. */ + ASSERT_OK(sd_fiber_sleep(1 * USEC_PER_MSEC)); + + return -EACCES; +} + +static int method_fail_error(sd_bus_message *m, void *userdata, sd_bus_error *reterr_error) { + ASSERT_OK_POSITIVE(sd_fiber_is_running()); + + ASSERT_OK(sd_fiber_sleep(1 * USEC_PER_MSEC)); + + return sd_bus_error_set(reterr_error, SD_BUS_ERROR_INVALID_ARGS, "bad arguments from fiber"); +} + +static const sd_bus_vtable vtable[] = { + SD_BUS_VTABLE_START(0), + SD_BUS_METHOD("Concurrent", NULL, NULL, method_concurrent, + SD_BUS_VTABLE_UNPRIVILEGED|SD_BUS_VTABLE_METHOD_FIBER), + SD_BUS_METHOD("FailErrno", NULL, NULL, method_fail_errno, + SD_BUS_VTABLE_UNPRIVILEGED|SD_BUS_VTABLE_METHOD_FIBER), + SD_BUS_METHOD("FailError", NULL, NULL, method_fail_error, + SD_BUS_VTABLE_UNPRIVILEGED|SD_BUS_VTABLE_METHOD_FIBER), + SD_BUS_VTABLE_END, +}; + +typedef struct Setup { + int fds[2]; + Context *c; +} Setup; + +static int attach_pair(Setup *s, sd_bus **ret_server, sd_bus **ret_client) { + _cleanup_(sd_bus_flush_close_unrefp) sd_bus *server = NULL, *client = NULL; + sd_id128_t id; + + assert(ret_server); + assert(ret_client); + + ASSERT_OK(sd_id128_randomize(&id)); + ASSERT_OK(sd_bus_new(&server)); + ASSERT_OK(sd_bus_set_description(server, "server")); + ASSERT_OK(sd_bus_set_fd(server, s->fds[0], s->fds[0])); + ASSERT_OK(sd_bus_set_server(server, true, id)); + ASSERT_OK(sd_bus_attach_event(server, sd_fiber_get_event(), 0)); + ASSERT_OK(sd_bus_add_object_vtable(server, NULL, "/test", "test.Fiber", vtable, s->c)); + ASSERT_OK(sd_bus_start(server)); + + ASSERT_OK(sd_bus_new(&client)); + ASSERT_OK(sd_bus_set_description(client, "client")); + ASSERT_OK(sd_bus_set_fd(client, s->fds[1], s->fds[1])); + ASSERT_OK(sd_bus_attach_event(client, sd_fiber_get_event(), 0)); + ASSERT_OK(sd_bus_start(client)); + + *ret_server = TAKE_PTR(server); + *ret_client = TAKE_PTR(client); + return 0; +} + +static int call_concurrent_fiber(void *userdata) { + sd_bus *client = ASSERT_PTR(userdata); + + /* A plain suspending sd_bus_call() — on a fiber this goes through sd_bus_call_suspend() + * which multiplexes onto the single client connection, so multiple caller fibers can have + * calls in flight at the same time. */ + return sd_bus_call_method(client, NULL, "/test", "test.Fiber", "Concurrent", + NULL, NULL, NULL); +} + +static int concurrency_fiber(void *userdata) { + Setup *s = ASSERT_PTR(userdata); + _cleanup_(sd_bus_flush_close_unrefp) sd_bus *server = NULL, *client = NULL; + _cleanup_(sd_future_cancel_wait_unrefp) sd_future *f_a = NULL, *f_b = NULL; + + ASSERT_OK(attach_pair(s, &server, &client)); + + /* Two concurrent calls on the shared client bus. Each lands in method_concurrent which + * blocks on the peer; if fiber dispatch works the second is entered while the first is + * suspended, so max_in_flight on the context reaches 2. */ + ASSERT_OK(sd_fiber_new(sd_fiber_get_event(), "call-a", call_concurrent_fiber, client, + /* destroy= */ NULL, &f_a)); + ASSERT_OK(sd_fiber_new(sd_fiber_get_event(), "call-b", call_concurrent_fiber, client, + /* destroy= */ NULL, &f_b)); + + ASSERT_OK(sd_fiber_await(f_a)); + ASSERT_OK(sd_fiber_await(f_b)); + + ASSERT_OK(sd_future_result(f_a)); + ASSERT_OK(sd_future_result(f_b)); + return 0; +} + +TEST(fiber_method_concurrency) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + Context c = {}; + Setup s = { .c = &c }; + + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM, 0, s.fds)); + + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + ASSERT_OK(sd_fiber_new(e, "concurrency", concurrency_fiber, &s, /* destroy= */ NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + + ASSERT_OK(sd_future_result(f)); + ASSERT_GE(c.max_in_flight, 2); +} + +static int errors_fiber(void *userdata) { + Setup *s = ASSERT_PTR(userdata); + _cleanup_(sd_bus_flush_close_unrefp) sd_bus *server = NULL, *client = NULL; + + ASSERT_OK(attach_pair(s, &server, &client)); + + /* A fiber handler that returns a negative errno gets turned into a matching sd_bus error + * reply (bus_maybe_reply_error → sd_bus_reply_method_errno). */ + _cleanup_(sd_bus_error_free) sd_bus_error e1 = SD_BUS_ERROR_NULL; + ASSERT_ERROR(sd_bus_call_method(client, NULL, "/test", "test.Fiber", "FailErrno", + &e1, NULL, NULL), + EACCES); + ASSERT_TRUE(sd_bus_error_has_name(&e1, SD_BUS_ERROR_ACCESS_DENIED)); + + /* A fiber handler that populates sd_bus_error directly propagates both name and message. */ + _cleanup_(sd_bus_error_free) sd_bus_error e2 = SD_BUS_ERROR_NULL; + ASSERT_FAIL(sd_bus_call_method(client, NULL, "/test", "test.Fiber", "FailError", + &e2, NULL, NULL)); + ASSERT_TRUE(sd_bus_error_has_name(&e2, SD_BUS_ERROR_INVALID_ARGS)); + ASSERT_STREQ(e2.message, "bad arguments from fiber"); + + return 0; +} + +TEST(fiber_method_errors) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + _cleanup_(sd_future_unrefp) sd_future *f = NULL; + Context c = {}; + Setup s = { .c = &c }; + + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM, 0, s.fds)); + + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + ASSERT_OK(sd_fiber_new(e, "errors", errors_fiber, &s, /* destroy= */ NULL, &f)); + + ASSERT_OK(sd_event_loop(e)); + + ASSERT_OK(sd_future_result(f)); +} + +DEFINE_TEST_MAIN(LOG_DEBUG); diff --git a/src/libsystemd/sd-bus/test-bus-objects.c b/src/libsystemd/sd-bus/test-bus-objects.c index 4ad60f0d582..ac33086a6f3 100644 --- a/src/libsystemd/sd-bus/test-bus-objects.c +++ b/src/libsystemd/sd-bus/test-bus-objects.c @@ -1,8 +1,7 @@ /* SPDX-License-Identifier: LGPL-2.1-or-later */ -#include - #include "sd-bus.h" +#include "sd-future.h" #include "alloc-util.h" #include "bus-internal.h" @@ -211,9 +210,9 @@ static int enumerator3_callback(sd_bus *bus, const char *path, void *userdata, c return 1; } -static void* server(void *p) { - struct context *c = p; - sd_bus *bus = NULL; +static int server(void *userdata) { + struct context *c = ASSERT_PTR(userdata); + _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; sd_id128_t id; int r; @@ -242,36 +241,25 @@ static void* server(void *p) { log_error("Loop!"); r = sd_bus_process(bus, NULL); - if (r < 0) { - log_error_errno(r, "Failed to process requests: %m"); - goto fail; - } + if (r < 0) + return log_error_errno(r, "Failed to process requests: %m"); if (r == 0) { r = sd_bus_wait(bus, UINT64_MAX); - if (r < 0) { - log_error_errno(r, "Failed to wait: %m"); - goto fail; - } + if (r < 0) + return log_error_errno(r, "Failed to wait: %m"); continue; } } - r = 0; - -fail: - if (bus) { - sd_bus_flush(bus); - sd_bus_unref(bus); - } - - return INT_TO_PTR(r); + return 0; } -static int client(struct context *c) { +static int client(void *p) { + struct context *c = ASSERT_PTR(p); _cleanup_(sd_bus_message_unrefp) sd_bus_message *reply = NULL; - _cleanup_(sd_bus_unrefp) sd_bus *bus = NULL; + _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL; _cleanup_strv_free_ char **lines = NULL; const char *s; @@ -575,16 +563,13 @@ static int client(struct context *c) { ASSERT_OK(sd_bus_call_method(bus, "org.freedesktop.systemd.test", "/foo", "org.freedesktop.systemd.test", "Exit", &error, NULL, NULL)); - sd_bus_flush(bus); - return 0; } int main(int argc, char *argv[]) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client = NULL; struct context c = {}; - pthread_t s; - void *p; - int r, q; test_setup_logging(LOG_DEBUG); @@ -593,21 +578,16 @@ int main(int argc, char *argv[]) { ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM, 0, c.fds)); - r = pthread_create(&s, NULL, server, &c); - if (r != 0) - return -r; - - r = client(&c); + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); - q = pthread_join(s, &p); - if (q != 0) - return -q; + ASSERT_OK(sd_fiber_new(e, "server", server, &c, /* destroy= */ NULL, &f_server)); + ASSERT_OK(sd_fiber_new(e, "client", client, &c, /* destroy= */ NULL, &f_client)); - if (r < 0) - return r; + ASSERT_OK(sd_event_loop(e)); - if (PTR_TO_INT(p) < 0) - return PTR_TO_INT(p); + ASSERT_OK(sd_future_result(f_server)); + ASSERT_OK(sd_future_result(f_client)); free(c.something); free(c.automatic_string_property); diff --git a/src/libsystemd/sd-bus/test-bus-peersockaddr.c b/src/libsystemd/sd-bus/test-bus-peersockaddr.c index 2cac35dde40..bee76c9b10c 100644 --- a/src/libsystemd/sd-bus/test-bus-peersockaddr.c +++ b/src/libsystemd/sd-bus/test-bus-peersockaddr.c @@ -1,9 +1,9 @@ /* SPDX-License-Identifier: LGPL-2.1-or-later */ -#include #include #include "sd-bus.h" +#include "sd-future.h" #include "bus-dump.h" #include "fd-util.h" @@ -38,9 +38,9 @@ static bool gid_list_same(const gid_t *a, size_t n, const gid_t *b, size_t m) { gid_list_contained(b, m, a, n); } -static void* server(void *p) { +static int server(void *userdata) { _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; - _cleanup_close_ int listen_fd = PTR_TO_INT(p), fd = -EBADF; + _cleanup_close_ int listen_fd = PTR_TO_INT(userdata), fd = -EBADF; _cleanup_(sd_bus_creds_unrefp) sd_bus_creds *c = NULL; _cleanup_free_ char *our_comm = NULL; sd_id128_t id; @@ -48,7 +48,7 @@ static void* server(void *p) { ASSERT_OK(sd_id128_randomize(&id)); - ASSERT_OK_ERRNO(fd = accept4(listen_fd, NULL, NULL, SOCK_CLOEXEC|SOCK_NONBLOCK)); + ASSERT_OK(fd = sd_fiber_accept(listen_fd, NULL, NULL, SOCK_CLOEXEC|SOCK_NONBLOCK)); ASSERT_OK(sd_bus_new(&bus)); ASSERT_OK(sd_bus_set_fd(bus, fd, fd)); @@ -114,17 +114,18 @@ static void* server(void *p) { } } - return NULL; + return 0; } -static void* client(void *p) { +static int client(void *userdata) { _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; _cleanup_(sd_bus_message_unrefp) sd_bus_message *reply = NULL; const char *z; ASSERT_OK(sd_bus_new(&bus)); ASSERT_OK(sd_bus_set_description(bus, "wuffwuff")); - ASSERT_OK(sd_bus_set_address(bus, p)); + ASSERT_OK(sd_bus_set_address(bus, userdata)); + ASSERT_OK(sd_bus_attach_event(bus, sd_fiber_get_event(), 0)); ASSERT_OK(sd_bus_start(bus)); ASSERT_OK(sd_bus_call_method(bus, "foo.foo", "/foo", "foo.foo", "Foo", NULL, &reply, "s", "foo")); @@ -132,17 +133,18 @@ static void* client(void *p) { ASSERT_OK(sd_bus_message_read(reply, "s", &z)); ASSERT_STREQ(z, "bar"); - return NULL; + return 0; } TEST(description) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client = NULL; _cleanup_free_ char *a = NULL; _cleanup_close_ int fd = -EBADF; union sockaddr_union sa = { .un.sun_family = AF_UNIX, }; socklen_t salen; - pthread_t s, c; ASSERT_OK_ERRNO(fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0)); ASSERT_OK_ERRNO(bind(fd, &sa.sa, offsetof(struct sockaddr_un, sun_path))); /* force auto-bind */ @@ -155,13 +157,18 @@ TEST(description) { ASSERT_OK(asprintf(&a, "unix:abstract=%s", sa.un.sun_path + 1)); - ASSERT_OK(-pthread_create(&s, NULL, server, INT_TO_PTR(fd))); + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); + + ASSERT_OK(sd_fiber_new(e, "server", server, INT_TO_PTR(fd), /* destroy= */ NULL, &f_server)); TAKE_FD(fd); - ASSERT_OK(-pthread_create(&c, NULL, client, a)); + ASSERT_OK(sd_fiber_new(e, "client", client, a, /* destroy= */ NULL, &f_client)); + + ASSERT_OK(sd_event_loop(e)); - ASSERT_OK(-pthread_join(s, NULL)); - ASSERT_OK(-pthread_join(c, NULL)); + ASSERT_OK(sd_future_result(f_server)); + ASSERT_OK(sd_future_result(f_client)); } DEFINE_TEST_MAIN(LOG_INFO); diff --git a/src/libsystemd/sd-bus/test-bus-server.c b/src/libsystemd/sd-bus/test-bus-server.c index 989d2bf10dc..1edcec858f2 100644 --- a/src/libsystemd/sd-bus/test-bus-server.c +++ b/src/libsystemd/sd-bus/test-bus-server.c @@ -1,10 +1,12 @@ /* SPDX-License-Identifier: LGPL-2.1-or-later */ -#include #include #include "sd-bus.h" +#include "sd-event.h" +#include "sd-future.h" +#include "errno-util.h" #include "log.h" #include "memory-util.h" #include "string-util.h" @@ -20,7 +22,8 @@ struct context { bool server_anonymous_auth; }; -static int _server(struct context *c) { +static int server(void *userdata) { + struct context *c = ASSERT_PTR(userdata); _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; sd_id128_t id; bool quit = false; @@ -29,6 +32,7 @@ static int _server(struct context *c) { ASSERT_OK(sd_id128_randomize(&id)); ASSERT_OK(sd_bus_new(&bus)); + ASSERT_OK(sd_bus_set_description(bus, "server")); ASSERT_OK(sd_bus_set_fd(bus, c->fds[0], c->fds[0])); ASSERT_OK(sd_bus_set_server(bus, 1, id)); ASSERT_OK(sd_bus_set_anonymous(bus, c->server_anonymous_auth)); @@ -74,17 +78,16 @@ static int _server(struct context *c) { return 0; } -static void* server(void *p) { - return INT_TO_PTR(_server(p)); -} - -static int client(struct context *c) { +static int client(void *userdata) { + struct context *c = ASSERT_PTR(userdata); _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = NULL, *reply = NULL; - _cleanup_(sd_bus_unrefp) sd_bus *bus = NULL; + _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL; ASSERT_OK(sd_bus_new(&bus)); + ASSERT_OK(sd_bus_set_description(bus, "client")); ASSERT_OK(sd_bus_set_fd(bus, c->fds[1], c->fds[1])); + ASSERT_OK(sd_bus_attach_event(bus, sd_fiber_get_event(), 0)); ASSERT_OK(sd_bus_negotiate_fds(bus, c->client_negotiate_unix_fds)); ASSERT_OK(sd_bus_set_anonymous(bus, c->client_anonymous_auth)); ASSERT_OK(sd_bus_start(bus)); @@ -103,10 +106,10 @@ static int client(struct context *c) { static int test_one(bool client_negotiate_unix_fds, bool server_negotiate_unix_fds, bool client_anonymous_auth, bool server_anonymous_auth) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client = NULL; struct context c; - pthread_t s; - void *p; - int r, q; + int r = 0; zero(c); @@ -117,23 +120,18 @@ static int test_one(bool client_negotiate_unix_fds, bool server_negotiate_unix_f c.client_anonymous_auth = client_anonymous_auth; c.server_anonymous_auth = server_anonymous_auth; - r = pthread_create(&s, NULL, server, &c); - if (r != 0) - return -r; + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); - r = client(&c); + ASSERT_OK(sd_fiber_new(e, "server", server, &c, /* destroy= */ NULL, &f_server)); + ASSERT_OK(sd_fiber_new(e, "client", client, &c, /* destroy= */ NULL, &f_client)); - q = pthread_join(s, &p); - if (q != 0) - return -q; + ASSERT_OK(sd_event_loop(e)); - if (r < 0) - return r; + RET_GATHER(r, sd_future_result(f_client)); + RET_GATHER(r, sd_future_result(f_server)); - if (PTR_TO_INT(p) < 0) - return PTR_TO_INT(p); - - return 0; + return r; } int main(int argc, char *argv[]) { @@ -145,7 +143,7 @@ int main(int argc, char *argv[]) { ASSERT_OK(test_one(false, false, false, false)); ASSERT_OK(test_one(true, true, true, true)); ASSERT_OK(test_one(true, true, false, true)); - ASSERT_ERROR(test_one(true, true, true, false), EPERM); + ASSERT_ERROR(test_one(true, true, true, false), EACCES); return EXIT_SUCCESS; } diff --git a/src/libsystemd/sd-bus/test-bus-watch-bind.c b/src/libsystemd/sd-bus/test-bus-watch-bind.c index 1bf4ee70171..6561633b8a8 100644 --- a/src/libsystemd/sd-bus/test-bus-watch-bind.c +++ b/src/libsystemd/sd-bus/test-bus-watch-bind.c @@ -1,10 +1,8 @@ /* SPDX-License-Identifier: LGPL-2.1-or-later */ -#include -#include - #include "sd-bus.h" #include "sd-event.h" +#include "sd-future.h" #include "sd-id128.h" #include "alloc-util.h" @@ -44,33 +42,33 @@ static const sd_bus_vtable vtable[] = { SD_BUS_VTABLE_END, }; -static void* thread_server(void *p) { +static int server(void *userdata) { _cleanup_free_ char *suffixed = NULL, *suffixed_basename = NULL, *suffixed2 = NULL, *d = NULL; _cleanup_close_ int fd = -EBADF; union sockaddr_union u; - const char *path = p; + const char *path = ASSERT_PTR(userdata); int r; log_debug("Initializing server"); /* Let's play some games, by slowly creating the socket directory, and renaming it in the middle */ - usleep_safe(100 * USEC_PER_MSEC); + ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC)); ASSERT_OK(mkdir_parents(path, 0755)); - usleep_safe(100 * USEC_PER_MSEC); + ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC)); ASSERT_OK(path_extract_directory(path, &d)); ASSERT_OK(asprintf(&suffixed, "%s.%" PRIx64, d, random_u64())); ASSERT_OK_ERRNO(rename(d, suffixed)); - usleep_safe(100 * USEC_PER_MSEC); + ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC)); ASSERT_OK(asprintf(&suffixed2, "%s.%" PRIx64, d, random_u64())); ASSERT_OK_ERRNO(symlink(suffixed2, d)); - usleep_safe(100 * USEC_PER_MSEC); + ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC)); ASSERT_OK(path_extract_filename(suffixed, &suffixed_basename)); ASSERT_OK_ERRNO(symlink(suffixed_basename, suffixed2)); - usleep_safe(100 * USEC_PER_MSEC); + ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC)); socklen_t sa_len; r = sockaddr_un_set_path(&u.un, path); @@ -81,13 +79,13 @@ static void* thread_server(void *p) { ASSERT_OK_ERRNO(fd); ASSERT_OK_ERRNO(bind(fd, &u.sa, sa_len)); - usleep_safe(100 * USEC_PER_MSEC); + ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC)); ASSERT_OK_ERRNO(listen(fd, SOMAXCONN_DELUXE)); - usleep_safe(100 * USEC_PER_MSEC); + ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC)); ASSERT_OK(touch(path)); - usleep_safe(100 * USEC_PER_MSEC); + ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC)); log_debug("Initialized server"); @@ -101,8 +99,7 @@ static void* thread_server(void *p) { ASSERT_OK(sd_event_new(&event)); - bus_fd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC); - ASSERT_OK_ERRNO(bus_fd); + ASSERT_OK(bus_fd = sd_fiber_accept(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC)); log_debug("Accepted server connection"); @@ -129,13 +126,13 @@ static void* thread_server(void *p) { log_debug("Server done"); - return NULL; + return 0; } -static void* thread_client1(void *p) { +static int client1(void *userdata) { _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL; _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; - const char *path = p, *t; + const char *path = ASSERT_PTR(userdata), *t; log_debug("Initializing client1"); @@ -151,59 +148,65 @@ static void* thread_client1(void *p) { log_debug("Client1 done"); - return NULL; -} - -static int client2_callback(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) { - ASSERT_OK_ZERO(sd_bus_message_is_method_error(m, NULL)); - ASSERT_OK(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 0)); return 0; } -static void* thread_client2(void *p) { +static int client2(void *userdata) { _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; - _cleanup_(sd_event_unrefp) sd_event *event = NULL; - const char *path = p, *t; + const char *path = ASSERT_PTR(userdata), *t; log_debug("Initializing client2"); - ASSERT_OK(sd_event_new(&event)); ASSERT_OK(sd_bus_new(&bus)); ASSERT_OK(sd_bus_set_description(bus, "client2")); t = strjoina("unix:path=", path); ASSERT_OK(sd_bus_set_address(bus, t)); ASSERT_OK(sd_bus_set_watch_bind(bus, true)); - ASSERT_OK(sd_bus_attach_event(bus, event, 0)); + ASSERT_OK(sd_bus_attach_event(bus, sd_fiber_get_event(), 0)); ASSERT_OK(sd_bus_start(bus)); - ASSERT_OK(sd_bus_call_method_async(bus, NULL, "foo.bar", "/foo", "foo.TestInterface", "Foobar", client2_callback, NULL, NULL)); - - ASSERT_OK(sd_event_loop(event)); + _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = NULL; + ASSERT_OK(sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Foobar", NULL, &m, NULL)); + ASSERT_OK_ZERO(sd_bus_message_is_method_error(m, NULL)); log_debug("Client2 done"); - return NULL; + return 0; } -static void request_exit(const char *path) { +typedef struct RequestExitArgs { + const char *path; + sd_future *client1; + sd_future *client2; +} RequestExitArgs; + +static int request_exit(void *userdata) { _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; + RequestExitArgs *args = ASSERT_PTR(userdata); const char *t; + /* Wait for all client fibers to complete before requesting exit */ + ASSERT_OK(sd_fiber_await(args->client1)); + ASSERT_OK(sd_fiber_await(args->client2)); + ASSERT_OK(sd_bus_new(&bus)); - t = strjoina("unix:path=", path); + t = strjoina("unix:path=", args->path); ASSERT_OK(sd_bus_set_address(bus, t)); ASSERT_OK(sd_bus_set_watch_bind(bus, true)); ASSERT_OK(sd_bus_set_description(bus, "request-exit")); ASSERT_OK(sd_bus_start(bus)); ASSERT_OK(sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Exit", NULL, NULL, NULL)); + + return 0; } int main(int argc, char *argv[]) { _cleanup_(rm_rf_physical_and_freep) char *d = NULL; - pthread_t server, client1, client2; + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client1 = NULL, *f_client2 = NULL, *f_exit = NULL; char *path; test_setup_logging(LOG_DEBUG); @@ -214,16 +217,27 @@ int main(int argc, char *argv[]) { path = strjoina(d, "/this/is/a/socket"); - ASSERT_OK(-pthread_create(&server, NULL, thread_server, path)); - ASSERT_OK(-pthread_create(&client1, NULL, thread_client1, path)); - ASSERT_OK(-pthread_create(&client2, NULL, thread_client2, path)); + ASSERT_OK(sd_event_new(&e)); + ASSERT_OK(sd_event_set_exit_on_idle(e, true)); - ASSERT_OK(-pthread_join(client1, NULL)); - ASSERT_OK(-pthread_join(client2, NULL)); + ASSERT_OK(sd_fiber_new(e, "server", server, path, /* destroy= */ NULL, &f_server)); - request_exit(path); + ASSERT_OK(sd_fiber_new(e, "client-1", client1, path, /* destroy= */ NULL, &f_client1)); + ASSERT_OK(sd_fiber_new(e, "client-2", client2, path, /* destroy= */ NULL, &f_client2)); - ASSERT_OK(-pthread_join(server, NULL)); + RequestExitArgs args = { + .path = path, + .client1 = f_client1, + .client2 = f_client2, + }; + ASSERT_OK(sd_fiber_new(e, "request-exit", request_exit, &args, /* destroy= */ NULL, &f_exit)); - return 0; + ASSERT_OK(sd_event_loop(e)); + + ASSERT_OK(sd_future_result(f_client1)); + ASSERT_OK(sd_future_result(f_client2)); + ASSERT_OK(sd_future_result(f_exit)); + ASSERT_OK(sd_future_result(f_server)); + + return EXIT_SUCCESS; } diff --git a/src/systemd/sd-bus-vtable.h b/src/systemd/sd-bus-vtable.h index 5c11ca8ae5b..036bda3fe47 100644 --- a/src/systemd/sd-bus-vtable.h +++ b/src/systemd/sd-bus-vtable.h @@ -44,6 +44,7 @@ __extension__ enum { SD_BUS_VTABLE_PROPERTY_EXPLICIT = 1ULL << 7, SD_BUS_VTABLE_SENSITIVE = 1ULL << 8, /* covers both directions: method call + reply */ SD_BUS_VTABLE_ABSOLUTE_OFFSET = 1ULL << 9, + /* Bit 10 is reserved for the private SD_BUS_VTABLE_METHOD_FIBER flag (see bus-internal.h). */ _SD_BUS_VTABLE_CAPABILITY_MASK = 0xFFFFULL << 40 };