'sd-bus/bus-dump.c',
'sd-bus/bus-dump-json.c',
'sd-bus/bus-error.c',
+ 'sd-bus/bus-future.c',
'sd-bus/bus-internal.c',
'sd-bus/bus-introspect.c',
'sd-bus/bus-kernel.c',
simple_tests += files(
'sd-bus/test-bus-creds.c',
+ 'sd-bus/test-bus-fiber.c',
'sd-bus/test-bus-introspect.c',
'sd-bus/test-bus-match.c',
'sd-bus/test-bus-vtable.c',
--- /dev/null
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+
+#include "sd-bus.h"
+#include "sd-future.h"
+
+#include "alloc-util.h"
+#include "bus-future.h"
+#include "bus-internal.h"
+#include "bus-message.h"
+
+typedef struct BusFuture {
+ sd_bus_slot *slot;
+ sd_bus_message *reply;
+} BusFuture;
+
+static void* bus_future_alloc(void) {
+ return new0(BusFuture, 1);
+}
+
+static void bus_future_free(sd_future *f) {
+ BusFuture *bf = ASSERT_PTR(sd_future_get_private(f));
+ sd_bus_slot_unref(bf->slot);
+ sd_bus_message_unref(bf->reply);
+ free(bf);
+}
+
+static int bus_future_cancel(sd_future *f) {
+ BusFuture *bf = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f)));
+
+ bf->slot = sd_bus_slot_unref(bf->slot);
+ return sd_future_resolve(f, -ECANCELED);
+}
+
+static const sd_future_ops bus_future_ops = {
+ .size = sizeof(sd_future_ops),
+ .alloc = bus_future_alloc,
+ .free = bus_future_free,
+ .cancel = bus_future_cancel,
+};
+
+static int bus_future_handler(sd_bus_message *m, void *userdata, sd_bus_error *reterr_error) {
+ sd_future *f = ASSERT_PTR(userdata);
+ BusFuture *bf = ASSERT_PTR(sd_future_get_private(f));
+ int r;
+
+ /* Resolve with 0 on a success reply and -errno (derived from the D-Bus error name) on a
+ * method error reply, so a caller awaiting the future learns about call failures from the
+ * resolution value alone. The reply itself is always stashed in bf->reply so
+ * future_get_bus_reply() can hand back the detailed sd_bus_error (name + message) on
+ * top of the bare errno. Cancellation surfaces as -ECANCELED via bus_future_cancel(),
+ * with bf->reply left NULL — callers can distinguish "got an error reply" from "no reply
+ * will arrive" by whether future_get_bus_reply() can produce a message. */
+ bf->slot = sd_bus_slot_unref(bf->slot);
+ bf->reply = sd_bus_message_ref(m);
+
+ r = sd_bus_message_is_method_error(m, NULL) ? -sd_bus_message_get_errno(m) : 0;
+ return sd_future_resolve(f, r);
+}
+
+int bus_call_future(sd_bus *bus, sd_bus_message *m, uint64_t usec, sd_future **ret) {
+ int r;
+
+ assert(bus);
+ assert(m);
+ assert(ret);
+
+ _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+ r = sd_future_new(&bus_future_ops, &f);
+ if (r < 0)
+ return r;
+
+ BusFuture *bf = sd_future_get_private(f);
+
+ r = sd_bus_call_async(bus, &bf->slot, m, bus_future_handler, f, usec);
+ if (r < 0)
+ return r;
+
+ *ret = TAKE_PTR(f);
+ return 0;
+}
+
+int future_get_bus_reply(sd_future *f, sd_bus_error *reterr_error, sd_bus_message **ret_reply) {
+ BusFuture *bf = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f)));
+ sd_bus_message *reply = ASSERT_PTR(bf->reply);
+
+ assert(sd_future_get_ops(f) == &bus_future_ops);
+ assert(sd_future_state(f) == SD_FUTURE_RESOLVED);
+
+ if (sd_bus_message_is_method_error(reply, NULL)) {
+ if (reterr_error)
+ return sd_bus_error_copy(reterr_error, sd_bus_message_get_error(reply));
+ return -sd_bus_message_get_errno(reply);
+ }
+
+ if (reply->n_fds > 0 && !sd_bus_message_get_bus(reply)->accept_fd)
+ return sd_bus_error_set(reterr_error, SD_BUS_ERROR_INCONSISTENT_MESSAGE,
+ "Reply message contained file descriptors which I couldn't accept. Sorry.");
+
+ if (reterr_error)
+ *reterr_error = SD_BUS_ERROR_NULL;
+ if (ret_reply)
+ *ret_reply = sd_bus_message_ref(reply);
+
+ return 1;
+}
+
+int bus_call_suspend(
+ sd_bus *bus,
+ sd_bus_message *m,
+ uint64_t usec,
+ sd_bus_error *reterr_error,
+ sd_bus_message **ret_reply) {
+
+ int r;
+
+ assert(bus);
+ assert(m);
+ assert(sd_fiber_is_running());
+
+ _cleanup_(sd_future_cancel_wait_unrefp) sd_future *f = NULL;
+ r = bus_call_future(bus, m, usec, &f);
+ if (r < 0)
+ return sd_bus_error_set_errno(reterr_error, r);
+
+ r = sd_fiber_suspend();
+
+ /* If the future isn't resolved, the suspend was interrupted before a reply arrived (fiber
+ * cancelled, fiber-wide SD_FIBER_TIMEOUT scope expired, …). There's no reply to extract,
+ * so surface the resume error directly. When the future is resolved, future_get_bus_reply()
+ * recovers either the reply or the detailed sd_bus_error from the error reply. */
+ if (sd_future_state(f) != SD_FUTURE_RESOLVED)
+ return sd_bus_error_set_errno(reterr_error, r);
+
+ return future_get_bus_reply(f, reterr_error, ret_reply);
+}
--- /dev/null
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+#pragma once
+
+#include "sd-forward.h"
+
+int bus_call_future(sd_bus *bus, sd_bus_message *m, uint64_t usec, sd_future **ret);
+int future_get_bus_reply(sd_future *f, sd_bus_error *reterr_error, sd_bus_message **ret_reply);
+
+int bus_call_suspend(
+ sd_bus *bus,
+ sd_bus_message *m,
+ uint64_t usec,
+ sd_bus_error *reterr_error,
+ sd_bus_message **ret_reply);
#define DEFAULT_SYSTEM_BUS_ADDRESS "unix:path=/run/dbus/system_bus_socket"
#define DEFAULT_USER_BUS_ADDRESS_FMT "unix:path=%s/bus"
+/* Private vtable flag: dispatch the method handler on its own fiber, so it can use suspending
+ * primitives (sd_bus_call() on a fiber, sd_fiber_sleep(), loop_read_suspend(), ...) without
+ * blocking the event loop for other connections or method calls. Kept out of the public
+ * sd-bus-vtable.h so the fiber runtime stays an implementation detail of systemd. The bit value is
+ * reserved in sd-bus-vtable.h to make sure it never collides with a future public flag. */
+#define SD_BUS_VTABLE_METHOD_FIBER (UINT64_C(1) << 10)
+
typedef struct BusReplyCallback {
sd_bus_message_handler_t callback;
usec_t timeout_usec; /* this is a relative timeout until we reach the BUS_HELLO state, and an absolute one right after */
Set *vtable_methods;
Set *vtable_properties;
+ /* Futures for outstanding SD_BUS_VTABLE_METHOD_FIBER dispatches. Entries are added as the
+ * dispatcher spawns each fiber and removed when the fiber resolves. On bus_enter_closing()
+ * we cancel everything in here and then wait in process_closing() until the set drains,
+ * before tearing down the rest of the bus. */
+ Set *fiber_futures;
+
union sockaddr_union sockaddr;
socklen_t sockaddr_size;
#include <linux/capability.h>
#include "sd-bus.h"
+#include "sd-future.h"
#include "alloc-util.h"
#include "bus-internal.h"
return sd_bus_error_setf(reterr_error, SD_BUS_ERROR_ACCESS_DENIED, "Access to %s.%s() not permitted.", c->interface, c->member);
}
+typedef struct BusFiberData {
+ sd_bus *bus;
+ sd_bus_message *message;
+ sd_bus_slot *slot;
+ sd_bus_message_handler_t handler;
+ void *userdata;
+} BusFiberData;
+
+static BusFiberData* bus_fiber_data_free(BusFiberData *d) {
+ if (!d)
+ return NULL;
+
+ sd_bus_slot_unref(d->slot);
+ sd_bus_message_unref(d->message);
+ sd_bus_unref(d->bus);
+ return mfree(d);
+}
+
+DEFINE_TRIVIAL_CLEANUP_FUNC(BusFiberData*, bus_fiber_data_free);
+
+static void bus_fiber_data_destroy(void *userdata) {
+ bus_fiber_data_free(userdata);
+}
+
+static void bus_fiber_future_unref(void *p) {
+ sd_future_unref(p);
+}
+
+DEFINE_PRIVATE_HASH_OPS_WITH_KEY_DESTRUCTOR(
+ bus_fiber_future_hash_ops,
+ void,
+ trivial_hash_func,
+ trivial_compare_func,
+ bus_fiber_future_unref);
+
+static int bus_fiber_resolved(sd_future *f) {
+ sd_bus *bus = ASSERT_PTR(sd_future_get_userdata(f));
+
+ assert_se(set_remove(bus->fiber_futures, f) == f);
+ sd_future_unref(f);
+ return 0;
+}
+
+static int bus_fiber_entry(void *userdata) {
+ BusFiberData *d = ASSERT_PTR(userdata);
+ _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
+ int r;
+
+ /* Note: unlike the synchronous dispatch path, we deliberately do NOT set
+ * bus->current_slot/handler/userdata around the callback. Those fields track the slot of the
+ * message currently being dispatched inline and must be NULL at each entry into
+ * bus_process_internal(). Because a fiber handler can yield and let the event loop dispatch
+ * other messages before it resumes, leaving current_slot non-NULL across yields would trip
+ * that invariant. Fiber handlers receive their slot's userdata via the handler argument, so
+ * sd_bus_get_current_slot()/handler()/userdata() simply aren't meaningful inside them — the
+ * handler should use the message/userdata parameters directly instead. */
+ r = d->handler(d->message, d->userdata, &error);
+
+ return bus_maybe_reply_error(d->message, r, &error);
+}
+
static int method_callbacks_run(
sd_bus *bus,
sd_bus_message *m,
slot = container_of(c->parent, sd_bus_slot, node_vtable);
+ if (FLAGS_SET(c->vtable->flags, SD_BUS_VTABLE_METHOD_FIBER)) {
+ /* A fiber-dispatched method requires an event loop to spawn the fiber on.
+ * By the time a method call actually arrives the bus is running, so the
+ * event loop should already be attached — if not, the caller set up the bus
+ * wrong and there's no meaningful recovery. */
+ assert(bus->event);
+
+ _cleanup_(bus_fiber_data_freep) BusFiberData *d = new(BusFiberData, 1);
+ if (!d)
+ return -ENOMEM;
+
+ *d = (BusFiberData) {
+ .bus = sd_bus_ref(bus),
+ .message = sd_bus_message_ref(m),
+ .slot = sd_bus_slot_ref(slot),
+ .handler = c->vtable->x.method.handler,
+ .userdata = u,
+ };
+
+ _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+ r = sd_fiber_new(bus->event, c->member, bus_fiber_entry, d, bus_fiber_data_destroy, &f);
+ if (r < 0)
+ return bus_maybe_reply_error(m, r, NULL);
+
+ /* The fiber now owns d via bus_fiber_data_destroy. Drop our cleanup before any
+ * further fallible calls, so a later failure unwinding f doesn't double-free d. */
+ TAKE_PTR(d);
+
+ r = set_ensure_put(&bus->fiber_futures, &bus_fiber_future_hash_ops, f);
+ if (r < 0)
+ return bus_maybe_reply_error(m, r, NULL);
+ assert(r > 0);
+
+ /* Track the future on the bus so shutdown can cancel it and wait for it. */
+ r = sd_future_set_callback(f, bus_fiber_resolved, bus);
+ if (r < 0) {
+ /* TAKE_PTR(f) hasn't run yet, so our cleanup attribute still owns the
+ * ref; set_remove() returns the raw pointer without firing the hash_ops
+ * destructor, and the cleanup will unref f on return. */
+ assert_se(set_remove(bus->fiber_futures, f) == f);
+ return bus_maybe_reply_error(m, r, NULL);
+ }
+
+ TAKE_PTR(f);
+ return 1;
+ }
+
bus->current_slot = sd_bus_slot_ref(slot);
bus->current_handler = c->vtable->x.method.handler;
bus->current_userdata = u;
#include "sd-bus.h"
#include "sd-event.h"
+#include "sd-future.h"
#include "af-list.h"
#include "alloc-util.h"
#include "bus-container.h"
#include "bus-control.h"
#include "bus-error.h"
+#include "bus-future.h"
#include "bus-internal.h"
#include "bus-kernel.h"
#include "bus-label.h"
ordered_hashmap_free(b->reply_callbacks);
prioq_free(b->reply_callbacks_prioq);
+ /* Outstanding fiber handlers pin the bus via their BusFiberData ref, so by the time refcount
+ * reaches zero and bus_free() runs, every fiber has already resolved and removed itself from
+ * this set. */
+ assert(set_isempty(b->fiber_futures));
+ set_free(b->fiber_futures);
+
assert(b->match_callbacks.type == BUS_MATCH_ROOT);
bus_match_free(&b->match_callbacks);
}
void bus_enter_closing(sd_bus *bus, int exit_code) {
+ sd_future *f;
+ int r;
+
assert(bus);
if (!IN_SET(bus->state, BUS_WATCH_BIND, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING))
bus_set_state(bus, BUS_CLOSING);
bus->exit_code = exit_code;
+
+ /* Cancel all outstanding fiber-dispatched method handlers. Most cancellations are scheduled
+ * asynchronously (fibers resolve with -ECANCELED the next time they run), but a fiber still
+ * in FIBER_STATE_INITIAL resolves synchronously, which fires bus_fiber_resolved() and
+ * removes f from this set mid-iteration. That's safe because SET_FOREACH permits removal of
+ * exactly the current entry — see the assertion in hashmap_iterate_entry(). Either way this
+ * doesn't block here: process_closing() waits for the fiber_futures set to drain before it
+ * continues tearing down the rest of the bus. */
+ SET_FOREACH(f, bus->fiber_futures) {
+ r = sd_future_cancel(f);
+ if (r < 0)
+ log_debug_errno(r, "Failed to cancel outstanding fiber method handler, ignoring: %m");
+ }
}
/* Define manually so we can add the PID check */
sd_bus_error *reterr_error,
sd_bus_message **ret_reply) {
- _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = sd_bus_message_ref(_m);
usec_t timeout;
uint64_t cookie;
size_t i;
int r;
- bus_assert_return(m, -EINVAL, reterr_error);
- bus_assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL, reterr_error);
- bus_assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL, reterr_error);
+ bus_assert_return(_m, -EINVAL, reterr_error);
+ bus_assert_return(_m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL, reterr_error);
+ bus_assert_return(!(_m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL, reterr_error);
bus_assert_return(!bus_error_is_dirty(reterr_error), -EINVAL, reterr_error);
if (bus)
assert_return(bus = bus_resolve(bus), -ENOPKG);
else
- assert_return(bus = m->bus, -ENOTCONN);
+ assert_return(bus = _m->bus, -ENOTCONN);
bus_assert_return(!bus_origin_changed(bus), -ECHILD, reterr_error);
+ /* If the current fiber and the bus share their event loop, we can use sd_bus_call_suspend()
+ * instead which does an async method call. This allows multiple invocations of sd_bus_call() to
+ * happen across multiple fibers at once. */
+ if (sd_fiber_is_running() && bus->event == sd_fiber_get_event())
+ return bus_call_suspend(bus, _m, usec, reterr_error, ret_reply);
+
+ _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = sd_bus_message_ref(_m);
+
if (!BUS_IS_OPEN(bus->state)) {
r = -ENOTCONN;
goto fail;
assert(bus);
assert(bus->state == BUS_CLOSING);
- /* First, fail all outstanding method calls */
+ /* Wait for any still-running fiber method handlers to finish unwinding their cancellation
+ * before tearing down the rest of the bus. bus_enter_closing() scheduled the cancel; each
+ * fiber resolves asynchronously and bus_fiber_resolved() removes it from the set. Returning
+ * 1 here keeps the bus in CLOSING state so the event loop drives the fibers to completion. */
+ if (!set_isempty(bus->fiber_futures))
+ return 1;
+
+ /* Then, fail all outstanding method calls */
c = ordered_hashmap_first(bus->reply_callbacks);
if (c)
return process_closing_reply_callback(bus, c);
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#include <fcntl.h>
-#include <pthread.h>
#include <sys/resource.h>
#include <unistd.h>
#include "sd-bus.h"
+#include "sd-future.h"
#include "alloc-util.h"
#include "bus-error.h"
return 0;
}
-static int server(sd_bus *bus) {
+static int server(void *userdata) {
+ sd_bus *bus = ASSERT_PTR(userdata);
bool client1_gone = false, client2_gone = false;
int r;
client2_gone = true;
} else if (sd_bus_message_is_method_call(m, "org.freedesktop.systemd.test", "Slow")) {
- sleep(1);
+ r = sd_fiber_sleep(1 * USEC_PER_SEC);
+ if (r < 0)
+ return r;
r = sd_bus_reply_method_return(m, NULL);
if (r < 0)
log_info("Received fd=%d", fd);
- if (write(fd, &x, 1) < 0) {
- r = log_error_errno(errno, "Failed to write to fd: %m");
+ ssize_t n = sd_fiber_write(fd, &x, 1);
+ if (n < 0) {
safe_close(fd);
- return r;
+ return log_error_errno(n, "Failed to write to fd: %m");
}
r = sd_bus_reply_method_return(m, NULL);
return 0;
}
-static void* client1(void *p) {
+static int client1(void *userdata) {
_cleanup_(sd_bus_message_unrefp) sd_bus_message *reply = NULL;
_cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
_cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
goto finish;
}
- errno = 0;
- if (read(pp[0], &x, 1) <= 0) {
- log_error("Failed to read from pipe: %s", STRERROR_OR_EOF(errno));
+ ssize_t n = sd_fiber_read(pp[0], &x, 1);
+ if (n <= 0) {
+ log_error("Failed to read from pipe: %s", STRERROR_OR_EOF(n));
goto finish;
}
}
- return INT_TO_PTR(r);
+ return r;
}
static int quit_callback(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
return 1;
}
-static void* client2(void *p) {
+static int client2(void *userdata) {
_cleanup_(sd_bus_message_unrefp) sd_bus_message *m = NULL, *reply = NULL;
_cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
_cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
(void) sd_bus_send(bus, q, NULL);
}
- return INT_TO_PTR(r);
+ return r;
}
static ino_t get_inode(int fd) {
}
TEST(chat) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client1 = NULL, *f_client2 = NULL;
_cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
- pthread_t c1, c2;
- void *p;
int r;
test_setup_logging(LOG_INFO);
log_info("Initialized...");
- ASSERT_OK(-pthread_create(&c1, NULL, client1, NULL));
- ASSERT_OK(-pthread_create(&c2, NULL, client2, NULL));
+ ASSERT_OK(sd_event_new(&e));
+ ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+ ASSERT_OK(sd_fiber_new(e, "client-1", client1, NULL, /* destroy= */ NULL, &f_client1));
+ ASSERT_OK(sd_fiber_new(e, "client-2", client2, NULL, /* destroy= */ NULL, &f_client2));
+ ASSERT_OK(sd_fiber_new(e, "server", server, bus, /* destroy= */ NULL, &f_server));
- r = server(bus);
+ ASSERT_OK(sd_event_loop(e));
- ASSERT_OK(-pthread_join(c1, &p));
- ASSERT_OK(PTR_TO_INT(p));
- ASSERT_OK(-pthread_join(c2, &p));
- ASSERT_OK(PTR_TO_INT(p));
- ASSERT_OK(r);
+ ASSERT_OK(sd_future_result(f_client1));
+ ASSERT_OK(sd_future_result(f_client2));
+ ASSERT_OK(sd_future_result(f_server));
}
DEFINE_TEST_MAIN(LOG_INFO);
--- /dev/null
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+
+#include <sys/socket.h>
+
+#include "sd-bus.h"
+#include "sd-event.h"
+#include "sd-future.h"
+
+#include "bus-internal.h"
+#include "tests.h"
+#include "time-util.h"
+
+typedef struct Context {
+ /* Counters for the concurrency check: every Concurrent invocation bumps in_flight on entry
+ * and drops it on exit, and tracks the maximum observed concurrency. If fiber dispatch
+ * works, two overlapping client calls must both be inside the handler at the same time,
+ * giving a max of at least 2. */
+ int in_flight;
+ int max_in_flight;
+ sd_future *waiter;
+} Context;
+
+static int method_concurrent(sd_bus_message *m, void *userdata, sd_bus_error *reterr_error) {
+ Context *c = ASSERT_PTR(userdata);
+
+ ASSERT_OK_POSITIVE(sd_fiber_is_running());
+
+ c->in_flight++;
+ if (c->in_flight > c->max_in_flight)
+ c->max_in_flight = c->in_flight;
+
+ /* Synchronize on the peer instead of sleeping: the first handler to enter stashes its
+ * fiber and suspends; the second resumes it and then yields, so the first runs to
+ * completion (sending its reply) before the second proceeds. Both are therefore in
+ * flight at the same time regardless of how long any individual dispatch takes, and
+ * the reply order matches the request order. */
+ if (c->in_flight < 2) {
+ assert(!c->waiter);
+ c->waiter = sd_fiber_get_current();
+ ASSERT_OK(sd_fiber_suspend());
+ } else {
+ ASSERT_OK(sd_fiber_resume(TAKE_PTR(c->waiter), 0));
+ ASSERT_OK(sd_fiber_yield());
+ }
+
+ c->in_flight--;
+
+ return sd_bus_reply_method_return(m, NULL);
+}
+
+static int method_fail_errno(sd_bus_message *m, void *userdata, sd_bus_error *reterr_error) {
+ ASSERT_OK_POSITIVE(sd_fiber_is_running());
+
+ /* Yielding first exercises the deferred-error path in the fiber entry: the handler returns
+ * a negative errno after suspending, and bus_maybe_reply_error() must still turn that into
+ * a matching sd_bus error reply. */
+ ASSERT_OK(sd_fiber_sleep(1 * USEC_PER_MSEC));
+
+ return -EACCES;
+}
+
+static int method_fail_error(sd_bus_message *m, void *userdata, sd_bus_error *reterr_error) {
+ ASSERT_OK_POSITIVE(sd_fiber_is_running());
+
+ ASSERT_OK(sd_fiber_sleep(1 * USEC_PER_MSEC));
+
+ return sd_bus_error_set(reterr_error, SD_BUS_ERROR_INVALID_ARGS, "bad arguments from fiber");
+}
+
+static const sd_bus_vtable vtable[] = {
+ SD_BUS_VTABLE_START(0),
+ SD_BUS_METHOD("Concurrent", NULL, NULL, method_concurrent,
+ SD_BUS_VTABLE_UNPRIVILEGED|SD_BUS_VTABLE_METHOD_FIBER),
+ SD_BUS_METHOD("FailErrno", NULL, NULL, method_fail_errno,
+ SD_BUS_VTABLE_UNPRIVILEGED|SD_BUS_VTABLE_METHOD_FIBER),
+ SD_BUS_METHOD("FailError", NULL, NULL, method_fail_error,
+ SD_BUS_VTABLE_UNPRIVILEGED|SD_BUS_VTABLE_METHOD_FIBER),
+ SD_BUS_VTABLE_END,
+};
+
+typedef struct Setup {
+ int fds[2];
+ Context *c;
+} Setup;
+
+static int attach_pair(Setup *s, sd_bus **ret_server, sd_bus **ret_client) {
+ _cleanup_(sd_bus_flush_close_unrefp) sd_bus *server = NULL, *client = NULL;
+ sd_id128_t id;
+
+ assert(ret_server);
+ assert(ret_client);
+
+ ASSERT_OK(sd_id128_randomize(&id));
+ ASSERT_OK(sd_bus_new(&server));
+ ASSERT_OK(sd_bus_set_description(server, "server"));
+ ASSERT_OK(sd_bus_set_fd(server, s->fds[0], s->fds[0]));
+ ASSERT_OK(sd_bus_set_server(server, true, id));
+ ASSERT_OK(sd_bus_attach_event(server, sd_fiber_get_event(), 0));
+ ASSERT_OK(sd_bus_add_object_vtable(server, NULL, "/test", "test.Fiber", vtable, s->c));
+ ASSERT_OK(sd_bus_start(server));
+
+ ASSERT_OK(sd_bus_new(&client));
+ ASSERT_OK(sd_bus_set_description(client, "client"));
+ ASSERT_OK(sd_bus_set_fd(client, s->fds[1], s->fds[1]));
+ ASSERT_OK(sd_bus_attach_event(client, sd_fiber_get_event(), 0));
+ ASSERT_OK(sd_bus_start(client));
+
+ *ret_server = TAKE_PTR(server);
+ *ret_client = TAKE_PTR(client);
+ return 0;
+}
+
+static int call_concurrent_fiber(void *userdata) {
+ sd_bus *client = ASSERT_PTR(userdata);
+
+ /* A plain suspending sd_bus_call() — on a fiber this goes through sd_bus_call_suspend()
+ * which multiplexes onto the single client connection, so multiple caller fibers can have
+ * calls in flight at the same time. */
+ return sd_bus_call_method(client, NULL, "/test", "test.Fiber", "Concurrent",
+ NULL, NULL, NULL);
+}
+
+static int concurrency_fiber(void *userdata) {
+ Setup *s = ASSERT_PTR(userdata);
+ _cleanup_(sd_bus_flush_close_unrefp) sd_bus *server = NULL, *client = NULL;
+ _cleanup_(sd_future_cancel_wait_unrefp) sd_future *f_a = NULL, *f_b = NULL;
+
+ ASSERT_OK(attach_pair(s, &server, &client));
+
+ /* Two concurrent calls on the shared client bus. Each lands in method_concurrent which
+ * blocks on the peer; if fiber dispatch works the second is entered while the first is
+ * suspended, so max_in_flight on the context reaches 2. */
+ ASSERT_OK(sd_fiber_new(sd_fiber_get_event(), "call-a", call_concurrent_fiber, client,
+ /* destroy= */ NULL, &f_a));
+ ASSERT_OK(sd_fiber_new(sd_fiber_get_event(), "call-b", call_concurrent_fiber, client,
+ /* destroy= */ NULL, &f_b));
+
+ ASSERT_OK(sd_fiber_await(f_a));
+ ASSERT_OK(sd_fiber_await(f_b));
+
+ ASSERT_OK(sd_future_result(f_a));
+ ASSERT_OK(sd_future_result(f_b));
+ return 0;
+}
+
+TEST(fiber_method_concurrency) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+ Context c = {};
+ Setup s = { .c = &c };
+
+ ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM, 0, s.fds));
+
+ ASSERT_OK(sd_event_new(&e));
+ ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+ ASSERT_OK(sd_fiber_new(e, "concurrency", concurrency_fiber, &s, /* destroy= */ NULL, &f));
+
+ ASSERT_OK(sd_event_loop(e));
+
+ ASSERT_OK(sd_future_result(f));
+ ASSERT_GE(c.max_in_flight, 2);
+}
+
+static int errors_fiber(void *userdata) {
+ Setup *s = ASSERT_PTR(userdata);
+ _cleanup_(sd_bus_flush_close_unrefp) sd_bus *server = NULL, *client = NULL;
+
+ ASSERT_OK(attach_pair(s, &server, &client));
+
+ /* A fiber handler that returns a negative errno gets turned into a matching sd_bus error
+ * reply (bus_maybe_reply_error → sd_bus_reply_method_errno). */
+ _cleanup_(sd_bus_error_free) sd_bus_error e1 = SD_BUS_ERROR_NULL;
+ ASSERT_ERROR(sd_bus_call_method(client, NULL, "/test", "test.Fiber", "FailErrno",
+ &e1, NULL, NULL),
+ EACCES);
+ ASSERT_TRUE(sd_bus_error_has_name(&e1, SD_BUS_ERROR_ACCESS_DENIED));
+
+ /* A fiber handler that populates sd_bus_error directly propagates both name and message. */
+ _cleanup_(sd_bus_error_free) sd_bus_error e2 = SD_BUS_ERROR_NULL;
+ ASSERT_FAIL(sd_bus_call_method(client, NULL, "/test", "test.Fiber", "FailError",
+ &e2, NULL, NULL));
+ ASSERT_TRUE(sd_bus_error_has_name(&e2, SD_BUS_ERROR_INVALID_ARGS));
+ ASSERT_STREQ(e2.message, "bad arguments from fiber");
+
+ return 0;
+}
+
+TEST(fiber_method_errors) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+ Context c = {};
+ Setup s = { .c = &c };
+
+ ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM, 0, s.fds));
+
+ ASSERT_OK(sd_event_new(&e));
+ ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+ ASSERT_OK(sd_fiber_new(e, "errors", errors_fiber, &s, /* destroy= */ NULL, &f));
+
+ ASSERT_OK(sd_event_loop(e));
+
+ ASSERT_OK(sd_future_result(f));
+}
+
+DEFINE_TEST_MAIN(LOG_DEBUG);
/* SPDX-License-Identifier: LGPL-2.1-or-later */
-#include <pthread.h>
-
#include "sd-bus.h"
+#include "sd-future.h"
#include "alloc-util.h"
#include "bus-internal.h"
return 1;
}
-static void* server(void *p) {
- struct context *c = p;
- sd_bus *bus = NULL;
+static int server(void *userdata) {
+ struct context *c = ASSERT_PTR(userdata);
+ _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
sd_id128_t id;
int r;
log_error("Loop!");
r = sd_bus_process(bus, NULL);
- if (r < 0) {
- log_error_errno(r, "Failed to process requests: %m");
- goto fail;
- }
+ if (r < 0)
+ return log_error_errno(r, "Failed to process requests: %m");
if (r == 0) {
r = sd_bus_wait(bus, UINT64_MAX);
- if (r < 0) {
- log_error_errno(r, "Failed to wait: %m");
- goto fail;
- }
+ if (r < 0)
+ return log_error_errno(r, "Failed to wait: %m");
continue;
}
}
- r = 0;
-
-fail:
- if (bus) {
- sd_bus_flush(bus);
- sd_bus_unref(bus);
- }
-
- return INT_TO_PTR(r);
+ return 0;
}
-static int client(struct context *c) {
+static int client(void *p) {
+ struct context *c = ASSERT_PTR(p);
_cleanup_(sd_bus_message_unrefp) sd_bus_message *reply = NULL;
- _cleanup_(sd_bus_unrefp) sd_bus *bus = NULL;
+ _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
_cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
_cleanup_strv_free_ char **lines = NULL;
const char *s;
ASSERT_OK(sd_bus_call_method(bus, "org.freedesktop.systemd.test", "/foo", "org.freedesktop.systemd.test", "Exit", &error, NULL, NULL));
- sd_bus_flush(bus);
-
return 0;
}
int main(int argc, char *argv[]) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client = NULL;
struct context c = {};
- pthread_t s;
- void *p;
- int r, q;
test_setup_logging(LOG_DEBUG);
ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM, 0, c.fds));
- r = pthread_create(&s, NULL, server, &c);
- if (r != 0)
- return -r;
-
- r = client(&c);
+ ASSERT_OK(sd_event_new(&e));
+ ASSERT_OK(sd_event_set_exit_on_idle(e, true));
- q = pthread_join(s, &p);
- if (q != 0)
- return -q;
+ ASSERT_OK(sd_fiber_new(e, "server", server, &c, /* destroy= */ NULL, &f_server));
+ ASSERT_OK(sd_fiber_new(e, "client", client, &c, /* destroy= */ NULL, &f_client));
- if (r < 0)
- return r;
+ ASSERT_OK(sd_event_loop(e));
- if (PTR_TO_INT(p) < 0)
- return PTR_TO_INT(p);
+ ASSERT_OK(sd_future_result(f_server));
+ ASSERT_OK(sd_future_result(f_client));
free(c.something);
free(c.automatic_string_property);
/* SPDX-License-Identifier: LGPL-2.1-or-later */
-#include <pthread.h>
#include <unistd.h>
#include "sd-bus.h"
+#include "sd-future.h"
#include "bus-dump.h"
#include "fd-util.h"
gid_list_contained(b, m, a, n);
}
-static void* server(void *p) {
+static int server(void *userdata) {
_cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
- _cleanup_close_ int listen_fd = PTR_TO_INT(p), fd = -EBADF;
+ _cleanup_close_ int listen_fd = PTR_TO_INT(userdata), fd = -EBADF;
_cleanup_(sd_bus_creds_unrefp) sd_bus_creds *c = NULL;
_cleanup_free_ char *our_comm = NULL;
sd_id128_t id;
ASSERT_OK(sd_id128_randomize(&id));
- ASSERT_OK_ERRNO(fd = accept4(listen_fd, NULL, NULL, SOCK_CLOEXEC|SOCK_NONBLOCK));
+ ASSERT_OK(fd = sd_fiber_accept(listen_fd, NULL, NULL, SOCK_CLOEXEC|SOCK_NONBLOCK));
ASSERT_OK(sd_bus_new(&bus));
ASSERT_OK(sd_bus_set_fd(bus, fd, fd));
}
}
- return NULL;
+ return 0;
}
-static void* client(void *p) {
+static int client(void *userdata) {
_cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
_cleanup_(sd_bus_message_unrefp) sd_bus_message *reply = NULL;
const char *z;
ASSERT_OK(sd_bus_new(&bus));
ASSERT_OK(sd_bus_set_description(bus, "wuffwuff"));
- ASSERT_OK(sd_bus_set_address(bus, p));
+ ASSERT_OK(sd_bus_set_address(bus, userdata));
+ ASSERT_OK(sd_bus_attach_event(bus, sd_fiber_get_event(), 0));
ASSERT_OK(sd_bus_start(bus));
ASSERT_OK(sd_bus_call_method(bus, "foo.foo", "/foo", "foo.foo", "Foo", NULL, &reply, "s", "foo"));
ASSERT_OK(sd_bus_message_read(reply, "s", &z));
ASSERT_STREQ(z, "bar");
- return NULL;
+ return 0;
}
TEST(description) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client = NULL;
_cleanup_free_ char *a = NULL;
_cleanup_close_ int fd = -EBADF;
union sockaddr_union sa = {
.un.sun_family = AF_UNIX,
};
socklen_t salen;
- pthread_t s, c;
ASSERT_OK_ERRNO(fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0));
ASSERT_OK_ERRNO(bind(fd, &sa.sa, offsetof(struct sockaddr_un, sun_path))); /* force auto-bind */
ASSERT_OK(asprintf(&a, "unix:abstract=%s", sa.un.sun_path + 1));
- ASSERT_OK(-pthread_create(&s, NULL, server, INT_TO_PTR(fd)));
+ ASSERT_OK(sd_event_new(&e));
+ ASSERT_OK(sd_event_set_exit_on_idle(e, true));
+
+ ASSERT_OK(sd_fiber_new(e, "server", server, INT_TO_PTR(fd), /* destroy= */ NULL, &f_server));
TAKE_FD(fd);
- ASSERT_OK(-pthread_create(&c, NULL, client, a));
+ ASSERT_OK(sd_fiber_new(e, "client", client, a, /* destroy= */ NULL, &f_client));
+
+ ASSERT_OK(sd_event_loop(e));
- ASSERT_OK(-pthread_join(s, NULL));
- ASSERT_OK(-pthread_join(c, NULL));
+ ASSERT_OK(sd_future_result(f_server));
+ ASSERT_OK(sd_future_result(f_client));
}
DEFINE_TEST_MAIN(LOG_INFO);
/* SPDX-License-Identifier: LGPL-2.1-or-later */
-#include <pthread.h>
#include <sys/socket.h>
#include "sd-bus.h"
+#include "sd-event.h"
+#include "sd-future.h"
+#include "errno-util.h"
#include "log.h"
#include "memory-util.h"
#include "string-util.h"
bool server_anonymous_auth;
};
-static int _server(struct context *c) {
+static int server(void *userdata) {
+ struct context *c = ASSERT_PTR(userdata);
_cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
sd_id128_t id;
bool quit = false;
ASSERT_OK(sd_id128_randomize(&id));
ASSERT_OK(sd_bus_new(&bus));
+ ASSERT_OK(sd_bus_set_description(bus, "server"));
ASSERT_OK(sd_bus_set_fd(bus, c->fds[0], c->fds[0]));
ASSERT_OK(sd_bus_set_server(bus, 1, id));
ASSERT_OK(sd_bus_set_anonymous(bus, c->server_anonymous_auth));
return 0;
}
-static void* server(void *p) {
- return INT_TO_PTR(_server(p));
-}
-
-static int client(struct context *c) {
+static int client(void *userdata) {
+ struct context *c = ASSERT_PTR(userdata);
_cleanup_(sd_bus_message_unrefp) sd_bus_message *m = NULL, *reply = NULL;
- _cleanup_(sd_bus_unrefp) sd_bus *bus = NULL;
+ _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
_cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
ASSERT_OK(sd_bus_new(&bus));
+ ASSERT_OK(sd_bus_set_description(bus, "client"));
ASSERT_OK(sd_bus_set_fd(bus, c->fds[1], c->fds[1]));
+ ASSERT_OK(sd_bus_attach_event(bus, sd_fiber_get_event(), 0));
ASSERT_OK(sd_bus_negotiate_fds(bus, c->client_negotiate_unix_fds));
ASSERT_OK(sd_bus_set_anonymous(bus, c->client_anonymous_auth));
ASSERT_OK(sd_bus_start(bus));
static int test_one(bool client_negotiate_unix_fds, bool server_negotiate_unix_fds,
bool client_anonymous_auth, bool server_anonymous_auth) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client = NULL;
struct context c;
- pthread_t s;
- void *p;
- int r, q;
+ int r = 0;
zero(c);
c.client_anonymous_auth = client_anonymous_auth;
c.server_anonymous_auth = server_anonymous_auth;
- r = pthread_create(&s, NULL, server, &c);
- if (r != 0)
- return -r;
+ ASSERT_OK(sd_event_new(&e));
+ ASSERT_OK(sd_event_set_exit_on_idle(e, true));
- r = client(&c);
+ ASSERT_OK(sd_fiber_new(e, "server", server, &c, /* destroy= */ NULL, &f_server));
+ ASSERT_OK(sd_fiber_new(e, "client", client, &c, /* destroy= */ NULL, &f_client));
- q = pthread_join(s, &p);
- if (q != 0)
- return -q;
+ ASSERT_OK(sd_event_loop(e));
- if (r < 0)
- return r;
+ RET_GATHER(r, sd_future_result(f_client));
+ RET_GATHER(r, sd_future_result(f_server));
- if (PTR_TO_INT(p) < 0)
- return PTR_TO_INT(p);
-
- return 0;
+ return r;
}
int main(int argc, char *argv[]) {
ASSERT_OK(test_one(false, false, false, false));
ASSERT_OK(test_one(true, true, true, true));
ASSERT_OK(test_one(true, true, false, true));
- ASSERT_ERROR(test_one(true, true, true, false), EPERM);
+ ASSERT_ERROR(test_one(true, true, true, false), EACCES);
return EXIT_SUCCESS;
}
/* SPDX-License-Identifier: LGPL-2.1-or-later */
-#include <pthread.h>
-#include <unistd.h>
-
#include "sd-bus.h"
#include "sd-event.h"
+#include "sd-future.h"
#include "sd-id128.h"
#include "alloc-util.h"
SD_BUS_VTABLE_END,
};
-static void* thread_server(void *p) {
+static int server(void *userdata) {
_cleanup_free_ char *suffixed = NULL, *suffixed_basename = NULL, *suffixed2 = NULL, *d = NULL;
_cleanup_close_ int fd = -EBADF;
union sockaddr_union u;
- const char *path = p;
+ const char *path = ASSERT_PTR(userdata);
int r;
log_debug("Initializing server");
/* Let's play some games, by slowly creating the socket directory, and renaming it in the middle */
- usleep_safe(100 * USEC_PER_MSEC);
+ ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
ASSERT_OK(mkdir_parents(path, 0755));
- usleep_safe(100 * USEC_PER_MSEC);
+ ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
ASSERT_OK(path_extract_directory(path, &d));
ASSERT_OK(asprintf(&suffixed, "%s.%" PRIx64, d, random_u64()));
ASSERT_OK_ERRNO(rename(d, suffixed));
- usleep_safe(100 * USEC_PER_MSEC);
+ ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
ASSERT_OK(asprintf(&suffixed2, "%s.%" PRIx64, d, random_u64()));
ASSERT_OK_ERRNO(symlink(suffixed2, d));
- usleep_safe(100 * USEC_PER_MSEC);
+ ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
ASSERT_OK(path_extract_filename(suffixed, &suffixed_basename));
ASSERT_OK_ERRNO(symlink(suffixed_basename, suffixed2));
- usleep_safe(100 * USEC_PER_MSEC);
+ ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
socklen_t sa_len;
r = sockaddr_un_set_path(&u.un, path);
ASSERT_OK_ERRNO(fd);
ASSERT_OK_ERRNO(bind(fd, &u.sa, sa_len));
- usleep_safe(100 * USEC_PER_MSEC);
+ ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
ASSERT_OK_ERRNO(listen(fd, SOMAXCONN_DELUXE));
- usleep_safe(100 * USEC_PER_MSEC);
+ ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
ASSERT_OK(touch(path));
- usleep_safe(100 * USEC_PER_MSEC);
+ ASSERT_OK(sd_fiber_sleep(100 * USEC_PER_MSEC));
log_debug("Initialized server");
ASSERT_OK(sd_event_new(&event));
- bus_fd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
- ASSERT_OK_ERRNO(bus_fd);
+ ASSERT_OK(bus_fd = sd_fiber_accept(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC));
log_debug("Accepted server connection");
log_debug("Server done");
- return NULL;
+ return 0;
}
-static void* thread_client1(void *p) {
+static int client1(void *userdata) {
_cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
_cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
- const char *path = p, *t;
+ const char *path = ASSERT_PTR(userdata), *t;
log_debug("Initializing client1");
log_debug("Client1 done");
- return NULL;
-}
-
-static int client2_callback(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
- ASSERT_OK_ZERO(sd_bus_message_is_method_error(m, NULL));
- ASSERT_OK(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 0));
return 0;
}
-static void* thread_client2(void *p) {
+static int client2(void *userdata) {
_cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
- _cleanup_(sd_event_unrefp) sd_event *event = NULL;
- const char *path = p, *t;
+ const char *path = ASSERT_PTR(userdata), *t;
log_debug("Initializing client2");
- ASSERT_OK(sd_event_new(&event));
ASSERT_OK(sd_bus_new(&bus));
ASSERT_OK(sd_bus_set_description(bus, "client2"));
t = strjoina("unix:path=", path);
ASSERT_OK(sd_bus_set_address(bus, t));
ASSERT_OK(sd_bus_set_watch_bind(bus, true));
- ASSERT_OK(sd_bus_attach_event(bus, event, 0));
+ ASSERT_OK(sd_bus_attach_event(bus, sd_fiber_get_event(), 0));
ASSERT_OK(sd_bus_start(bus));
- ASSERT_OK(sd_bus_call_method_async(bus, NULL, "foo.bar", "/foo", "foo.TestInterface", "Foobar", client2_callback, NULL, NULL));
-
- ASSERT_OK(sd_event_loop(event));
+ _cleanup_(sd_bus_message_unrefp) sd_bus_message *m = NULL;
+ ASSERT_OK(sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Foobar", NULL, &m, NULL));
+ ASSERT_OK_ZERO(sd_bus_message_is_method_error(m, NULL));
log_debug("Client2 done");
- return NULL;
+ return 0;
}
-static void request_exit(const char *path) {
+typedef struct RequestExitArgs {
+ const char *path;
+ sd_future *client1;
+ sd_future *client2;
+} RequestExitArgs;
+
+static int request_exit(void *userdata) {
_cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
+ RequestExitArgs *args = ASSERT_PTR(userdata);
const char *t;
+ /* Wait for all client fibers to complete before requesting exit */
+ ASSERT_OK(sd_fiber_await(args->client1));
+ ASSERT_OK(sd_fiber_await(args->client2));
+
ASSERT_OK(sd_bus_new(&bus));
- t = strjoina("unix:path=", path);
+ t = strjoina("unix:path=", args->path);
ASSERT_OK(sd_bus_set_address(bus, t));
ASSERT_OK(sd_bus_set_watch_bind(bus, true));
ASSERT_OK(sd_bus_set_description(bus, "request-exit"));
ASSERT_OK(sd_bus_start(bus));
ASSERT_OK(sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Exit", NULL, NULL, NULL));
+
+ return 0;
}
int main(int argc, char *argv[]) {
_cleanup_(rm_rf_physical_and_freep) char *d = NULL;
- pthread_t server, client1, client2;
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ _cleanup_(sd_future_unrefp) sd_future *f_server = NULL, *f_client1 = NULL, *f_client2 = NULL, *f_exit = NULL;
char *path;
test_setup_logging(LOG_DEBUG);
path = strjoina(d, "/this/is/a/socket");
- ASSERT_OK(-pthread_create(&server, NULL, thread_server, path));
- ASSERT_OK(-pthread_create(&client1, NULL, thread_client1, path));
- ASSERT_OK(-pthread_create(&client2, NULL, thread_client2, path));
+ ASSERT_OK(sd_event_new(&e));
+ ASSERT_OK(sd_event_set_exit_on_idle(e, true));
- ASSERT_OK(-pthread_join(client1, NULL));
- ASSERT_OK(-pthread_join(client2, NULL));
+ ASSERT_OK(sd_fiber_new(e, "server", server, path, /* destroy= */ NULL, &f_server));
- request_exit(path);
+ ASSERT_OK(sd_fiber_new(e, "client-1", client1, path, /* destroy= */ NULL, &f_client1));
+ ASSERT_OK(sd_fiber_new(e, "client-2", client2, path, /* destroy= */ NULL, &f_client2));
- ASSERT_OK(-pthread_join(server, NULL));
+ RequestExitArgs args = {
+ .path = path,
+ .client1 = f_client1,
+ .client2 = f_client2,
+ };
+ ASSERT_OK(sd_fiber_new(e, "request-exit", request_exit, &args, /* destroy= */ NULL, &f_exit));
- return 0;
+ ASSERT_OK(sd_event_loop(e));
+
+ ASSERT_OK(sd_future_result(f_client1));
+ ASSERT_OK(sd_future_result(f_client2));
+ ASSERT_OK(sd_future_result(f_exit));
+ ASSERT_OK(sd_future_result(f_server));
+
+ return EXIT_SUCCESS;
}
SD_BUS_VTABLE_PROPERTY_EXPLICIT = 1ULL << 7,
SD_BUS_VTABLE_SENSITIVE = 1ULL << 8, /* covers both directions: method call + reply */
SD_BUS_VTABLE_ABSOLUTE_OFFSET = 1ULL << 9,
+ /* Bit 10 is reserved for the private SD_BUS_VTABLE_METHOD_FIBER flag (see bus-internal.h). */
_SD_BUS_VTABLE_CAPABILITY_MASK = 0xFFFFULL << 40
};