]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
qmp-client: add fiber-aware call paths
authorDaan De Meyer <daan@amutable.com>
Fri, 24 Apr 2026 09:49:02 +0000 (09:49 +0000)
committerDaan De Meyer <daan@amutable.com>
Thu, 21 May 2026 09:55:04 +0000 (09:55 +0000)
The synchronous qmp_client_call() pumps the event loop until its reply
arrives, pinning the parsed reply on c->current so it can hand out
borrowed pointers to the caller. That model only fits one in-flight
sync call: a second qmp_client_call() on the same client clears
c->current before issuing its own send, invalidating the first
caller's borrowed pointers. On a single-threaded event loop that was
fine, but with fibers two concurrent calls on the same client can
interleave through the pump (json_stream_wait() suspends the running
fiber) and trample each other.

To fix this, make qmp_client_call() detect when it's running on a fiber
whose event loop matches the client and transparently delegate to
qmp_client_call_suspend(), which makes use of a new QmpFuture to allow
multiple concurrent calls to qmp_client_call().

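To make this work concurrently, we also change qmp_client_call() to
hand out an owned reference to the result and a heap-allocated copy of
the error description (both released by the caller), so that we don't
have to keep the reply pinned in the QmpClient struct to back the
borrowed pointers we used to hand out.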

src/shared/qmp-client.c
src/shared/qmp-client.h
src/test/test-qmp-client.c

index 41b0c6dd57034bfce119dddb8c20cc78808151d6..8de903de1ad8ca02ef728c81c6d93f3866cf7996 100644 (file)
@@ -1,6 +1,7 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 
 #include "sd-event.h"
+#include "sd-future.h"
 #include "sd-json.h"
 
 #include "alloc-util.h"
@@ -226,19 +227,23 @@ static int qmp_extract_response_id(sd_json_variant *v, uint64_t *ret) {
         return 1;
 }
 
-/* Returns 0 on success (ret_result = "return" value), -EIO on QMP error (reterr_desc set). */
-static int qmp_parse_response(sd_json_variant *v, sd_json_variant **ret_result, const char **reterr_desc) {
+/* Returns 0 on success (ret_result = freshly reffed "return" value), -EIO on QMP error
+ * (reterr_error_desc set to a freshly allocated string), -ENOMEM if that string cannot be
+ * allocated. Caller owns both outputs. */
+static int qmp_parse_response(sd_json_variant *v, sd_json_variant **ret_result, char **reterr_error_desc) {
         const char *desc;
 
         desc = qmp_extract_error_description(v);
         if (desc) {
-                if (reterr_desc)
-                        *reterr_desc = desc;
+                if (reterr_error_desc) {
+                        *reterr_error_desc = strdup(desc);
+                        if (!*reterr_error_desc)
+                                return -ENOMEM;
+                }
                 return -EIO;
         }
 
         if (ret_result)
-                *ret_result = sd_json_variant_by_key(v, "return");
+                *ret_result = sd_json_variant_ref(sd_json_variant_by_key(v, "return"));
         return 0;
 }
 
@@ -273,8 +278,8 @@ static int qmp_client_build_command(
 
 /* Route c->current to event callback or matching async slot. Returns 1 on dispatch. */
 static int qmp_client_dispatch(QmpClient *c) {
-        sd_json_variant *result = NULL;
-        const char *desc = NULL;
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *result = NULL;
+        _cleanup_free_ char *desc = NULL;
         uint64_t id;
         int error, r;
 
@@ -318,8 +323,8 @@ static int qmp_client_dispatch(QmpClient *c) {
         }
 
         /* Synchronous slot (no callback): leave c->current pinned so qmp_client_call() can
-         * pick up the reply and hand out borrowed pointers into it. The sync caller owns a
-         * ref on the slot and detects completion by observing slot->client turning NULL. */
+         * pick the reply up after its pump loop. The sync caller owns a ref on the slot and
+         * detects completion by observing slot->client turning NULL. */
         if (!slot->callback) {
                 qmp_slot_disconnect(slot, /* unref= */ true);
                 return 1;
@@ -574,6 +579,10 @@ static void qmp_client_clear(QmpClient *c) {
         qmp_client_detach_event(c);
         qmp_client_clear_current(c);
         json_stream_done(&c->stream);
+        /* qmp_client_handle_disconnect() above drained every entry via qmp_client_fail_pending();
+         * the set is borrow-only for non-floating slots, so set_free() can't safely run a
+         * destructor over leftovers — enforce the drain invariant instead. */
+        assert(set_isempty(c->slots));
         c->slots = set_free(c->slots);
 }
 
@@ -745,7 +754,7 @@ DEFINE_TRIVIAL_CLEANUP_FUNC(QmpClientArgs*, qmp_client_args_close_fds);
 
 /* Shared send path for qmp_client_invoke() and qmp_client_call(). A NULL callback registers
  * a "synchronous" slot: dispatch_reply leaves c->current pinned on match instead of invoking
- * a callback, so qmp_client_call() can hand out borrowed pointers into the reply. If ret_slot
+ * a callback, so qmp_client_call() can pick the reply up after its pump loop. If ret_slot
  * is NULL the slot is allocated as floating (owned by c->slots); otherwise a reference is
  * handed back to the caller. */
 static int qmp_client_send(
@@ -810,21 +819,193 @@ int qmp_client_invoke(
         return qmp_client_send(c, command, args, callback, userdata, ret_slot);
 }
 
+typedef struct QmpFuture {        /* private data of the sd_future created by qmp_client_call_future() */
+        QmpSlot *slot;            /* owned, non-floating; NULL once disconnected */
+        sd_json_variant *result;  /* reference taken by qmp_future_callback() when a result is delivered; NULL otherwise */
+        char *error_desc;         /* strdup() copy of the QMP error description made by qmp_future_callback(); NULL otherwise */
+} QmpFuture;
+
+/* sd_future_ops.alloc hook: provides the QmpFuture that backs a QMP call future. All fields start
+ * out zeroed; returns NULL if the allocation fails. */
+static void* qmp_future_alloc(void) {
+        QmpFuture *state = new0(QmpFuture, 1);
+
+        return state;
+}
+
+/* sd_future_ops.free hook: drops everything the QmpFuture owns (slot reference, result reference,
+ * error description copy) and finally the QmpFuture itself. */
+static void qmp_future_free(sd_future *f) {
+        QmpFuture *state;
+
+        assert(f);
+        state = ASSERT_PTR(sd_future_get_private(f));
+
+        qmp_slot_unref(state->slot);
+        sd_json_variant_unref(state->result);
+        free(state->error_desc);
+
+        free(state);
+}
+
+/* sd_future_ops.cancel hook: abandons the pending call and resolves the future with -ECANCELED.
+ * Returns whatever sd_future_resolve() returns. */
+static int qmp_future_cancel(sd_future *f) {
+        QmpFuture *state;
+
+        assert(f);
+        state = ASSERT_PTR(sd_future_get_private(f));
+
+        /* Give up our slot reference before resolving: with the slot dropped, dispatch_reply has
+         * no callback of ours left to fire (and no freed memory to touch) once the reply
+         * eventually arrives. */
+        state->slot = qmp_slot_unref(state->slot);
+
+        return sd_future_resolve(f, -ECANCELED);
+}
+
+static const sd_future_ops qmp_call_future_ops = {  /* ops table handed to sd_future_new() in qmp_client_call_future(); also used as a type tag by future_get_qmp_reply() */
+        .size = sizeof(sd_future_ops),               /* NOTE(review): size of the ops table itself, not of QmpFuture — presumably for ops versioning; confirm */
+        .alloc = qmp_future_alloc,                   /* allocates the zeroed QmpFuture private data */
+        .free = qmp_future_free,                     /* releases slot, result, error_desc and the QmpFuture */
+        .cancel = qmp_future_cancel,                 /* drops the slot and resolves with -ECANCELED */
+};
+
+/* Reply handler registered by qmp_client_call_future(); userdata is the sd_future to resolve.
+ *
+ * Whatever payload accompanies the reply is stashed on the QmpFuture (a new reference to result,
+ * or a private copy of desc) so that future_get_qmp_reply() can hand it out later. The future is
+ * then resolved with:
+ *   0        if a result was delivered,
+ *   -EIO     if a QMP error description was delivered instead,
+ *   -ENOMEM  if that description could not be copied,
+ *   error    if neither payload is present (transport failure, disconnect).
+ * Cancellation does not come through here; it is resolved as -ECANCELED by qmp_future_cancel().
+ * Returns whatever sd_future_resolve() returns. */
+static int qmp_future_callback(
+                QmpClient *c,
+                sd_json_variant *result,
+                const char *desc,
+                int error,
+                void *userdata) {
+
+        sd_future *future = ASSERT_PTR(userdata);
+        QmpFuture *state = ASSERT_PTR(sd_future_get_private(future));
+        int status;
+
+        /* At least one of result, error description or errno has to be delivered. */
+        assert(result || desc || error);
+
+        if (result)
+                state->result = sd_json_variant_ref(result);
+
+        if (desc) {
+                char *copy = strdup(desc);
+                if (!copy)
+                        /* The description cannot be passed on, so there is no usable reply
+                         * payload: resolve with the OOM so that whoever awaits the future
+                         * sees it. */
+                        return sd_future_resolve(future, -ENOMEM);
+
+                state->error_desc = copy;
+        }
+
+        /* A result takes precedence over a description, which takes precedence over the bare
+         * errno. This way a caller awaiting the future learns about call failures from the
+         * resolution value alone. */
+        status = result ? 0 : desc ? -EIO : error;
+
+        return sd_future_resolve(future, status);
+}
+
+/* Issues a QMP command and returns a future that is resolved once the reply is in.
+ *
+ * On success returns 0 and stores the new future in *ret; the caller owns that reference. On
+ * failure returns a negative errno from sd_future_new() or qmp_client_send() and leaves *ret
+ * untouched. Use future_get_qmp_reply() on the resolved future to fetch the reply. */
+int qmp_client_call_future(
+                QmpClient *c,
+                const char *command,
+                QmpClientArgs *args,
+                sd_future **ret) {
+
+        _cleanup_(sd_future_unrefp) sd_future *future = NULL;
+        QmpFuture *state;
+        int r;
+
+        assert(c);
+        assert(command);
+        assert(ret);
+
+        r = sd_future_new(&qmp_call_future_ops, &future);
+        if (r < 0)
+                return r;
+
+        state = sd_future_get_private(future);
+
+        /* The future itself is the callback's userdata, and the slot reference handed back by the
+         * send path is stored directly in the future's private data. */
+        r = qmp_client_send(c, command, args, qmp_future_callback, future, &state->slot);
+        if (r < 0)
+                return r;
+
+        *ret = TAKE_PTR(future);
+        return 0;
+}
+
+/* Fetches the outcome of a resolved future created by qmp_client_call_future().
+ *
+ * Returns 1 on a success reply, storing a new reference in *ret_result (the caller unrefs it) and
+ * NULL in *reterr_error_desc. Returns -EIO on a QMP-level error, storing a newly allocated copy
+ * of the description in *reterr_error_desc (the caller frees it) and leaving *ret_result
+ * untouched; -ENOMEM if that copy cannot be allocated. If no reply landed at all (transport
+ * failure / cancellation), returns the value the future was resolved with. Either output pointer
+ * may be NULL if the caller is not interested in it. */
+int future_get_qmp_reply(sd_future *f, sd_json_variant **ret_result, char **reterr_error_desc) {
+        QmpFuture *state;
+
+        assert(f);
+        assert(sd_future_get_ops(f) == &qmp_call_future_ops);
+        assert(sd_future_state(f) == SD_FUTURE_RESOLVED);
+
+        state = ASSERT_PTR(sd_future_get_private(f));
+
+        /* QMP-level error: hand out a private copy of the description, if requested. */
+        if (state->error_desc) {
+                if (reterr_error_desc) {
+                        char *copy = strdup(state->error_desc);
+                        if (!copy)
+                                return -ENOMEM;
+
+                        *reterr_error_desc = copy;
+                }
+
+                return -EIO;
+        }
+
+        /* Neither a description nor a result: surface the future's own result. */
+        if (!state->result)
+                return sd_future_result(f);
+
+        /* Success reply. */
+        if (reterr_error_desc)
+                *reterr_error_desc = NULL;
+        if (ret_result)
+                *ret_result = sd_json_variant_ref(state->result);
+
+        return 1;
+}
+
+/* Fiber flavour of qmp_client_call(): issues the call as a future and suspends the calling fiber
+ * until it is resumed.
+ *
+ * Must be called from a running fiber. Returns a negative errno if the call could not be issued,
+ * the return value of sd_fiber_suspend() if the future is still unresolved after resuming, and
+ * otherwise whatever future_get_qmp_reply() reports: 1 on success, -EIO on a QMP-level error, or
+ * the future's negative result when no reply landed. *ret_result and *reterr_error_desc are only
+ * written by future_get_qmp_reply(). */
+static int qmp_client_call_suspend(
+                QmpClient *c,
+                const char *command,
+                QmpClientArgs *args,
+                sd_json_variant **ret_result,
+                char **reterr_error_desc) {
+
+        int r;
+
+        assert(c);
+        assert(command);
+        assert(sd_fiber_is_running());
+
+        /* NOTE(review): the cancel_wait_unref cleanup presumably cancels a still-pending call when
+         * we return early — confirm against sd-future. */
+        _cleanup_(sd_future_cancel_wait_unrefp) sd_future *call = NULL;
+        r = qmp_client_call_future(c, command, args, &call);
+        if (r < 0)
+                return r;
+
+        r = sd_fiber_suspend();
+
+        /* If the future isn't resolved, the suspend was interrupted before a reply arrived (fiber
+         * cancelled, fiber-wide SD_FIBER_TIMEOUT scope expired, …). There's no reply to extract,
+         * so surface the resume error directly.
+         * NOTE(review): if sd_fiber_suspend() can return >= 0 while the future is still pending,
+         * this returns a non-negative value without writing the outputs — confirm that cannot
+         * happen. */
+        if (sd_future_state(call) != SD_FUTURE_RESOLVED)
+                return r;
+
+        /* Resolved: future_get_qmp_reply() already encodes success (1), QMP-level error (-EIO
+         * with the description copied if asked for), and no-reply (negative future result) —
+         * pass it through. */
+        return future_get_qmp_reply(call, ret_result, reterr_error_desc);
+}
+
 int qmp_client_call(
                 QmpClient *c,
                 const char *command,
                 QmpClientArgs *args,
                 sd_json_variant **ret_result,
-                const char **ret_error_desc) {
+                char **reterr_error_desc) {
 
-        _cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL;
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *result = NULL;
+        _cleanup_free_ char *desc = NULL;
         int r;
 
         assert_return(c, -EINVAL);
         assert_return(command, -EINVAL);
 
-        /* Drop any reply pinned by a previous qmp_client_call() before we pin a new one. */
-        qmp_client_clear_current(c);
+        /* If we're on a fiber sharing the QMP client's event loop, use the async + suspend path so
+         * multiple concurrent qmp_client_call() invocations across fibers don't deadlock each other
+         * on the process+wait pump. */
+        if (sd_fiber_is_running() && qmp_client_get_event(c) == sd_fiber_get_event())
+                return qmp_client_call_suspend(c, command, args, ret_result, reterr_error_desc);
+
+        _cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL;
 
         /* NULL callback marks this as a synchronous slot: dispatch_reply matches on id like
          * any other slot (so stray unknown-id replies still get logged and dropped), but
@@ -855,18 +1036,24 @@ int qmp_client_call(
                         return r;
         }
 
-        sd_json_variant *result = NULL;
-        const char *desc = NULL;
-        int error = qmp_parse_response(c->current, &result, &desc);
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *current = TAKE_PTR(c->current);
+        r = qmp_parse_response(current, &result, &desc);
+        if (r < 0 && r != -EIO)
+                return r;
 
-        /* If caller doesn't ask for the error string, surface the error as the return code. */
-        if (!ret_error_desc && error < 0)
-                return error;
+        /* QMP-level error: copy the description into *ret_error_desc when the caller asked for it,
+         * and surface the failure via the return value (matching qmp_client_call_suspend() /
+         * sd_bus_call()'s "negative on error" convention). */
+        if (desc) {
+                if (reterr_error_desc)
+                        *reterr_error_desc = TAKE_PTR(desc);
+                return -EIO;
+        }
 
         if (ret_result)
-                *ret_result = result;
-        if (ret_error_desc)
-                *ret_error_desc = desc;
+                *ret_result = TAKE_PTR(result);
+        if (reterr_error_desc)
+                *reterr_error_desc = NULL;
 
         return 1;
 }
index 7dcd53355d06c76078ca92309fc2d80becab7c2e..0bf435384caaf3ca137d660f79aedda3477c61ab 100644 (file)
@@ -68,15 +68,23 @@ int qmp_client_invoke(
                 qmp_command_callback_t callback,
                 void *userdata);
 
-/* Synchronous send + receive. Pumps the event loop until the reply arrives. *ret_result and
- * *ret_error_desc are borrowed pointers into the last reply, valid until the next
- * qmp_client_call(). Same contract as sd_varlink_call(). */
 int qmp_client_call(
                 QmpClient *client,
                 const char *command,
                 QmpClientArgs *args,
                 sd_json_variant **ret_result,
-                const char **ret_error_desc);
+                char **reterr_error_desc);
+
+int qmp_client_call_future(
+                QmpClient *client,
+                const char *command,
+                QmpClientArgs *args,
+                sd_future **ret);  /* asynchronous call: returns 0 and stores a new future in *ret, or a negative errno */
+
+int future_get_qmp_reply(
+                sd_future *f,
+                sd_json_variant **ret_result,
+                char **reterr_error_desc);  /* for a resolved qmp_client_call_future() future: 1 + new ref in *ret_result; -EIO + allocated copy in *reterr_error_desc; else the future's result */
 
 void qmp_client_bind_event(QmpClient *c, qmp_event_callback_t callback, void *userdata);
 void qmp_client_bind_disconnect(QmpClient *c, qmp_disconnect_callback_t callback, void *userdata);
index befee024845880476931bfddb2ac27c06d5bf5ef..e5e7ed3b735b12306e0b93d180cb7f860d6df26f 100644 (file)
@@ -646,7 +646,7 @@ TEST(qmp_client_call) {
 
         /* Successful call: borrowed result pointer is valid until the next call. */
         sd_json_variant *result = NULL;
-        const char *error_desc = NULL;
+        _cleanup_free_ char *error_desc = NULL;
         ASSERT_EQ(qmp_client_call(client, "query-status", NULL, &result, &error_desc), 1);
         ASSERT_NULL(error_desc);
         ASSERT_NOT_NULL(result);
@@ -658,7 +658,7 @@ TEST(qmp_client_call) {
 
         /* QMP error with ret_error_desc provided: returns 1, result NULL, desc set. */
         result = (sd_json_variant*) 0x1;  /* poison to catch lack-of-write */
-        error_desc = NULL;
+        free(error_desc);
         ASSERT_EQ(qmp_client_call(client, "stop", NULL, &result, &error_desc), 1);
         ASSERT_NULL(result);
         ASSERT_STREQ(error_desc, "not running");