#include "sd-daemon.h"
#include "sd-event.h"
+#include "sd-future.h"
#include "sd-varlink.h"
#include "alloc-util.h"
#include "varlink-internal.h"
#include "varlink-io.systemd.h"
#include "varlink-org.varlink.service.h"
+#include "varlink-util.h"
#define VARLINK_DEFAULT_CONNECTIONS_MAX 4096U
#define VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX 128U
SD_JSON_BUILD_PAIR_STRING("description", text));
}
+static int varlink_dispatch_sentinel(sd_varlink *v) {
+ int r;
+
+ assert(v);
+ assert(v->sentinel);
+
+ if (v->previous) {
+ r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds);
+ if (r >= 0) {
+ v->previous = sd_json_variant_unref(v->previous);
+ v->previous_fds = mfree(v->previous_fds);
+ v->n_previous_fds = 0;
+ /* Mirror sd_varlink_reply()'s post-enqueue state machine: PENDING_* means we're
+ * outside the dispatch stack frame (e.g. called from varlink_fiber_entry after
+ * the fiber returned), so we go straight to IDLE_SERVER ourselves. PROCESSING_*
+ * means we're inside varlink_dispatch_method(), which will transition us. */
+ if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) {
+ varlink_clear_current(v);
+ varlink_set_state(v, VARLINK_IDLE_SERVER);
+ } else
+ varlink_set_state(v, VARLINK_PROCESSED_METHOD);
+ }
+
+ return r;
+ }
+
+ char *sentinel = TAKE_PTR(v->sentinel);
+
+ /* Propagate the sentinel to the client if one was configured and no replies were enqueued by
+ * the callback. */
+ if (sentinel == POINTER_MAX)
+ r = sd_varlink_reply(v, NULL);
+ else {
+ r = sd_varlink_error(v, sentinel, NULL);
+ /* sd_varlink_error() deliberately returns a negative
+ * errno mapped from the error id on success (so method
+ * callbacks can `return sd_varlink_error(...);` to
+ * enqueue a reply and propagate a matching errno in one
+ * go). For sentinel dispatch we don't care about that
+ * mapping — the reply is either enqueued or not, which
+ * we detect via the state transition instead. */
+ if (IN_SET(v->state, VARLINK_PROCESSED_METHOD, VARLINK_IDLE_SERVER))
+ r = 0;
+ }
+
+ if (sentinel != POINTER_MAX)
+ free(sentinel);
+
+ return r;
+}
+
+typedef struct VarlinkFiberData {
+ sd_varlink *link;
+ sd_json_variant *parameters;
+ sd_varlink_method_flags_t flags;
+ void *userdata;
+ sd_varlink_method_t callback;
+} VarlinkFiberData;
+
+static VarlinkFiberData* varlink_fiber_data_free(VarlinkFiberData *d) {
+ if (!d)
+ return NULL;
+
+ sd_json_variant_unref(d->parameters);
+ sd_varlink_unref(d->link);
+ return mfree(d);
+}
+
+DEFINE_TRIVIAL_CLEANUP_FUNC(VarlinkFiberData*, varlink_fiber_data_free);
+
+static void varlink_fiber_data_destroy(void *userdata) {
+ varlink_fiber_data_free(userdata);
+}
+
+static int varlink_fiber_entry(void *userdata) {
+ VarlinkFiberData *d = ASSERT_PTR(userdata);
+ sd_varlink *v = d->link;
+ int r;
+
+ r = d->callback(v, d->parameters, d->flags, d->userdata);
+
+ /* The fiber runs after varlink_dispatch_method() has already transitioned the state from
+ * VARLINK_PROCESSING_METHOD{,_MORE} to VARLINK_PENDING_METHOD{,_MORE}, so that's what we match
+ * here to decide whether the call still needs a reply. Any other state (e.g. IDLE_SERVER after
+ * the callback replied, or DISCONNECTED after sd_varlink_close()) means no fixup is needed. */
+ if (!IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
+ return r;
+
+ if (r < 0) {
+ varlink_log_errno(v, r, "Fiber returned error: %m");
+
+ /* Propagate error to the client if the method call remains unanswered. */
+ r = sd_varlink_error_errno(v, r);
+ } else if (v->sentinel) {
+ r = varlink_dispatch_sentinel(v);
+ if (r < 0)
+ varlink_log_errno(v, r, "Failed to process sentinel: %m");
+ } else if (v->n_ref <= 2) {
+ /* Bare minimum refs (server + fiber data) means the connection wasn't stashed
+ * to reply later, so the fiber was supposed to reply itself but didn't. */
+ r = varlink_log_errno(v, SYNTHETIC_ERRNO(EPROTO),
+ "Fiber returned without enqueuing a reply or stashing connection, failing.");
+ goto fail;
+ } else
+ r = 0;
+
+ /* If we didn't manage to enqueue a response, then fail the connection completely. */
+ if (r < 0 && IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
+ goto fail;
+
+ return r;
+
+fail:
+ varlink_set_state(v, VARLINK_PROCESSING_FAILURE);
+ varlink_dispatch_local_error(v, SD_VARLINK_ERROR_PROTOCOL);
+ sd_varlink_close(v);
+
+ return r;
+}
+
+static int varlink_dispatch_fiber(sd_varlink *v, const char *method, sd_varlink_method_t callback, sd_json_variant *parameters, sd_varlink_method_flags_t flags) {
+ int r;
+
+ assert(v);
+ assert(v->server);
+ assert(method);
+ assert(callback);
+
+ if (!v->server->event)
+ return varlink_log_errno(v, SYNTHETIC_ERRNO(EDEADLK),
+ "Cannot dispatch fiber method without event loop.");
+
+ _cleanup_(varlink_fiber_data_freep) VarlinkFiberData *d = new(VarlinkFiberData, 1);
+ if (!d)
+ return log_oom_debug();
+
+ *d = (VarlinkFiberData) {
+ .link = sd_varlink_ref(v),
+ .parameters = sd_json_variant_ref(parameters),
+ .flags = flags,
+ .userdata = v->userdata,
+ .callback = callback,
+ };
+
+ _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+ r = sd_fiber_new(v->server->event, method, varlink_fiber_entry, d, varlink_fiber_data_destroy, &f);
+ if (r < 0)
+ return r;
+
+ TAKE_PTR(d); /* The fiber owns the data now. */
+
+ /* Run the fiber at a higher priority than the connection's quit event source, so that on event
+ * loop exit the fiber's exit source (which cancels it and drives its cleanup) fires before
+ * varlink's quit_callback closes the connection. This lets a fiber handler reply with an error
+ * or flush its sentinel on a still-open connection during graceful shutdown. */
+ int64_t priority;
+ r = sd_event_source_get_priority(v->quit_event_source, &priority);
+ if (r < 0)
+ return r;
+
+ r = sd_future_set_priority(f, priority > INT64_MIN ? priority - 1 : priority);
+ if (r < 0)
+ return r;
+
+ /* Hand the future's lifetime over to the event loop: it'll auto-unref on resolve. */
+ r = sd_fiber_set_floating(f, true);
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
static int varlink_dispatch_method(sd_varlink *v) {
_cleanup_(sd_json_variant_unrefp) sd_json_variant *parameters = NULL;
sd_varlink_method_flags_t flags = 0;
v->protocol_upgrade || FLAGS_SET(v->server->flags, SD_VARLINK_SERVER_UPGRADABLE));
/* First consult user supplied method implementations */
+ bool is_fiber = false;
callback = hashmap_get(v->server->methods, method);
+ if (!callback) {
+ callback = hashmap_get(v->server->fiber_methods, method);
+ if (callback)
+ is_fiber = true;
+ }
if (!callback) {
if (streq(method, "org.varlink.service.GetInfo"))
callback = generic_method_get_info;
}
if (!invalid) {
- r = callback(v, parameters, flags, v->userdata);
+ if (is_fiber)
+ /* Spawn a fiber to run the callback. The VarlinkFiberData takes a ref on the
+ * connection (bumping n_ref above 2), so the post-callback logic below treats
+ * this as a deferred reply and moves state to PENDING_METHOD. */
+ r = varlink_dispatch_fiber(v, method, callback, parameters, flags);
+ else
+ r = callback(v, parameters, flags, v->userdata);
if (VARLINK_STATE_WANTS_REPLY(v->state)) {
if (r < 0) {
varlink_log_errno(v, r, "Callback for '%s' returned error: %m", method);
* if the method call remains unanswered. */
r = sd_varlink_error_errno(v, r);
} else if (v->sentinel) {
- if (v->previous) {
- r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds);
- if (r >= 0) {
- v->previous = sd_json_variant_unref(v->previous);
- v->previous_fds = mfree(v->previous_fds);
- v->n_previous_fds = 0;
- varlink_set_state(v, VARLINK_PROCESSED_METHOD);
- }
- } else {
- char *sentinel = TAKE_PTR(v->sentinel);
-
- /* Propagate the sentinel to the client if one was configured
- * and no replies were enqueued by the callback. */
- if (sentinel == POINTER_MAX)
- r = sd_varlink_reply(v, NULL);
- else {
- r = sd_varlink_error(v, sentinel, NULL);
- /* sd_varlink_error() deliberately returns a negative
- * errno mapped from the error id on success (so method
- * callbacks can `return sd_varlink_error(...);` to
- * enqueue a reply and propagate a matching errno in one
- * go). For sentinel dispatch we don't care about that
- * mapping — the reply is either enqueued or not, which
- * we detect via the state transition instead. */
- if (v->state == VARLINK_PROCESSED_METHOD)
- r = 0;
- }
-
- if (sentinel != POINTER_MAX)
- free(sentinel);
- }
+ r = varlink_dispatch_sentinel(v);
if (r < 0)
varlink_log_errno(v, r, "Failed to process sentinel for method '%s': %m", method);
} else {
if (v->state == VARLINK_PROCESSING_METHOD_ONEWAY)
return 0;
- /* This has to be called during a callback, and not after it has exited. */
- assert_return(IN_SET(v->state, VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE),
+ /* This has to be called during a callback, and not after it has exited. The PENDING states
+ * apply to fiber callbacks, which run after varlink_dispatch_method() has already transitioned
+ * the state from PROCESSING to PENDING. */
+ assert_return(IN_SET(v->state,
+ VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE,
+ VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE),
-EUCLEAN);
char *s = NULL;
while ((m = hashmap_steal_first_key(s->methods)))
free(m);
+ while ((m = hashmap_steal_first_key(s->fiber_methods)))
+ free(m);
+
hashmap_free(s->methods);
+ hashmap_free(s->fiber_methods);
hashmap_free(s->interfaces);
hashmap_free(s->symbols);
hashmap_free(s->by_uid);
return !strchr(p+1, '.');
}
-_public_ int sd_varlink_server_bind_method(sd_varlink_server *s, const char *method, sd_varlink_method_t callback) {
+static int varlink_server_bind_internal(sd_varlink_server *s, Hashmap **methods, const char *method, sd_varlink_method_t callback) {
_cleanup_free_ char *m = NULL;
int r;
- assert_return(s, -EINVAL);
- assert_return(method, -EINVAL);
- assert_return(callback, -EINVAL);
+ assert(s);
+ assert(methods);
+ assert(method);
+ assert(callback);
if (varlink_symbol_in_interface(method, "org.varlink.service") ||
varlink_symbol_in_interface(method, "io.systemd"))
return varlink_server_log_errno(s, SYNTHETIC_ERRNO(EEXIST), "Cannot bind server to '%s'.", method);
+ /* Refuse to register the same method in both the regular and fiber method maps: the dispatcher
+ * always consults methods first and would silently ignore a shadowed fiber_methods entry (or vice
+ * versa), hiding the misconfiguration. */
+ Hashmap *other = methods == &s->methods ? s->fiber_methods : s->methods;
+ if (hashmap_contains(other, method))
+ return varlink_server_log_errno(s, SYNTHETIC_ERRNO(EEXIST),
+ "Method '%s' is already bound in the other method map.", method);
+
m = strdup(method);
if (!m)
return log_oom_debug();
- r = hashmap_ensure_put(&s->methods, &string_hash_ops, m, callback);
+ r = hashmap_ensure_put(methods, &string_hash_ops, m, callback);
if (r == -ENOMEM)
return log_oom_debug();
if (r < 0)
return 0;
}
-_public_ int sd_varlink_server_bind_method_many_internal(sd_varlink_server *s, ...) {
- va_list ap;
+static int varlink_server_bind_many_internal(sd_varlink_server *s, Hashmap **methods, va_list ap) {
int r = 0;
- assert_return(s, -EINVAL);
+ assert(s);
+ assert(methods);
- va_start(ap, s);
for (;;) {
sd_varlink_method_t callback;
const char *method;
callback = va_arg(ap, sd_varlink_method_t);
- r = sd_varlink_server_bind_method(s, method, callback);
+ r = varlink_server_bind_internal(s, methods, method, callback);
if (r < 0)
break;
}
+
+ return r;
+}
+
+_public_ int sd_varlink_server_bind_method(sd_varlink_server *s, const char *method, sd_varlink_method_t callback) {
+ assert_return(s, -EINVAL);
+ assert_return(method, -EINVAL);
+ assert_return(callback, -EINVAL);
+
+ return varlink_server_bind_internal(s, &s->methods, method, callback);
+}
+
+_public_ int sd_varlink_server_bind_method_many_internal(sd_varlink_server *s, ...) {
+ va_list ap;
+ int r;
+
+ assert_return(s, -EINVAL);
+
+ va_start(ap, s);
+ r = varlink_server_bind_many_internal(s, &s->methods, ap);
+ va_end(ap);
+
+ return r;
+}
+
+int varlink_server_bind_fiber(sd_varlink_server *s, const char *method, sd_varlink_method_t callback) {
+ assert_return(s, -EINVAL);
+ assert_return(method, -EINVAL);
+ assert_return(callback, -EINVAL);
+
+ return varlink_server_bind_internal(s, &s->fiber_methods, method, callback);
+}
+
+int varlink_server_bind_fiber_many_internal(sd_varlink_server *s, ...) {
+ va_list ap;
+ int r;
+
+ assert_return(s, -EINVAL);
+
+ va_start(ap, s);
+ r = varlink_server_bind_many_internal(s, &s->fiber_methods, ap);
va_end(ap);
return r;
#include <fcntl.h>
#include <poll.h>
-#include <pthread.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>
#include "sd-event.h"
+#include "sd-future.h"
#include "sd-json.h"
#include "sd-varlink.h"
/* Block the main event loop while we flood */
ASSERT_OK_EQ_ERRNO(write(block_write_fd, &x, sizeof(x)), (ssize_t) sizeof(x));
- ASSERT_OK(sd_event_default(&e));
+ /* Create a fresh event loop for the flood test — we can't reuse the default event because the
+ * main test (and the fiber we're running in) is already running it, and sd_event_loop() asserts
+ * the event is in the INITIAL state. Exit-on-idle so the nested loop terminates once the
+ * overload reply has been received and all other work is quiesced. */
+ ASSERT_OK(sd_event_new(&e));
+ ASSERT_OK(sd_event_set_exit_on_idle(e, true));
/* Flood the server with connections */
ASSERT_NOT_NULL(connections = new0(sd_varlink*, OVERLOAD_CONNECTIONS));
connections[k] = sd_varlink_unref(connections[k]);
}
-static void *thread(void *arg) {
+static int client_fiber(void *arg) {
_cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *c = NULL;
_cleanup_(sd_json_variant_unrefp) sd_json_variant *i = NULL;
_cleanup_(sd_json_variant_unrefp) sd_json_variant *wrong = NULL;
SD_JSON_BUILD_PAIR_INTEGER("b", 99))));
ASSERT_OK(sd_varlink_connect_address(&c, arg));
- ASSERT_OK(sd_varlink_set_description(c, "thread-client"));
+ ASSERT_OK(sd_varlink_set_description(c, "fiber-client"));
ASSERT_OK(sd_varlink_set_allow_fd_passing_input(c, true));
ASSERT_OK(sd_varlink_set_allow_fd_passing_output(c, true));
ASSERT_OK(sd_varlink_send(c, "io.test.Done", NULL));
- return NULL;
+ return 0;
}
static int block_fd_handler(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
_cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
_cleanup_(sd_json_variant_unrefp) sd_json_variant *v = NULL;
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ _cleanup_(sd_future_unrefp) sd_future *f = NULL;
_cleanup_close_pair_ int block_fds[2] = EBADF_PAIR;
- pthread_t t;
const char *sp;
ASSERT_OK(mkdtemp_malloc("/tmp/varlink-test-XXXXXX", &tmpdir));
ASSERT_OK(sd_varlink_attach_event(c, e, 0));
- ASSERT_OK(-pthread_create(&t, NULL, thread, (void*) sp));
+ ASSERT_OK(sd_fiber_new(e, "client", client_fiber, (void*) sp, /* destroy= */ NULL, &f));
ASSERT_OK(sd_event_loop(e));
- ASSERT_OK(-pthread_join(t, NULL));
+ ASSERT_OK(sd_future_result(f));
}
static int method_invalid(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
ASSERT_OK(sd_event_loop(e));
}
+static int method_fiber_sentinel_error(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ /* Set an error sentinel from a fiber callback and return without sending a reply. The sentinel
+ * error should still be propagated by the fiber's post-callback logic, even though the varlink
+ * state has already been transitioned to VARLINK_PENDING_METHOD by the time the fiber runs. */
+ ASSERT_OK(sd_varlink_set_sentinel(link, "io.test.SentinelError"));
+ return 0;
+}
+
+TEST(fiber_sentinel_error) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ ASSERT_OK(sd_event_default(&e));
+
+ _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+ ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+ ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+ ASSERT_OK(varlink_server_bind_fiber(s, "io.test.FiberSentinelError", method_fiber_sentinel_error));
+
+ int connfd[2];
+ ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+ ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+ _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+ ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+ ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+ ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_error));
+
+ ASSERT_OK(sd_varlink_invoke(c, "io.test.FiberSentinelError", /* parameters= */ NULL));
+
+ ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_fiber_errno(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ /* Return a negative errno without sending a reply. The fiber's post-callback logic should
+ * convert this into a SD_VARLINK_ERROR_SYSTEM reply. */
+ return -ENOSYS;
+}
+
+static int reply_fiber_errno(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+ ASSERT_STREQ(error_id, SD_VARLINK_ERROR_SYSTEM);
+ ASSERT_EQ(sd_json_variant_integer(sd_json_variant_by_key(parameters, "errno")), ENOSYS);
+ ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+ return 0;
+}
+
+TEST(fiber_errno) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ ASSERT_OK(sd_event_default(&e));
+
+ _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+ ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+ ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+ ASSERT_OK(varlink_server_bind_fiber(s, "io.test.FiberErrno", method_fiber_errno));
+
+ int connfd[2];
+ ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+ ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+ _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+ ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+ ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+ ASSERT_OK(sd_varlink_bind_reply(c, reply_fiber_errno));
+
+ ASSERT_OK(sd_varlink_invoke(c, "io.test.FiberErrno", /* parameters= */ NULL));
+
+ ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_fiber_no_reply(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ /* Return success without replying and without stashing a ref. The fiber's post-callback
+ * logic should detect this and fail the connection. */
+ return 0;
+}
+
+static int reply_fiber_no_reply(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+ ASSERT_STREQ(error_id, SD_VARLINK_ERROR_DISCONNECTED);
+ ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+ return 0;
+}
+
+TEST(fiber_no_reply) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ ASSERT_OK(sd_event_default(&e));
+
+ _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+ ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+ ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+ ASSERT_OK(varlink_server_bind_fiber(s, "io.test.FiberNoReply", method_fiber_no_reply));
+
+ int connfd[2];
+ ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+ ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+ _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+ ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+ ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+ ASSERT_OK(sd_varlink_bind_reply(c, reply_fiber_no_reply));
+
+ ASSERT_OK(sd_varlink_invoke(c, "io.test.FiberNoReply", /* parameters= */ NULL));
+
+ ASSERT_OK(sd_event_loop(e));
+}
+
+static int fiber_stashed_deferred_reply(sd_event_source *s, void *userdata) {
+ _cleanup_(sd_varlink_unrefp) sd_varlink *link = ASSERT_PTR(userdata);
+
+ sd_event_source_disable_unref(s);
+ ASSERT_OK(sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_STRING("result", "stashed")));
+ return 0;
+}
+
+static int method_fiber_stash(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ /* Stash a ref on the connection so n_ref > 2 when the fiber returns, and reply later from a
+ * deferred event source. The fiber's post-callback logic should see the extra ref and treat
+ * this as a valid deferred-reply case instead of failing the connection. */
+ sd_event_source *source;
+
+ ASSERT_OK(sd_event_add_defer(sd_varlink_get_event(link), &source, fiber_stashed_deferred_reply, sd_varlink_ref(link)));
+ ASSERT_OK(sd_event_source_set_enabled(source, SD_EVENT_ONESHOT));
+ return 0;
+}
+
+static int reply_fiber_stash(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+ ASSERT_NULL(error_id);
+ ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(parameters, "result")), "stashed");
+ ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+ return 0;
+}
+
+TEST(fiber_stash) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ ASSERT_OK(sd_event_default(&e));
+
+ _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+ ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+ ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+ ASSERT_OK(varlink_server_bind_fiber(s, "io.test.FiberStash", method_fiber_stash));
+
+ int connfd[2];
+ ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+ ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+ _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+ ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+ ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+ ASSERT_OK(sd_varlink_bind_reply(c, reply_fiber_stash));
+
+ ASSERT_OK(sd_varlink_invoke(c, "io.test.FiberStash", /* parameters= */ NULL));
+
+ ASSERT_OK(sd_event_loop(e));
+}
+
static int method_with_fd_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
_cleanup_close_ int fd1 = -EBADF, fd2 = -EBADF;
if (r < 0)
return r;
- /* After upgrade, do raw I/O: read until EOF, reverse, write back.
- * The client shuts down its write side after sending, so we get a clean EOF. */
+ /* After upgrade, do raw I/O: read until the client shuts down its write side (giving us a clean
+ * EOF), reverse what we got, and write it back. Use suspending I/O so other fibers (the client)
+ * can make progress while we're waiting on the socket. */
char buf[64] = {};
ssize_t n = ASSERT_OK(loop_read(input_fd, buf, sizeof(buf) - 1, /* do_poll= */ true));
ASSERT_GT(n, 0);
/* Calling reply_and_upgrade without the client requesting it should fail with -EPROTO */
ASSERT_ERROR(sd_varlink_reply_and_upgrade(link, /* parameters= */ NULL, &input_fd, &output_fd), EPROTO);
- sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS);
-
return sd_varlink_reply(link, /* parameters= */ NULL);
}
-static void *upgrade_thread(void *arg) {
+static int upgrade_client_fiber(void *arg) {
_cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *c = NULL;
_cleanup_close_ int input_fd = -EBADF, output_fd = -EBADF;
sd_json_variant *o = NULL;
ASSERT_OK(sd_varlink_call(c2, "io.test.UpgradeWithoutFlag", /* parameters= */ NULL, &o, &error_id));
ASSERT_NULL(error_id);
- return NULL;
+ ASSERT_OK(sd_event_exit(sd_fiber_get_event(), EXIT_SUCCESS));
+ return 0;
}
TEST(upgrade) {
_cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
_cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
- pthread_t t;
+ _cleanup_(sd_future_unrefp) sd_future *f = NULL;
const char *sp;
ASSERT_OK(mkdtemp_malloc("/tmp/varlink-test-XXXXXX", &tmpdir));
ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_UPGRADABLE));
ASSERT_OK(sd_varlink_server_set_description(s, "upgrade-server"));
- ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Upgrade", method_upgrade));
+ /* The method does raw I/O on the upgraded socket — bind it as a fiber method so it can
+ * suspend on loop_read()/loop_write() and the client fiber can make progress concurrently. */
+ ASSERT_OK(varlink_server_bind_fiber(s, "io.test.Upgrade", method_upgrade));
ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.UpgradeWithoutFlag", method_upgrade_without_flag));
ASSERT_OK(sd_varlink_server_listen_address(s, sp, 0600));
ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
- ASSERT_OK(-pthread_create(&t, NULL, upgrade_thread, (void*) sp));
+ ASSERT_OK(sd_fiber_new(e, "upgrade-client", upgrade_client_fiber, (void*) sp, /* destroy= */ NULL, &f));
- /* Run the event loop until no more connections (the thread will disconnect when done) */
+ /* Run the event loop. Exits on idle once the client fiber completes and all server connections
+ * have been torn down. */
ASSERT_OK(sd_event_loop(e));
- ASSERT_OK(-pthread_join(t, NULL));
+ ASSERT_OK(sd_future_result(f));
}
-static int method_upgrade_and_exit(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
- sd_event *event = ASSERT_PTR(userdata);
-
- int r = method_upgrade(link, parameters, flags, /* userdata= */ NULL);
-
- /* Exit the event loop after the upgrade is handled. We can't use sd_varlink_get_event()
- * here because the connection is already disconnected after reply_and_upgrade. */
- (void) sd_event_exit(event, r < 0 ? r : EXIT_SUCCESS);
- return r;
-}
-
-static void *upgrade_pipelining_thread(void *arg) {
+static int upgrade_pipelining_client_fiber(void *arg) {
union sockaddr_union sa = {};
_cleanup_close_ int fd = -EBADF;
/* Shut down write side so server's method_upgrade sees EOF after raw payload */
ASSERT_OK_ERRNO(shutdown(fd, SHUT_WR));
- /* Read everything: upgrade reply (JSON + \0) + reversed raw payload. The server closes
- * the connection after writing, so loop_read() reads until EOF and gets it all. */
+ /* Read everything: upgrade reply (JSON + \0) + reversed raw payload. The server closes the
+ * connection after writing, so loop_read() reads until EOF and gets it all. */
char buf[256] = {};
ssize_t n = ASSERT_OK(loop_read(fd, buf, sizeof(buf) - 1, /* do_poll= */ true));
ASSERT_GT(n, 0);
ASSERT_EQ(raw_size, strlen(raw_payload));
ASSERT_STREQ(strndupa_safe(raw, raw_size), "!denilepiP");
- return NULL;
+ ASSERT_OK(sd_event_exit(sd_fiber_get_event(), EXIT_SUCCESS));
+ return 0;
}
TEST(upgrade_pipelining) {
_cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
_cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
- pthread_t t;
+ _cleanup_(sd_future_unrefp) sd_future *f = NULL;
const char *sp;
ASSERT_OK(mkdtemp_malloc("/tmp/varlink-test-XXXXXX", &tmpdir));
ASSERT_OK(sd_event_new(&e));
- ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_UPGRADABLE|SD_VARLINK_SERVER_INHERIT_USERDATA));
+ ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_UPGRADABLE));
ASSERT_OK(sd_varlink_server_set_description(s, "upgrade-pipelining-server"));
- ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Upgrade", method_upgrade_and_exit));
+ /* method_upgrade does raw I/O on the upgraded socket, so bind as a fiber method. */
+ ASSERT_OK(varlink_server_bind_fiber(s, "io.test.Upgrade", method_upgrade));
ASSERT_OK(sd_varlink_server_listen_address(s, sp, 0600));
ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
- sd_varlink_server_set_userdata(s, e);
- ASSERT_OK(-pthread_create(&t, NULL, upgrade_pipelining_thread, (void*) sp));
+ ASSERT_OK(sd_fiber_new(e, "upgrade-pipelining-client", upgrade_pipelining_client_fiber, (void*) sp, /* destroy= */ NULL, &f));
ASSERT_OK(sd_event_loop(e));
- ASSERT_OK(-pthread_join(t, NULL));
+ ASSERT_OK(sd_future_result(f));
}
typedef struct ExecDirServer {
sd_varlink_server *server;
- sd_event *event;
const char *name;
- pthread_t thread;
} ExecDirServer;
static int method_execute_dir_ping(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
return sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_STRING("name", srv->name));
}
-static void on_execute_dir_disconnect(sd_varlink_server *s, sd_varlink *link, void *userdata) {
- ExecDirServer *srv = ASSERT_PTR(userdata);
-
- /* Only one client (from varlink_execute_directory()) connects per server — once it's gone, we're done. */
- ASSERT_OK(sd_event_exit(srv->event, 0));
-}
-
-static void *execute_dir_server_thread(void *arg) {
- ExecDirServer *srv = arg;
-
- ASSERT_OK(sd_event_loop(srv->event));
- return NULL;
-}
-
static int execute_dir_reply(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
size_t *count = ASSERT_PTR(userdata);
return 0;
}
-TEST(execute_directory) {
- _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
- static const char * const names[] = { "alpha", "beta", "gamma" };
- ExecDirServer servers[ELEMENTSOF(names)] = {};
- size_t reply_count = 0;
-
- ASSERT_OK(mkdtemp_malloc("/tmp/varlink-execdir-XXXXXX", &tmpdir));
-
- for (size_t i = 0; i < ELEMENTSOF(names); i++) {
- ExecDirServer *eds = servers + i;
- servers[i].name = names[i];
+typedef struct ExecDirClientArgs {
+ const char *tmpdir;
+ size_t n_servers;
+ size_t *reply_count;
+} ExecDirClientArgs;
- _cleanup_free_ char *j = ASSERT_PTR(path_join(tmpdir, names[i]));
-
- ASSERT_OK(sd_event_new(&eds->event));
- ASSERT_OK(varlink_server_new(&eds->server,
- SD_VARLINK_SERVER_INHERIT_USERDATA,
- eds));
- ASSERT_OK(sd_varlink_server_bind_method(eds->server, "io.test.ExecDirPing", method_execute_dir_ping));
- ASSERT_OK(sd_varlink_server_bind_disconnect(eds->server, on_execute_dir_disconnect));
- ASSERT_OK(sd_varlink_server_listen_address(eds->server, j, 0600));
- ASSERT_OK(sd_varlink_server_attach_event(eds->server, eds->event, 0));
-
- ASSERT_OK(-pthread_create(&eds->thread, NULL, execute_dir_server_thread, eds));
- }
+static int execute_dir_client_fiber(void *arg) {
+ ExecDirClientArgs *a = ASSERT_PTR(arg);
ASSERT_OK_EQ(varlink_execute_directory(
- tmpdir,
+ a->tmpdir,
"io.test.ExecDirPing",
/* parameters= */ NULL,
/* more= */ false,
/* timeout_usec= */ USEC_INFINITY,
execute_dir_reply,
- &reply_count), (ssize_t) ELEMENTSOF(names));
- ASSERT_EQ(reply_count, ELEMENTSOF(names));
-
- FOREACH_ELEMENT(eds, servers) {
- ASSERT_OK(-pthread_join(eds->thread, NULL));
- eds->server = sd_varlink_server_unref(eds->server);
- eds->event = sd_event_unref(eds->event);
- }
+ a->reply_count), (ssize_t) a->n_servers);
+ ASSERT_EQ(*a->reply_count, a->n_servers);
/* Calling the helper against a non-existent directory must fail. */
_cleanup_free_ char *nope = NULL;
- ASSERT_OK(asprintf(&nope, "%s/does-not-exist", tmpdir));
+ ASSERT_OK(asprintf(&nope, "%s/does-not-exist", a->tmpdir));
ASSERT_FAIL(varlink_execute_directory(
nope,
"io.test.ExecDirPing",
/* more= */ false,
/* timeout_usec= */ USEC_INFINITY,
execute_dir_reply,
- &reply_count));
+ a->reply_count));
/* An empty directory must simply return 0 and not invoke the reply callback. */
- _cleanup_free_ char *empty = ASSERT_PTR(path_join(tmpdir, "empty"));
+ _cleanup_free_ char *empty = ASSERT_PTR(path_join(a->tmpdir, "empty"));
ASSERT_OK_ERRNO(mkdir(empty, 0755));
- size_t count_before = reply_count;
+ size_t count_before = *a->reply_count;
ASSERT_OK_ZERO(varlink_execute_directory(
empty,
"io.test.ExecDirPing",
/* more= */ false,
/* timeout_usec= */ USEC_INFINITY,
execute_dir_reply,
- &reply_count));
- ASSERT_EQ(reply_count, count_before);
+ a->reply_count));
+ ASSERT_EQ(*a->reply_count, count_before);
+
+ ASSERT_OK(sd_event_exit(sd_fiber_get_event(), EXIT_SUCCESS));
+ return 0;
+}
+
+TEST(execute_directory) {
+ _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+ static const char * const names[] = { "alpha", "beta", "gamma" };
+ ExecDirServer servers[ELEMENTSOF(names)] = {};
+ size_t reply_count = 0;
+
+ ASSERT_OK(mkdtemp_malloc("/tmp/varlink-execdir-XXXXXX", &tmpdir));
+
+ ASSERT_OK(sd_event_new(&e));
+
+ for (size_t i = 0; i < ELEMENTSOF(names); i++) {
+ ExecDirServer *eds = servers + i;
+ servers[i].name = names[i];
+
+ _cleanup_free_ char *j = ASSERT_PTR(path_join(tmpdir, names[i]));
+
+ ASSERT_OK(varlink_server_new(&eds->server,
+ SD_VARLINK_SERVER_INHERIT_USERDATA,
+ eds));
+ ASSERT_OK(sd_varlink_server_bind_method(eds->server, "io.test.ExecDirPing", method_execute_dir_ping));
+ ASSERT_OK(sd_varlink_server_listen_address(eds->server, j, 0600));
+ ASSERT_OK(sd_varlink_server_attach_event(eds->server, e, 0));
+ }
+
+ ExecDirClientArgs args = {
+ .tmpdir = tmpdir,
+ .n_servers = ELEMENTSOF(names),
+ .reply_count = &reply_count,
+ };
+ ASSERT_OK(sd_fiber_new(e, "execute-dir-client", execute_dir_client_fiber, &args, NULL, &f));
+
+ ASSERT_OK(sd_event_loop(e));
+
+ ASSERT_OK(sd_future_result(f));
+
+ FOREACH_ELEMENT(eds, servers)
+ eds->server = sd_varlink_server_unref(eds->server);
}
#define CTRUNC_N_FDS 64U
TEST(ctrunc) {
int r;
-
+
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
ASSERT_OK(sd_event_default(&e));