]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
sd-varlink: make sd-varlink fiber-aware
authorDaan De Meyer <daan@amutable.com>
Tue, 14 Apr 2026 08:54:49 +0000 (08:54 +0000)
committerDaan De Meyer <daan@amutable.com>
Thu, 21 May 2026 09:55:04 +0000 (09:55 +0000)
Add varlink_server_bind_fiber() and varlink_server_bind_fiber_many()
in varlink-util.{c,h} for registering a method handler that should
run on a dedicated fiber per dispatch. The fiber-bound methods live
in a separate s->fiber_methods map alongside the regular s->methods;
bind_internal()/bind_many_internal() are factored out so the regular
and fiber bind variants share their parsing/insertion code.
Registering the same method in both maps is rejected because the
dispatcher consults the regular map first and would otherwise
silently shadow the fiber binding.

varlink_dispatch_fiber() builds a VarlinkFiberData (refs to the
connection, parameters, and method name), spawns a fiber via
sd_fiber_new(), and makes the future floating so the fiber
self-manages its lifetime — neither the dispatcher nor the
connection has to track it. The fiber's priority is set to one
below the connection's quit event source so that on graceful
shutdown the fiber's exit handler fires (and runs its cleanup)
before varlink's quit_callback() closes the connection underneath
it; this is what lets a fiber-bound handler reply or flush its
sentinel on a still-open connection during shutdown.

The connection state transitions are reordered so they happen before
the fiber spawn rather than after the synchronous callback returns:
the fiber runs after dispatch has already moved past PROCESSING, which
matches the behaviour expected for a deferred reply (the fiber may
either reply immediately, or stash the connection and reply later, in
which case the post-callback logic treats it as a PENDING_METHOD).

Note that all the synchronous varlink APIs (sd_varlink_call() and friends)
already behave properly when on a fiber because they call json_stream_wait()
which calls ppoll_usec() which we already fixed to suspend when called from
a fiber.

The client/server varlink tests are migrated to fibers (threads → mock
server fibers on the same event loop) to exercise the new paths.

src/libsystemd/sd-varlink/sd-varlink.c
src/libsystemd/sd-varlink/test-varlink.c
src/libsystemd/sd-varlink/varlink-internal.h
src/libsystemd/sd-varlink/varlink-util.h

index 8e43e38800bde70b2cff77fa2c086fef7b295e44..af9dd30973b4d71adb27b75855d41a22ba6fcd42 100644 (file)
@@ -6,6 +6,7 @@
 
 #include "sd-daemon.h"
 #include "sd-event.h"
+#include "sd-future.h"
 #include "sd-varlink.h"
 
 #include "alloc-util.h"
@@ -37,6 +38,7 @@
 #include "varlink-internal.h"
 #include "varlink-io.systemd.h"
 #include "varlink-org.varlink.service.h"
+#include "varlink-util.h"
 
 #define VARLINK_DEFAULT_CONNECTIONS_MAX 4096U
 #define VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX 128U
@@ -956,6 +958,178 @@ static int generic_method_get_interface_description(
                         SD_JSON_BUILD_PAIR_STRING("description", text));
 }
 
+static int varlink_dispatch_sentinel(sd_varlink *v) {
+        int r;
+
+        assert(v);
+        assert(v->sentinel);
+
+        if (v->previous) {
+                r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds);
+                if (r >= 0) {
+                        v->previous = sd_json_variant_unref(v->previous);
+                        v->previous_fds = mfree(v->previous_fds);
+                        v->n_previous_fds = 0;
+                        /* Mirror sd_varlink_reply()'s post-enqueue state machine: PENDING_* means we're
+                         * outside the dispatch stack frame (e.g. called from varlink_fiber_entry after
+                         * the fiber returned), so we go straight to IDLE_SERVER ourselves. PROCESSING_*
+                         * means we're inside varlink_dispatch_method(), which will transition us. */
+                        if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) {
+                                varlink_clear_current(v);
+                                varlink_set_state(v, VARLINK_IDLE_SERVER);
+                        } else
+                                varlink_set_state(v, VARLINK_PROCESSED_METHOD);
+                }
+
+                return r;
+        }
+
+        char *sentinel = TAKE_PTR(v->sentinel);
+
+        /* Propagate the sentinel to the client if one was configured and no replies were enqueued by
+         * the callback. */
+        if (sentinel == POINTER_MAX)
+                r = sd_varlink_reply(v, NULL);
+        else {
+                r = sd_varlink_error(v, sentinel, NULL);
+                /* sd_varlink_error() deliberately returns a negative
+                 * errno mapped from the error id on success (so method
+                 * callbacks can `return sd_varlink_error(...);` to
+                 * enqueue a reply and propagate a matching errno in one
+                 * go). For sentinel dispatch we don't care about that
+                 * mapping — the reply is either enqueued or not, which
+                 * we detect via the state transition instead. */
+                if (IN_SET(v->state, VARLINK_PROCESSED_METHOD, VARLINK_IDLE_SERVER))
+                        r = 0;
+        }
+
+        if (sentinel != POINTER_MAX)
+                free(sentinel);
+
+        return r;
+}
+
+typedef struct VarlinkFiberData {
+        sd_varlink *link;
+        sd_json_variant *parameters;
+        sd_varlink_method_flags_t flags;
+        void *userdata;
+        sd_varlink_method_t callback;
+} VarlinkFiberData;
+
+static VarlinkFiberData* varlink_fiber_data_free(VarlinkFiberData *d) {
+        if (!d)
+                return NULL;
+
+        sd_json_variant_unref(d->parameters);
+        sd_varlink_unref(d->link);
+        return mfree(d);
+}
+
+DEFINE_TRIVIAL_CLEANUP_FUNC(VarlinkFiberData*, varlink_fiber_data_free);
+
+static void varlink_fiber_data_destroy(void *userdata) {
+        varlink_fiber_data_free(userdata);
+}
+
+static int varlink_fiber_entry(void *userdata) {
+        VarlinkFiberData *d = ASSERT_PTR(userdata);
+        sd_varlink *v = d->link;
+        int r;
+
+        r = d->callback(v, d->parameters, d->flags, d->userdata);
+
+        /* The fiber runs after varlink_dispatch_method() has already transitioned the state from
+         * VARLINK_PROCESSING_METHOD{,_MORE} to VARLINK_PENDING_METHOD{,_MORE}, so that's what we match
+         * here to decide whether the call still needs a reply. Any other state (e.g. IDLE_SERVER after
+         * the callback replied, or DISCONNECTED after sd_varlink_close()) means no fixup is needed. */
+        if (!IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
+                return r;
+
+        if (r < 0) {
+                varlink_log_errno(v, r, "Fiber returned error: %m");
+
+                /* Propagate error to the client if the method call remains unanswered. */
+                r = sd_varlink_error_errno(v, r);
+        } else if (v->sentinel) {
+                r = varlink_dispatch_sentinel(v);
+                if (r < 0)
+                        varlink_log_errno(v, r, "Failed to process sentinel: %m");
+        } else if (v->n_ref <= 2) {
+                /* Bare minimum refs (server + fiber data) means the connection wasn't stashed
+                 * to reply later, so the fiber was supposed to reply itself but didn't. */
+                r = varlink_log_errno(v, SYNTHETIC_ERRNO(EPROTO),
+                                      "Fiber returned without enqueuing a reply or stashing connection, failing.");
+                goto fail;
+        } else
+                r = 0;
+
+        /* If we didn't manage to enqueue a response, then fail the connection completely. */
+        if (r < 0 && IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
+                goto fail;
+
+        return r;
+
+fail:
+        varlink_set_state(v, VARLINK_PROCESSING_FAILURE);
+        varlink_dispatch_local_error(v, SD_VARLINK_ERROR_PROTOCOL);
+        sd_varlink_close(v);
+
+        return r;
+}
+
+static int varlink_dispatch_fiber(sd_varlink *v, const char *method, sd_varlink_method_t callback, sd_json_variant *parameters, sd_varlink_method_flags_t flags) {
+        int r;
+
+        assert(v);
+        assert(v->server);
+        assert(method);
+        assert(callback);
+
+        if (!v->server->event)
+                return varlink_log_errno(v, SYNTHETIC_ERRNO(EDEADLK),
+                                         "Cannot dispatch fiber method without event loop.");
+
+        _cleanup_(varlink_fiber_data_freep) VarlinkFiberData *d = new(VarlinkFiberData, 1);
+        if (!d)
+                return log_oom_debug();
+
+        *d = (VarlinkFiberData) {
+                .link = sd_varlink_ref(v),
+                .parameters = sd_json_variant_ref(parameters),
+                .flags = flags,
+                .userdata = v->userdata,
+                .callback = callback,
+        };
+
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        r = sd_fiber_new(v->server->event, method, varlink_fiber_entry, d, varlink_fiber_data_destroy, &f);
+        if (r < 0)
+                return r;
+
+        TAKE_PTR(d); /* The fiber owns the data now. */
+
+        /* Run the fiber at a higher priority than the connection's quit event source, so that on event
+         * loop exit the fiber's exit source (which cancels it and drives its cleanup) fires before
+         * varlink's quit_callback closes the connection. This lets a fiber handler reply with an error
+         * or flush its sentinel on a still-open connection during graceful shutdown. */
+        int64_t priority;
+        r = sd_event_source_get_priority(v->quit_event_source, &priority);
+        if (r < 0)
+                return r;
+
+        r = sd_future_set_priority(f, priority > INT64_MIN ? priority - 1 : priority);
+        if (r < 0)
+                return r;
+
+        /* Hand the future's lifetime over to the event loop: it'll auto-unref on resolve. */
+        r = sd_fiber_set_floating(f, true);
+        if (r < 0)
+                return r;
+
+        return 0;
+}
+
 static int varlink_dispatch_method(sd_varlink *v) {
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *parameters = NULL;
         sd_varlink_method_flags_t flags = 0;
@@ -1053,7 +1227,13 @@ static int varlink_dispatch_method(sd_varlink *v) {
                         v->protocol_upgrade || FLAGS_SET(v->server->flags, SD_VARLINK_SERVER_UPGRADABLE));
 
         /* First consult user supplied method implementations */
+        bool is_fiber = false;
         callback = hashmap_get(v->server->methods, method);
+        if (!callback) {
+                callback = hashmap_get(v->server->fiber_methods, method);
+                if (callback)
+                        is_fiber = true;
+        }
         if (!callback) {
                 if (streq(method, "org.varlink.service.GetInfo"))
                         callback = generic_method_get_info;
@@ -1105,7 +1285,13 @@ static int varlink_dispatch_method(sd_varlink *v) {
                 }
 
                 if (!invalid) {
-                        r = callback(v, parameters, flags, v->userdata);
+                        if (is_fiber)
+                                /* Spawn a fiber to run the callback. The VarlinkFiberData takes a ref on the
+                                 * connection (bumping n_ref above 2), so the post-callback logic below treats
+                                 * this as a deferred reply and moves state to PENDING_METHOD. */
+                                r = varlink_dispatch_fiber(v, method, callback, parameters, flags);
+                        else
+                                r = callback(v, parameters, flags, v->userdata);
                         if (VARLINK_STATE_WANTS_REPLY(v->state)) {
                                 if (r < 0) {
                                         varlink_log_errno(v, r, "Callback for '%s' returned error: %m", method);
@@ -1114,37 +1300,7 @@ static int varlink_dispatch_method(sd_varlink *v) {
                                          * if the method call remains unanswered. */
                                         r = sd_varlink_error_errno(v, r);
                                 } else if (v->sentinel) {
-                                        if (v->previous) {
-                                                r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds);
-                                                if (r >= 0) {
-                                                        v->previous = sd_json_variant_unref(v->previous);
-                                                        v->previous_fds = mfree(v->previous_fds);
-                                                        v->n_previous_fds = 0;
-                                                        varlink_set_state(v, VARLINK_PROCESSED_METHOD);
-                                                }
-                                        } else {
-                                                char *sentinel = TAKE_PTR(v->sentinel);
-
-                                                /* Propagate the sentinel to the client if one was configured
-                                                 * and no replies were enqueued by the callback. */
-                                                if (sentinel == POINTER_MAX)
-                                                        r = sd_varlink_reply(v, NULL);
-                                                else {
-                                                        r = sd_varlink_error(v, sentinel, NULL);
-                                                        /* sd_varlink_error() deliberately returns a negative
-                                                         * errno mapped from the error id on success (so method
-                                                         * callbacks can `return sd_varlink_error(...);` to
-                                                         * enqueue a reply and propagate a matching errno in one
-                                                         * go). For sentinel dispatch we don't care about that
-                                                         * mapping — the reply is either enqueued or not, which
-                                                         * we detect via the state transition instead. */
-                                                        if (v->state == VARLINK_PROCESSED_METHOD)
-                                                                r = 0;
-                                                }
-
-                                                if (sentinel != POINTER_MAX)
-                                                        free(sentinel);
-                                        }
+                                        r = varlink_dispatch_sentinel(v);
                                         if (r < 0)
                                                 varlink_log_errno(v, r, "Failed to process sentinel for method '%s': %m", method);
                                 } else {
@@ -2596,8 +2752,12 @@ _public_ int sd_varlink_set_sentinel(sd_varlink *v, const char *error_id) {
         if (v->state == VARLINK_PROCESSING_METHOD_ONEWAY)
                 return 0;
 
-        /* This has to be called during a callback, and not after it has exited. */
-        assert_return(IN_SET(v->state, VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE),
+        /* This has to be called during a callback, and not after it has exited. The PENDING states
+         * apply to fiber callbacks, which run after varlink_dispatch_method() has already transitioned
+         * the state from PROCESSING to PENDING. */
+        assert_return(IN_SET(v->state,
+                             VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE,
+                             VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE),
                       -EUCLEAN);
 
         char *s = NULL;
@@ -2899,7 +3059,11 @@ static sd_varlink_server* varlink_server_destroy(sd_varlink_server *s) {
         while ((m = hashmap_steal_first_key(s->methods)))
                 free(m);
 
+        while ((m = hashmap_steal_first_key(s->fiber_methods)))
+                free(m);
+
         hashmap_free(s->methods);
+        hashmap_free(s->fiber_methods);
         hashmap_free(s->interfaces);
         hashmap_free(s->symbols);
         hashmap_free(s->by_uid);
@@ -3590,23 +3754,32 @@ static bool varlink_symbol_in_interface(const char *method, const char *interfac
         return !strchr(p+1, '.');
 }
 
-_public_ int sd_varlink_server_bind_method(sd_varlink_server *s, const char *method, sd_varlink_method_t callback) {
+static int varlink_server_bind_internal(sd_varlink_server *s, Hashmap **methods, const char *method, sd_varlink_method_t callback) {
         _cleanup_free_ char *m = NULL;
         int r;
 
-        assert_return(s, -EINVAL);
-        assert_return(method, -EINVAL);
-        assert_return(callback, -EINVAL);
+        assert(s);
+        assert(methods);
+        assert(method);
+        assert(callback);
 
         if (varlink_symbol_in_interface(method, "org.varlink.service") ||
             varlink_symbol_in_interface(method, "io.systemd"))
                 return varlink_server_log_errno(s, SYNTHETIC_ERRNO(EEXIST), "Cannot bind server to '%s'.", method);
 
+        /* Refuse to register the same method in both the regular and fiber method maps: the dispatcher
+         * always consults methods first and would silently ignore a shadowed fiber_methods entry (or vice
+         * versa), hiding the misconfiguration. */
+        Hashmap *other = methods == &s->methods ? s->fiber_methods : s->methods;
+        if (hashmap_contains(other, method))
+                return varlink_server_log_errno(s, SYNTHETIC_ERRNO(EEXIST),
+                                                "Method '%s' is already bound in the other method map.", method);
+
         m = strdup(method);
         if (!m)
                 return log_oom_debug();
 
-        r = hashmap_ensure_put(&s->methods, &string_hash_ops, m, callback);
+        r = hashmap_ensure_put(methods, &string_hash_ops, m, callback);
         if (r == -ENOMEM)
                 return log_oom_debug();
         if (r < 0)
@@ -3617,13 +3790,12 @@ _public_ int sd_varlink_server_bind_method(sd_varlink_server *s, const char *met
         return 0;
 }
 
-_public_ int sd_varlink_server_bind_method_many_internal(sd_varlink_server *s, ...) {
-        va_list ap;
+static int varlink_server_bind_many_internal(sd_varlink_server *s, Hashmap **methods, va_list ap) {
         int r = 0;
 
-        assert_return(s, -EINVAL);
+        assert(s);
+        assert(methods);
 
-        va_start(ap, s);
         for (;;) {
                 sd_varlink_method_t callback;
                 const char *method;
@@ -3634,10 +3806,51 @@ _public_ int sd_varlink_server_bind_method_many_internal(sd_varlink_server *s, .
 
                 callback = va_arg(ap, sd_varlink_method_t);
 
-                r = sd_varlink_server_bind_method(s, method, callback);
+                r = varlink_server_bind_internal(s, methods, method, callback);
                 if (r < 0)
                         break;
         }
+
+        return r;
+}
+
+_public_ int sd_varlink_server_bind_method(sd_varlink_server *s, const char *method, sd_varlink_method_t callback) {
+        assert_return(s, -EINVAL);
+        assert_return(method, -EINVAL);
+        assert_return(callback, -EINVAL);
+
+        return varlink_server_bind_internal(s, &s->methods, method, callback);
+}
+
+_public_ int sd_varlink_server_bind_method_many_internal(sd_varlink_server *s, ...) {
+        va_list ap;
+        int r;
+
+        assert_return(s, -EINVAL);
+
+        va_start(ap, s);
+        r = varlink_server_bind_many_internal(s, &s->methods, ap);
+        va_end(ap);
+
+        return r;
+}
+
+int varlink_server_bind_fiber(sd_varlink_server *s, const char *method, sd_varlink_method_t callback) {
+        assert_return(s, -EINVAL);
+        assert_return(method, -EINVAL);
+        assert_return(callback, -EINVAL);
+
+        return varlink_server_bind_internal(s, &s->fiber_methods, method, callback);
+}
+
+int varlink_server_bind_fiber_many_internal(sd_varlink_server *s, ...) {
+        va_list ap;
+        int r;
+
+        assert_return(s, -EINVAL);
+
+        va_start(ap, s);
+        r = varlink_server_bind_many_internal(s, &s->fiber_methods, ap);
         va_end(ap);
 
         return r;
index cca859c2f6867d80173c3fa007858c54d8f2ba6b..31b7cb46f92058a486d51e79196c6be8db103326 100644 (file)
@@ -2,13 +2,13 @@
 
 #include <fcntl.h>
 #include <poll.h>
-#include <pthread.h>
 #include <sys/resource.h>
 #include <sys/socket.h>
 #include <sys/stat.h>
 #include <unistd.h>
 
 #include "sd-event.h"
+#include "sd-future.h"
 #include "sd-json.h"
 #include "sd-varlink.h"
 
@@ -216,7 +216,12 @@ static void flood_test(const char *address) {
         /* Block the main event loop while we flood */
         ASSERT_OK_EQ_ERRNO(write(block_write_fd, &x, sizeof(x)), (ssize_t) sizeof(x));
 
-        ASSERT_OK(sd_event_default(&e));
+        /* Create a fresh event loop for the flood test — we can't reuse the default event because the
+         * main test (and the fiber we're running in) is already running it, and sd_event_loop() asserts
+         * the event is in the INITIAL state. Exit-on-idle so the nested loop terminates once the
+         * overload reply has been received and all other work is quiesced. */
+        ASSERT_OK(sd_event_new(&e));
+        ASSERT_OK(sd_event_set_exit_on_idle(e, true));
 
         /* Flood the server with connections */
         ASSERT_NOT_NULL(connections = new0(sd_varlink*, OVERLOAD_CONNECTIONS));
@@ -251,7 +256,7 @@ static void flood_test(const char *address) {
                 connections[k] = sd_varlink_unref(connections[k]);
 }
 
-static void *thread(void *arg) {
+static int client_fiber(void *arg) {
         _cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *c = NULL;
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *i = NULL;
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *wrong = NULL;
@@ -263,7 +268,7 @@ static void *thread(void *arg) {
                                                    SD_JSON_BUILD_PAIR_INTEGER("b", 99))));
 
         ASSERT_OK(sd_varlink_connect_address(&c, arg));
-        ASSERT_OK(sd_varlink_set_description(c, "thread-client"));
+        ASSERT_OK(sd_varlink_set_description(c, "fiber-client"));
         ASSERT_OK(sd_varlink_set_allow_fd_passing_input(c, true));
         ASSERT_OK(sd_varlink_set_allow_fd_passing_output(c, true));
 
@@ -321,7 +326,7 @@ static void *thread(void *arg) {
 
         ASSERT_OK(sd_varlink_send(c, "io.test.Done", NULL));
 
-        return NULL;
+        return 0;
 }
 
 static int block_fd_handler(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
@@ -348,8 +353,8 @@ TEST(chat) {
         _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = NULL;
         _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
         _cleanup_close_pair_ int block_fds[2] = EBADF_PAIR;
-        pthread_t t;
         const char *sp;
 
         ASSERT_OK(mkdtemp_malloc("/tmp/varlink-test-XXXXXX", &tmpdir));
@@ -388,11 +393,11 @@ TEST(chat) {
 
         ASSERT_OK(sd_varlink_attach_event(c, e, 0));
 
-        ASSERT_OK(-pthread_create(&t, NULL, thread, (void*) sp));
+        ASSERT_OK(sd_fiber_new(e, "client", client_fiber, (void*) sp, /* destroy= */ NULL, &f));
 
         ASSERT_OK(sd_event_loop(e));
 
-        ASSERT_OK(-pthread_join(t, NULL));
+        ASSERT_OK(sd_future_result(f));
 }
 
 static int method_invalid(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
@@ -618,6 +623,173 @@ TEST(sentinel_oneway) {
         ASSERT_OK(sd_event_loop(e));
 }
 
+static int method_fiber_sentinel_error(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        /* Set an error sentinel from a fiber callback and return without sending a reply. The sentinel
+         * error should still be propagated by the fiber's post-callback logic, even though the varlink
+         * state has already been transitioned to VARLINK_PENDING_METHOD by the time the fiber runs. */
+        ASSERT_OK(sd_varlink_set_sentinel(link, "io.test.SentinelError"));
+        return 0;
+}
+
+TEST(fiber_sentinel_error) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_default(&e));
+
+        _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+        ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+        ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+        ASSERT_OK(varlink_server_bind_fiber(s, "io.test.FiberSentinelError", method_fiber_sentinel_error));
+
+        int connfd[2];
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+        ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+        _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+        ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+        ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+        ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_error));
+
+        ASSERT_OK(sd_varlink_invoke(c, "io.test.FiberSentinelError", /* parameters= */ NULL));
+
+        ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_fiber_errno(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        /* Return a negative errno without sending a reply. The fiber's post-callback logic should
+         * convert this into a SD_VARLINK_ERROR_SYSTEM reply. */
+        return -ENOSYS;
+}
+
+static int reply_fiber_errno(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+        ASSERT_STREQ(error_id, SD_VARLINK_ERROR_SYSTEM);
+        ASSERT_EQ(sd_json_variant_integer(sd_json_variant_by_key(parameters, "errno")), ENOSYS);
+        ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+        return 0;
+}
+
+TEST(fiber_errno) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_default(&e));
+
+        _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+        ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+        ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+        ASSERT_OK(varlink_server_bind_fiber(s, "io.test.FiberErrno", method_fiber_errno));
+
+        int connfd[2];
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+        ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+        _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+        ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+        ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+        ASSERT_OK(sd_varlink_bind_reply(c, reply_fiber_errno));
+
+        ASSERT_OK(sd_varlink_invoke(c, "io.test.FiberErrno", /* parameters= */ NULL));
+
+        ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_fiber_no_reply(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        /* Return success without replying and without stashing a ref. The fiber's post-callback
+         * logic should detect this and fail the connection. */
+        return 0;
+}
+
+static int reply_fiber_no_reply(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+        ASSERT_STREQ(error_id, SD_VARLINK_ERROR_DISCONNECTED);
+        ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+        return 0;
+}
+
+TEST(fiber_no_reply) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_default(&e));
+
+        _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+        ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+        ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+        ASSERT_OK(varlink_server_bind_fiber(s, "io.test.FiberNoReply", method_fiber_no_reply));
+
+        int connfd[2];
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+        ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+        _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+        ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+        ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+        ASSERT_OK(sd_varlink_bind_reply(c, reply_fiber_no_reply));
+
+        ASSERT_OK(sd_varlink_invoke(c, "io.test.FiberNoReply", /* parameters= */ NULL));
+
+        ASSERT_OK(sd_event_loop(e));
+}
+
+static int fiber_stashed_deferred_reply(sd_event_source *s, void *userdata) {
+        _cleanup_(sd_varlink_unrefp) sd_varlink *link = ASSERT_PTR(userdata);
+
+        sd_event_source_disable_unref(s);
+        ASSERT_OK(sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_STRING("result", "stashed")));
+        return 0;
+}
+
+static int method_fiber_stash(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        /* Stash a ref on the connection so n_ref > 2 when the fiber returns, and reply later from a
+         * deferred event source. The fiber's post-callback logic should see the extra ref and treat
+         * this as a valid deferred-reply case instead of failing the connection. */
+        sd_event_source *source;
+
+        ASSERT_OK(sd_event_add_defer(sd_varlink_get_event(link), &source, fiber_stashed_deferred_reply, sd_varlink_ref(link)));
+        ASSERT_OK(sd_event_source_set_enabled(source, SD_EVENT_ONESHOT));
+        return 0;
+}
+
+static int reply_fiber_stash(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+        ASSERT_NULL(error_id);
+        ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(parameters, "result")), "stashed");
+        ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+        return 0;
+}
+
+TEST(fiber_stash) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_default(&e));
+
+        _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+        ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+        ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+        ASSERT_OK(varlink_server_bind_fiber(s, "io.test.FiberStash", method_fiber_stash));
+
+        int connfd[2];
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+        ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+        _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+        ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+        ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+        ASSERT_OK(sd_varlink_bind_reply(c, reply_fiber_stash));
+
+        ASSERT_OK(sd_varlink_invoke(c, "io.test.FiberStash", /* parameters= */ NULL));
+
+        ASSERT_OK(sd_event_loop(e));
+}
+
 static int method_with_fd_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
         _cleanup_close_ int fd1 = -EBADF, fd2 = -EBADF;
 
@@ -768,8 +940,9 @@ static int method_upgrade(sd_varlink *link, sd_json_variant *parameters, sd_varl
         if (r < 0)
                 return r;
 
-        /* After upgrade, do raw I/O: read until EOF, reverse, write back.
-         * The client shuts down its write side after sending, so we get a clean EOF. */
+        /* After upgrade, do raw I/O: read until the client shuts down its write side (giving us a clean
+         * EOF), reverse what we got, and write it back. Use suspending I/O so other fibers (the client)
+         * can make progress while we're waiting on the socket. */
         char buf[64] = {};
         ssize_t n = ASSERT_OK(loop_read(input_fd, buf, sizeof(buf) - 1, /* do_poll= */ true));
         ASSERT_GT(n, 0);
@@ -789,12 +962,10 @@ static int method_upgrade_without_flag(sd_varlink *link, sd_json_variant *parame
         /* Calling reply_and_upgrade without the client requesting it should fail with -EPROTO */
         ASSERT_ERROR(sd_varlink_reply_and_upgrade(link, /* parameters= */ NULL, &input_fd, &output_fd), EPROTO);
 
-        sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS);
-
         return sd_varlink_reply(link, /* parameters= */ NULL);
 }
 
-static void *upgrade_thread(void *arg) {
+static int upgrade_client_fiber(void *arg) {
         _cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *c = NULL;
         _cleanup_close_ int input_fd = -EBADF, output_fd = -EBADF;
         sd_json_variant *o = NULL;
@@ -827,14 +998,15 @@ static void *upgrade_thread(void *arg) {
         ASSERT_OK(sd_varlink_call(c2, "io.test.UpgradeWithoutFlag", /* parameters= */ NULL, &o, &error_id));
         ASSERT_NULL(error_id);
 
-        return NULL;
+        ASSERT_OK(sd_event_exit(sd_fiber_get_event(), EXIT_SUCCESS));
+        return 0;
 }
 
 TEST(upgrade) {
         _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
         _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
         _cleanup_(sd_event_unrefp) sd_event *e = NULL;
-        pthread_t t;
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
         const char *sp;
 
         ASSERT_OK(mkdtemp_malloc("/tmp/varlink-test-XXXXXX", &tmpdir));
@@ -844,31 +1016,23 @@ TEST(upgrade) {
 
         ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_UPGRADABLE));
         ASSERT_OK(sd_varlink_server_set_description(s, "upgrade-server"));
-        ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Upgrade", method_upgrade));
+        /* The method does raw I/O on the upgraded socket — bind it as a fiber method so it can
+         * suspend on loop_read()/loop_write() and the client fiber can make progress concurrently. */
+        ASSERT_OK(varlink_server_bind_fiber(s, "io.test.Upgrade", method_upgrade));
         ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.UpgradeWithoutFlag", method_upgrade_without_flag));
         ASSERT_OK(sd_varlink_server_listen_address(s, sp, 0600));
         ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
 
-        ASSERT_OK(-pthread_create(&t, NULL, upgrade_thread, (void*) sp));
+        ASSERT_OK(sd_fiber_new(e, "upgrade-client", upgrade_client_fiber, (void*) sp, /* destroy= */ NULL, &f));
 
-        /* Run the event loop until no more connections (the thread will disconnect when done) */
+        /* Run the event loop. Exits on idle once the client fiber completes and all server connections
+         * have been torn down. */
         ASSERT_OK(sd_event_loop(e));
 
-        ASSERT_OK(-pthread_join(t, NULL));
+        ASSERT_OK(sd_future_result(f));
 }
 
-static int method_upgrade_and_exit(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
-        sd_event *event = ASSERT_PTR(userdata);
-
-        int r = method_upgrade(link, parameters, flags, /* userdata= */ NULL);
-
-        /* Exit the event loop after the upgrade is handled. We can't use sd_varlink_get_event()
-         * here because the connection is already disconnected after reply_and_upgrade. */
-        (void) sd_event_exit(event, r < 0 ? r : EXIT_SUCCESS);
-        return r;
-}
-
-static void *upgrade_pipelining_thread(void *arg) {
+static int upgrade_pipelining_client_fiber(void *arg) {
         union sockaddr_union sa = {};
         _cleanup_close_ int fd = -EBADF;
 
@@ -895,8 +1059,8 @@ static void *upgrade_pipelining_thread(void *arg) {
         /* Shut down write side so server's method_upgrade sees EOF after raw payload */
         ASSERT_OK_ERRNO(shutdown(fd, SHUT_WR));
 
-        /* Read everything: upgrade reply (JSON + \0) + reversed raw payload. The server closes
-         * the connection after writing, so loop_read() reads until EOF and gets it all. */
+        /* Read everything: upgrade reply (JSON + \0) + reversed raw payload. The server closes the
+         * connection after writing, so loop_read() reads until EOF and gets it all. */
         char buf[256] = {};
         ssize_t n = ASSERT_OK(loop_read(fd, buf, sizeof(buf) - 1, /* do_poll= */ true));
         ASSERT_GT(n, 0);
@@ -911,14 +1075,15 @@ static void *upgrade_pipelining_thread(void *arg) {
         ASSERT_EQ(raw_size, strlen(raw_payload));
         ASSERT_STREQ(strndupa_safe(raw, raw_size), "!denilepiP");
 
-        return NULL;
+        ASSERT_OK(sd_event_exit(sd_fiber_get_event(), EXIT_SUCCESS));
+        return 0;
 }
 
 TEST(upgrade_pipelining) {
         _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
         _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
         _cleanup_(sd_event_unrefp) sd_event *e = NULL;
-        pthread_t t;
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
         const char *sp;
 
         ASSERT_OK(mkdtemp_malloc("/tmp/varlink-test-XXXXXX", &tmpdir));
@@ -926,25 +1091,23 @@ TEST(upgrade_pipelining) {
 
         ASSERT_OK(sd_event_new(&e));
 
-        ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_UPGRADABLE|SD_VARLINK_SERVER_INHERIT_USERDATA));
+        ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_UPGRADABLE));
         ASSERT_OK(sd_varlink_server_set_description(s, "upgrade-pipelining-server"));
-        ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Upgrade", method_upgrade_and_exit));
+        /* method_upgrade does raw I/O on the upgraded socket, so bind as a fiber method. */
+        ASSERT_OK(varlink_server_bind_fiber(s, "io.test.Upgrade", method_upgrade));
         ASSERT_OK(sd_varlink_server_listen_address(s, sp, 0600));
         ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
-        sd_varlink_server_set_userdata(s, e);
 
-        ASSERT_OK(-pthread_create(&t, NULL, upgrade_pipelining_thread, (void*) sp));
+        ASSERT_OK(sd_fiber_new(e, "upgrade-pipelining-client", upgrade_pipelining_client_fiber, (void*) sp, /* destroy= */ NULL, &f));
 
         ASSERT_OK(sd_event_loop(e));
 
-        ASSERT_OK(-pthread_join(t, NULL));
+        ASSERT_OK(sd_future_result(f));
 }
 
 typedef struct ExecDirServer {
         sd_varlink_server *server;
-        sd_event *event;
         const char *name;
-        pthread_t thread;
 } ExecDirServer;
 
 static int method_execute_dir_ping(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
@@ -953,20 +1116,6 @@ static int method_execute_dir_ping(sd_varlink *link, sd_json_variant *parameters
         return sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_STRING("name", srv->name));
 }
 
-static void on_execute_dir_disconnect(sd_varlink_server *s, sd_varlink *link, void *userdata) {
-        ExecDirServer *srv = ASSERT_PTR(userdata);
-
-        /* Only one client (from varlink_execute_directory()) connects per server — once it's gone, we're done. */
-        ASSERT_OK(sd_event_exit(srv->event, 0));
-}
-
-static void *execute_dir_server_thread(void *arg) {
-        ExecDirServer *srv = arg;
-
-        ASSERT_OK(sd_event_loop(srv->event));
-        return NULL;
-}
-
 static int execute_dir_reply(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
         size_t *count = ASSERT_PTR(userdata);
 
@@ -977,51 +1126,28 @@ static int execute_dir_reply(sd_varlink *link, sd_json_variant *parameters, cons
         return 0;
 }
 
-TEST(execute_directory) {
-        _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
-        static const char * const names[] = { "alpha", "beta", "gamma" };
-        ExecDirServer servers[ELEMENTSOF(names)] = {};
-        size_t reply_count = 0;
-
-        ASSERT_OK(mkdtemp_malloc("/tmp/varlink-execdir-XXXXXX", &tmpdir));
-
-        for (size_t i = 0; i < ELEMENTSOF(names); i++) {
-                ExecDirServer *eds = servers + i;
-                servers[i].name = names[i];
+typedef struct ExecDirClientArgs {
+        const char *tmpdir;
+        size_t n_servers;
+        size_t *reply_count;
+} ExecDirClientArgs;
 
-                _cleanup_free_ char *j = ASSERT_PTR(path_join(tmpdir, names[i]));
-
-                ASSERT_OK(sd_event_new(&eds->event));
-                ASSERT_OK(varlink_server_new(&eds->server,
-                                             SD_VARLINK_SERVER_INHERIT_USERDATA,
-                                             eds));
-                ASSERT_OK(sd_varlink_server_bind_method(eds->server, "io.test.ExecDirPing", method_execute_dir_ping));
-                ASSERT_OK(sd_varlink_server_bind_disconnect(eds->server, on_execute_dir_disconnect));
-                ASSERT_OK(sd_varlink_server_listen_address(eds->server, j, 0600));
-                ASSERT_OK(sd_varlink_server_attach_event(eds->server, eds->event, 0));
-
-                ASSERT_OK(-pthread_create(&eds->thread, NULL, execute_dir_server_thread, eds));
-        }
+static int execute_dir_client_fiber(void *arg) {
+        ExecDirClientArgs *a = ASSERT_PTR(arg);
 
         ASSERT_OK_EQ(varlink_execute_directory(
-                                     tmpdir,
+                                     a->tmpdir,
                                      "io.test.ExecDirPing",
                                      /* parameters= */ NULL,
                                      /* more= */ false,
                                      /* timeout_usec= */ USEC_INFINITY,
                                      execute_dir_reply,
-                                     &reply_count), (ssize_t) ELEMENTSOF(names));
-        ASSERT_EQ(reply_count, ELEMENTSOF(names));
-
-        FOREACH_ELEMENT(eds, servers) {
-                ASSERT_OK(-pthread_join(eds->thread, NULL));
-                eds->server = sd_varlink_server_unref(eds->server);
-                eds->event = sd_event_unref(eds->event);
-        }
+                                     a->reply_count), (ssize_t) a->n_servers);
+        ASSERT_EQ(*a->reply_count, a->n_servers);
 
         /* Calling the helper against a non-existent directory must fail. */
         _cleanup_free_ char *nope = NULL;
-        ASSERT_OK(asprintf(&nope, "%s/does-not-exist", tmpdir));
+        ASSERT_OK(asprintf(&nope, "%s/does-not-exist", a->tmpdir));
         ASSERT_FAIL(varlink_execute_directory(
                                     nope,
                                     "io.test.ExecDirPing",
@@ -1029,13 +1155,13 @@ TEST(execute_directory) {
                                     /* more= */ false,
                                     /* timeout_usec= */ USEC_INFINITY,
                                     execute_dir_reply,
-                                    &reply_count));
+                                    a->reply_count));
 
         /* An empty directory must simply return 0 and not invoke the reply callback. */
-        _cleanup_free_ char *empty = ASSERT_PTR(path_join(tmpdir, "empty"));
+        _cleanup_free_ char *empty = ASSERT_PTR(path_join(a->tmpdir, "empty"));
         ASSERT_OK_ERRNO(mkdir(empty, 0755));
 
-        size_t count_before = reply_count;
+        size_t count_before = *a->reply_count;
         ASSERT_OK_ZERO(varlink_execute_directory(
                                        empty,
                                        "io.test.ExecDirPing",
@@ -1043,8 +1169,52 @@ TEST(execute_directory) {
                                        /* more= */ false,
                                        /* timeout_usec= */ USEC_INFINITY,
                                        execute_dir_reply,
-                                       &reply_count));
-        ASSERT_EQ(reply_count, count_before);
+                                       a->reply_count));
+        ASSERT_EQ(*a->reply_count, count_before);
+
+        ASSERT_OK(sd_event_exit(sd_fiber_get_event(), EXIT_SUCCESS));
+        return 0;
+}
+
+TEST(execute_directory) {
+        _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+        static const char * const names[] = { "alpha", "beta", "gamma" };
+        ExecDirServer servers[ELEMENTSOF(names)] = {};
+        size_t reply_count = 0;
+
+        ASSERT_OK(mkdtemp_malloc("/tmp/varlink-execdir-XXXXXX", &tmpdir));
+
+        ASSERT_OK(sd_event_new(&e));
+
+        for (size_t i = 0; i < ELEMENTSOF(names); i++) {
+                ExecDirServer *eds = servers + i;
+                servers[i].name = names[i];
+
+                _cleanup_free_ char *j = ASSERT_PTR(path_join(tmpdir, names[i]));
+
+                ASSERT_OK(varlink_server_new(&eds->server,
+                                             SD_VARLINK_SERVER_INHERIT_USERDATA,
+                                             eds));
+                ASSERT_OK(sd_varlink_server_bind_method(eds->server, "io.test.ExecDirPing", method_execute_dir_ping));
+                ASSERT_OK(sd_varlink_server_listen_address(eds->server, j, 0600));
+                ASSERT_OK(sd_varlink_server_attach_event(eds->server, e, 0));
+        }
+
+        ExecDirClientArgs args = {
+                .tmpdir = tmpdir,
+                .n_servers = ELEMENTSOF(names),
+                .reply_count = &reply_count,
+        };
+        ASSERT_OK(sd_fiber_new(e, "execute-dir-client", execute_dir_client_fiber, &args, NULL, &f));
+
+        ASSERT_OK(sd_event_loop(e));
+
+        ASSERT_OK(sd_future_result(f));
+
+        FOREACH_ELEMENT(eds, servers)
+                eds->server = sd_varlink_server_unref(eds->server);
 }
 
 #define CTRUNC_N_FDS 64U
@@ -1075,7 +1245,7 @@ static int reply_ctrunc(sd_varlink *link, sd_json_variant *parameters, const cha
 
 TEST(ctrunc) {
         int r;
-        
+
         _cleanup_(sd_event_unrefp) sd_event *e = NULL;
         ASSERT_OK(sd_event_default(&e));
 
index 32d6d5983a75f9a49666e1800b48ebea145f332b..beec5be42c70964f7df8f18ca96eebce45a06403 100644 (file)
@@ -135,7 +135,8 @@ typedef struct sd_varlink_server {
 
         LIST_HEAD(VarlinkServerSocket, sockets);
 
-        Hashmap *methods;              /* Fully qualified symbol name of a method → VarlinkMethod */
+        Hashmap *methods;              /* Fully qualified symbol name of a method → sd_varlink_method_t */
+        Hashmap *fiber_methods;        /* Fully qualified symbol name of a fiber method → sd_varlink_method_t */
         Hashmap *interfaces;           /* Fully qualified interface name → VarlinkInterface* */
         Hashmap *symbols;              /* Fully qualified symbol name of method/error → VarlinkSymbol* */
         sd_varlink_connect_t connect_callback;
index d6ecb03c545332a2402d3fd72530e77d5c3dd93d..d5765ca2c72f11f3ecda5c3b1dbd0c165d6cc2ff 100644 (file)
@@ -19,6 +19,10 @@ int varlink_many_notifyb(Set *s, ...);
 int varlink_many_reply(Set *s, sd_json_variant *parameters);
 int varlink_many_error(Set *s, const char *error_id, sd_json_variant *parameters);
 
+int varlink_server_bind_fiber(sd_varlink_server *s, const char *method, sd_varlink_method_t callback);
+int varlink_server_bind_fiber_many_internal(sd_varlink_server *s, ...);
+#define varlink_server_bind_fiber_many(s, ...) varlink_server_bind_fiber_many_internal(s, __VA_ARGS__, NULL)
+
 int varlink_set_info_systemd(sd_varlink_server *server);
 
 int varlink_server_new(