]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
qmp-client: make QmpSlot a public, refcounted, cancellable handle
authorDaan De Meyer <daan@amutable.com>
Fri, 24 Apr 2026 06:07:31 +0000 (06:07 +0000)
committerDaan De Meyer <daan.j.demeyer@gmail.com>
Fri, 24 Apr 2026 12:29:31 +0000 (14:29 +0200)
QmpSlot now stores a back-reference to its QmpClient (mirroring sd_bus_slot)
and is exposed as a public refcounted type via qmp_slot_ref/qmp_slot_unref.
qmp_client_invoke() gains an optional QmpSlot **ret_slot out-parameter
matching sd_bus_call_async(): passing non-NULL hands back a reference whose
unref cancels the pending call (the callback is deregistered; a late reply
is logged and discarded as unknown-id).

Internally slots come in two flavours, following sd_bus's model: floating
(owned by the client's pending set, used when ret_slot is NULL) and
non-floating (ref held by the caller, slot holds a ref on the client).
qmp_slot_disconnect() centralizes the teardown so the reply-dispatched,
explicit-cancel, and client-teardown paths all converge on the same
idempotent cleanup.

qmp_client_call()'s sync slot is now non-floating and observes completion
by watching slot->client go NULL instead of set_contains() on an id.

src/shared/qmp-client.c
src/shared/qmp-client.h
src/shared/shared-forward.h
src/test/test-qmp-client-qemu.c
src/test/test-qmp-client.c
src/vmspawn/vmspawn-qmp.c
src/vmspawn/vmspawn-varlink.c

index 53a30abe5a9b589ac84c0e07cd99f007231f2927..45bc0d8dbd22e27d0fec8cc8a80ceb50a4e4ba8e 100644 (file)
@@ -30,11 +30,14 @@ typedef enum QmpClientState {
                QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED, \
                QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT)
 
-typedef struct QmpSlot {
+struct QmpSlot {
+        unsigned n_ref;
+        QmpClient *client;  /* NULL once disconnected (reply dispatched, cancelled, or client died) */
         uint64_t id;
+        bool floating;
         qmp_command_callback_t callback;
         void *userdata;
-} QmpSlot;
+};
 
 struct QmpClient {
         unsigned n_ref;
@@ -69,9 +72,92 @@ static int qmp_slot_compare_func(const QmpSlot *a, const QmpSlot *b) {
 DEFINE_PRIVATE_HASH_OPS(qmp_slot_hash_ops,
                         QmpSlot, qmp_slot_hash_func, qmp_slot_compare_func);
 
+/* Break the slot's connection to the client: remove from the lookup set, drop whichever reference
+ * is implied by the slot's floating-ness. For floating slots, the set is the sole owner, so with
+ * unref=true we also drop the slot's n_ref (usually dropping it to zero and freeing). For
+ * non-floating slots, we release the back-reference the slot holds on the client.
+ *
+ * Safe to call multiple times: once slot->client is NULL, subsequent calls are no-ops. */
+static void qmp_slot_disconnect(QmpSlot *slot, bool unref) {
+        assert(slot);
+
+        if (!slot->client)
+                return;
+
+        QmpClient *client = slot->client;
+
+        set_remove(client->slots, slot);
+        slot->client = NULL;
+
+        if (!slot->floating)
+                qmp_client_unref(client);
+        else if (unref)
+                /* May re-enter via qmp_slot_free→qmp_slot_disconnect(,false) if this drops the
+                 * last ref, but the early return above makes that recursion a no-op. */
+                qmp_slot_unref(slot);
+}
+
+static QmpSlot* qmp_slot_free(QmpSlot *slot) {
+        if (!slot)
+                return NULL;
+
+        /* Idempotent: if the slot was already disconnected (reply dispatched, explicit cancel,
+         * or client-side teardown), this is a no-op. Otherwise it removes us from the set and
+         * drops our client reference (for non-floating slots). */
+        qmp_slot_disconnect(slot, /* unref= */ false);
+
+        return mfree(slot);
+}
+
+DEFINE_TRIVIAL_REF_UNREF_FUNC(QmpSlot, qmp_slot, qmp_slot_free);
+
+QmpClient* qmp_slot_get_client(QmpSlot *slot) {
+        assert(slot);
+        return slot->client;
+}
+
+static int qmp_slot_new(
+                QmpClient *client,
+                bool floating,
+                uint64_t id,
+                qmp_command_callback_t callback,
+                void *userdata,
+                QmpSlot **ret) {
+
+        int r;
+
+        assert(client);
+        assert(ret);
+
+        _cleanup_(qmp_slot_unrefp) QmpSlot *slot = new(QmpSlot, 1);
+        if (!slot)
+                return -ENOMEM;
+
+        *slot = (QmpSlot) {
+                .n_ref    = 1,
+                .client   = NULL,   /* wired up below, after set_put succeeds */
+                .id       = id,
+                .floating = floating,
+                .callback = callback,
+                .userdata = userdata,
+        };
+
+        r = set_ensure_put(&client->slots, &qmp_slot_hash_ops, slot);
+        if (r < 0)
+                return r;
+        assert(r > 0);
+
+        slot->client = client;
+        if (!floating)
+                qmp_client_ref(client);
+
+        *ret = TAKE_PTR(slot);
+        return 0;
+}
+
 static void qmp_client_clear(QmpClient *c);
 
-static QmpClient* qmp_client_destroy(QmpClient *c) {
+static QmpClient* qmp_client_free(QmpClient *c) {
         if (!c)
                 return NULL;
 
@@ -80,8 +166,7 @@ static QmpClient* qmp_client_destroy(QmpClient *c) {
         return mfree(c);
 }
 
-DEFINE_PRIVATE_TRIVIAL_REF_FUNC(QmpClient, qmp_client);
-DEFINE_TRIVIAL_UNREF_FUNC(QmpClient, qmp_client, qmp_client_destroy);
+DEFINE_TRIVIAL_REF_UNREF_FUNC(QmpClient, qmp_client, qmp_client_free);
 
 static void qmp_client_clear_current(QmpClient *c) {
         assert(c);
@@ -225,42 +310,59 @@ static int qmp_client_dispatch_reply(QmpClient *c) {
                 return 1;
         }
 
-        _cleanup_free_ QmpSlot *pending = set_remove(c->slots, &(QmpSlot) { .id = id });
-        if (!pending) {
+        QmpSlot *slot = set_get(c->slots, &(QmpSlot) { .id = id });
+        if (!slot) {
                 qmp_client_clear_current(c);
                 json_stream_log(&c->stream, "Discarding QMP response with unknown id %" PRIu64, id);
                 return 1;
         }
 
         /* Synchronous slot (no callback): leave c->current pinned so qmp_client_call() can
-         * pick up the reply and hand out borrowed pointers into it. */
-        if (!pending->callback)
+         * pick up the reply and hand out borrowed pointers into it. The sync caller owns a
+         * ref on the slot and detects completion by observing slot->client turning NULL. */
+        if (!slot->callback) {
+                qmp_slot_disconnect(slot, /* unref= */ true);
                 return 1;
+        }
 
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current);
         error = qmp_parse_response(v, &result, &desc);
 
-        r = pending->callback(c, result, desc, error, pending->userdata);
+        /* Pin the slot across the callback regardless of floating-ness. For a floating slot,
+         * disconnect(unref=true) drops the set's implicit ref which would otherwise free it
+         * out from under the callback. */
+        qmp_slot_ref(slot);
+
+        r = slot->callback(c, result, desc, error, slot->userdata);
         if (r < 0)
                 json_stream_log_errno(&c->stream, r, "Command callback returned error, ignoring: %m");
 
+        qmp_slot_disconnect(slot, /* unref= */ true);
+        qmp_slot_unref(slot);
+
         return 1;
 }
 
-/* Fail all pending async commands with the given error. Called on disconnect. */
+/* Fail all pending commands with the given error. Called on disconnect. */
 static void qmp_client_fail_pending(QmpClient *c, int error) {
-        QmpSlot *p;
+        QmpSlot *slot;
         int r;
 
         assert(c);
 
-        while ((p = set_steal_first(c->slots))) {
-                if (p->callback) {
-                        r = p->callback(c, /* result= */ NULL, /* error_desc= */ NULL, error, p->userdata);
+        while ((slot = set_first(c->slots))) {
+                /* Keep alive across the callback and past disconnect (which may unref it for
+                 * floating slots). */
+                qmp_slot_ref(slot);
+
+                if (slot->callback) {
+                        r = slot->callback(c, /* result= */ NULL, /* error_desc= */ NULL, error, slot->userdata);
                         if (r < 0)
                                 json_stream_log_errno(&c->stream, r, "Command callback returned error, ignoring: %m");
                 }
-                free(p);
+
+                qmp_slot_disconnect(slot, /* unref= */ true);
+                qmp_slot_unref(slot);
         }
 }
 
@@ -701,17 +803,19 @@ DEFINE_TRIVIAL_CLEANUP_FUNC(QmpClientArgs*, qmp_client_args_close_fds);
 
 /* Shared send path for qmp_client_invoke() and qmp_client_call(). A NULL callback registers
  * a "synchronous" slot: dispatch_reply leaves c->current pinned on match instead of invoking
- * a callback, so qmp_client_call() can hand out borrowed pointers into the reply. */
+ * a callback, so qmp_client_call() can hand out borrowed pointers into the reply. If ret_slot
+ * is NULL the slot is allocated as floating (owned by c->slots); otherwise a reference is
+ * handed back to the caller. */
 static int qmp_client_send(
                 QmpClient *c,
                 const char *command,
                 QmpClientArgs *args,
                 qmp_command_callback_t callback,
                 void *userdata,
-                uint64_t *ret_id) {
+                QmpSlot **ret_slot) {
 
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL;
-        _cleanup_free_ QmpSlot *pending = NULL;
+        _cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL;
         /* Closes any fds in args on every early-return path; TAKE_PTR()'d on the success path
          * below once json_stream_enqueue_full() has taken ownership of them. */
         _cleanup_(qmp_client_args_close_fdsp) QmpClientArgs *fds_owner = args;
@@ -729,50 +833,40 @@ static int qmp_client_send(
         if (r < 0)
                 return r;
 
-        pending = new(QmpSlot, 1);
-        if (!pending)
-                return -ENOMEM;
-
-        *pending = (QmpSlot) {
-                .id       = id,
-                .callback = callback,
-                .userdata = userdata,
-        };
-
-        r = set_ensure_put(&c->slots, &qmp_slot_hash_ops, pending);
+        r = qmp_slot_new(c, /* floating= */ !ret_slot, id, callback, userdata, &slot);
         if (r < 0)
                 return r;
-        assert(r > 0);
 
         r = json_stream_enqueue_full(&c->stream, cmd,
                                      args ? args->fds_consume : NULL,
                                      args ? args->n_fds : 0);
-        if (r < 0) {
-                set_remove(c->slots, pending);
-                return r;
-        }
+        if (r < 0)
+                return r;  /* slot cleanup disconnects it */
 
         /* Arm defer so process() drains the output on the next iteration. */
         if (c->defer_event_source)
                 (void) sd_event_source_set_enabled(c->defer_event_source, SD_EVENT_ON);
 
-        TAKE_PTR(pending);
         TAKE_PTR(fds_owner);
 
-        if (ret_id)
-                *ret_id = id;
+        if (ret_slot)
+                *ret_slot = TAKE_PTR(slot);
+        else
+                TAKE_PTR(slot);  /* floating: c->slots keeps it alive until dispatch */
+
         return 0;
 }
 
 int qmp_client_invoke(
                 QmpClient *c,
+                QmpSlot **ret_slot,
                 const char *command,
                 QmpClientArgs *args,
                 qmp_command_callback_t callback,
                 void *userdata) {
 
         assert(callback);
-        return qmp_client_send(c, command, args, callback, userdata, /* ret_id= */ NULL);
+        return qmp_client_send(c, command, args, callback, userdata, ret_slot);
 }
 
 int qmp_client_call(
@@ -782,7 +876,7 @@ int qmp_client_call(
                 sd_json_variant **ret_result,
                 const char **ret_error_desc) {
 
-        uint64_t id;
+        _cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL;
         int r;
 
         assert_return(c, -EINVAL);
@@ -793,17 +887,18 @@ int qmp_client_call(
 
         /* NULL callback marks this as a synchronous slot: dispatch_reply matches on id like
          * any other slot (so stray unknown-id replies still get logged and dropped), but
-         * pins c->current for us instead of invoking a callback. */
-        r = qmp_client_send(c, command, args, /* callback= */ NULL, /* userdata= */ NULL, &id);
+         * pins c->current for us instead of invoking a callback. The slot is non-floating so
+         * we can observe dispatch by watching slot->client go NULL. */
+        r = qmp_client_send(c, command, args, /* callback= */ NULL, /* userdata= */ NULL, &slot);
         if (r < 0)
                 return r;
 
-        /* Pump the loop until our sync slot fires (removed from c->slots, c->current pinned). */
+        /* Pump the loop until our sync slot fires (disconnected by dispatch, c->current pinned). */
         for (;;) {
                 if (c->state == QMP_CLIENT_DISCONNECTED)
                         return -ECONNRESET;
 
-                if (!set_contains(c->slots, &(QmpSlot) { .id = id })) {
+                if (!slot->client) {
                         assert(c->current);
                         break;
                 }
index b3ee8262e01e9696ece62081d088deeb40d4727b..7a477f7e4fa375b9be89d02c49544481263ae174 100644 (file)
@@ -54,9 +54,12 @@ bool qmp_client_is_idle(QmpClient *c);
 /* True iff the connection is dead. Stable terminal state — once set, it stays set. */
 bool qmp_client_is_disconnected(QmpClient *c);
 
-/* Async send. Returns 0 on send (callback will fire later), negative errno on failure. */
+/* Async send. Returns 0 on send (callback will fire later), negative errno on failure. If
+ * ret_slot is non-NULL, returns a reference to a QmpSlot which can be used to cancel the call
+ * (by unreffing it before the reply arrives). */
 int qmp_client_invoke(
                 QmpClient *client,
+                QmpSlot **ret_slot,
                 const char *command,
                 QmpClientArgs *args,
                 qmp_command_callback_t callback,
@@ -78,10 +81,14 @@ int qmp_client_set_description(QmpClient *c, const char *description);
 sd_event* qmp_client_get_event(QmpClient *c);
 unsigned qmp_client_next_fdset_id(QmpClient *client);
 
-QmpClient* qmp_client_unref(QmpClient *p);
-
+DECLARE_TRIVIAL_REF_UNREF_FUNC(QmpClient, qmp_client);
 DEFINE_TRIVIAL_CLEANUP_FUNC(QmpClient *, qmp_client_unref);
 
+DECLARE_TRIVIAL_REF_UNREF_FUNC(QmpSlot, qmp_slot);
+DEFINE_TRIVIAL_CLEANUP_FUNC(QmpSlot *, qmp_slot_unref);
+
+QmpClient* qmp_slot_get_client(QmpSlot *slot);
+
 /* Returns true iff any object entry in schema (result of query-qmp-schema) has a member with this
  * name. QEMU's introspection replaces type names with opaque numeric ids, so lookup-by-type-name is
  * impossible — but member names are real. Use only when the member name is unique in the schema. */
index 1a19b42499ca5f2acc9d21d6fa2130c0995714ef..1207fe8a258266e6f2f9c2dbd3d18caf5b6dc02b 100644 (file)
@@ -80,6 +80,7 @@ typedef struct MStack MStack;
 typedef struct OpenFile OpenFile;
 typedef struct Pkcs11EncryptedKey Pkcs11EncryptedKey;
 typedef struct QmpClient QmpClient;
+typedef struct QmpSlot QmpSlot;
 typedef struct Table Table;
 typedef struct Tpm2Context Tpm2Context;
 typedef struct Tpm2Handle Tpm2Handle;
index ec520cc270a04f179546b94fc8115c134167b5a1..df8d3c6e21599f9e13637262eedc456f0a2a60a8 100644 (file)
@@ -133,7 +133,7 @@ TEST(qmp_client_qemu_handshake_and_schema) {
 
         /* query-qmp-schema returns ~200KB -- validates the buffered reader handles large multi-read()
          * responses correctly. The handshake completes transparently inside invoke(). */
-        r = qmp_client_invoke(client, "query-qmp-schema", NULL, on_test_result, &t);
+        r = qmp_client_invoke(client, /* ret_slot= */ NULL, "query-qmp-schema", NULL, on_test_result, &t);
         if (r < 0) {
                 log_tests_skipped_errno(r, "QMP invoke failed (handshake or send)");
                 return;
@@ -153,7 +153,7 @@ TEST(qmp_client_qemu_handshake_and_schema) {
         qmp_test_result_done(&t);
 
         /* Clean shutdown */
-        ASSERT_OK(qmp_client_invoke(client, "quit", NULL, on_test_result, &t));
+        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "quit", NULL, on_test_result, &t));
         qmp_test_wait(event, &t);
         ASSERT_EQ(t.error, 0);
         qmp_test_result_done(&t);
@@ -197,7 +197,7 @@ TEST(qmp_client_qemu_query_status) {
 
         /* query-status validates response parsing against real QEMU output format.
          * The handshake completes transparently inside invoke(). */
-        r = qmp_client_invoke(client, "query-status", NULL, on_test_result, &t);
+        r = qmp_client_invoke(client, /* ret_slot= */ NULL, "query-status", NULL, on_test_result, &t);
         if (r < 0) {
                 log_tests_skipped_errno(r, "QMP invoke failed (handshake or send)");
                 return;
@@ -219,13 +219,13 @@ TEST(qmp_client_qemu_query_status) {
         qmp_test_result_done(&t);
 
         /* Test stop + cont to exercise command sequencing and id correlation */
-        ASSERT_OK(qmp_client_invoke(client, "stop", NULL, on_test_result, &t));
+        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "stop", NULL, on_test_result, &t));
         qmp_test_wait(event, &t);
         ASSERT_EQ(t.error, 0);
         qmp_test_result_done(&t);
 
         /* Verify status changed */
-        ASSERT_OK(qmp_client_invoke(client, "query-status", NULL, on_test_result, &t));
+        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "query-status", NULL, on_test_result, &t));
         qmp_test_wait(event, &t);
         ASSERT_EQ(t.error, 0);
         ASSERT_NOT_NULL(t.result);
@@ -236,13 +236,13 @@ TEST(qmp_client_qemu_query_status) {
 
         qmp_test_result_done(&t);
 
-        ASSERT_OK(qmp_client_invoke(client, "cont", NULL, on_test_result, &t));
+        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "cont", NULL, on_test_result, &t));
         qmp_test_wait(event, &t);
         ASSERT_EQ(t.error, 0);
         qmp_test_result_done(&t);
 
         /* Clean shutdown */
-        ASSERT_OK(qmp_client_invoke(client, "quit", NULL, on_test_result, &t));
+        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "quit", NULL, on_test_result, &t));
         qmp_test_wait(event, &t);
         ASSERT_EQ(t.error, 0);
         qmp_test_result_done(&t);
index c70be2c0f4ea207b826cf74d7ab9a18098401b0c..bbc3f152869534f0a08c82c24e8eecd4415487a7 100644 (file)
@@ -248,7 +248,7 @@ TEST(qmp_client_basic) {
         qmp_client_bind_event(client, test_event_callback, &event_received);
 
         /* Execute query-status */
-        ASSERT_OK(qmp_client_invoke(client, "query-status", NULL, on_test_result, &t));
+        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "query-status", NULL, on_test_result, &t));
         qmp_test_wait(event, &t);
         ASSERT_EQ(t.error, 0);
         ASSERT_NOT_NULL(t.result);
@@ -262,13 +262,13 @@ TEST(qmp_client_basic) {
         qmp_test_result_done(&t);
 
         /* Execute stop */
-        ASSERT_OK(qmp_client_invoke(client, "stop", NULL, on_test_result, &t));
+        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "stop", NULL, on_test_result, &t));
         qmp_test_wait(event, &t);
         ASSERT_EQ(t.error, 0);
         qmp_test_result_done(&t);
 
         /* Execute cont -- the STOP event should be dispatched by the IO callback */
-        ASSERT_OK(qmp_client_invoke(client, "cont", NULL, on_test_result, &t));
+        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "cont", NULL, on_test_result, &t));
         qmp_test_wait(event, &t);
         ASSERT_EQ(t.error, 0);
         qmp_test_result_done(&t);
@@ -320,7 +320,7 @@ TEST(qmp_client_eof) {
         /* Executing a command should fail with a disconnect error because the server
          * closed. The handshake may succeed or fail inside invoke() — either way the
          * invoke itself or the async callback should report a disconnect. */
-        r = qmp_client_invoke(client, "query-status", NULL, on_test_result, &t);
+        r = qmp_client_invoke(client, /* ret_slot= */ NULL, "query-status", NULL, on_test_result, &t);
         if (r < 0)
                 ASSERT_TRUE(ERRNO_IS_NEG_DISCONNECT(r));
         else {
@@ -430,7 +430,7 @@ TEST(qmp_client_first_invoke_with_fd) {
 
         /* THIS is the previously-broken pattern: very first invoke against the client,
          * carrying an fd, with the handshake still pending. */
-        ASSERT_OK(qmp_client_invoke(client, "add-fd",
+        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd",
                                     QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)),
                                     on_test_result, &t));
 
@@ -478,7 +478,7 @@ TEST(qmp_client_invoke_failure_closes_fds) {
         /* invoke must fail because the peer is gone. The TAKE_FD inside the macro
          * has already zeroed our local fd_to_pass; if invoke leaked the fd here,
          * the fd would stay open in our process. */
-        int r = qmp_client_invoke(client, "add-fd",
+        int r = qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd",
                                   QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)),
                                   on_test_result, &t);
         ASSERT_TRUE(r < 0);
@@ -493,6 +493,125 @@ TEST(qmp_client_invoke_failure_closes_fds) {
         ASSERT_EQ(errno, EBADF);
 }
 
+/* Mock for the slot lifecycle + cancel tests: greets, accepts capabilities, then accepts
+ * query-status and stop, replying with dummy returns. A cancelled query-status still gets
+ * sent on the wire (cancel merely removes the pending slot), so the server must be prepared
+ * to read and reply to it. */
+static _noreturn_ void mock_qmp_server_slot(int fd) {
+        _cleanup_(json_stream_done) JsonStream s = {};
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *status_return = NULL;
+
+        mock_qmp_init(&s, fd);
+
+        mock_qmp_send_literal(&s,
+                "{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}");
+
+        mock_qmp_expect_and_reply(&s, "qmp_capabilities", NULL);
+
+        ASSERT_OK(sd_json_buildo(
+                        &status_return,
+                        SD_JSON_BUILD_PAIR_BOOLEAN("running", true),
+                        SD_JSON_BUILD_PAIR_STRING("status", "running")));
+        mock_qmp_expect_and_reply(&s, "query-status", status_return);
+
+        mock_qmp_expect_and_reply(&s, "stop", NULL);
+
+        _exit(EXIT_SUCCESS);
+}
+
+/* Verify that when qmp_client_invoke() returns a slot, qmp_slot_get_client() tracks the
+ * connection state: the client pointer is reported while the call is in flight, and flipped
+ * back to NULL once the reply has been dispatched. The caller must still be able to drop its
+ * ref safely after that. */
+TEST(qmp_client_invoke_slot_lifecycle) {
+        _cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
+        _cleanup_(sd_event_unrefp) sd_event *event = NULL;
+        _cleanup_(pidref_done_sigkill_wait) PidRef pid = PIDREF_NULL;
+        _cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL;
+        QmpTestResult t = {};
+        int qmp_fds[2];
+        int r;
+
+        ASSERT_OK(sd_event_new(&event));
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
+
+        r = ASSERT_OK(pidref_safe_fork("(mock-qmp-slot-life)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
+        if (r == 0) {
+                safe_close(qmp_fds[0]);
+                mock_qmp_server_slot(qmp_fds[1]);
+        }
+        safe_close(qmp_fds[1]);
+
+        ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
+        ASSERT_OK(qmp_client_attach_event(client, event, SD_EVENT_PRIORITY_NORMAL));
+
+        ASSERT_OK(qmp_client_invoke(client, &slot, "query-status", NULL, on_test_result, &t));
+
+        /* While in flight the slot still references its client. */
+        ASSERT_NOT_NULL(slot);
+        ASSERT_PTR_EQ(qmp_slot_get_client(slot), client);
+
+        qmp_test_wait(event, &t);
+        ASSERT_EQ(t.error, 0);
+        ASSERT_NOT_NULL(t.result);
+
+        /* Once dispatched, the slot is disconnected from the client but still owned by us. */
+        ASSERT_NULL(qmp_slot_get_client(slot));
+
+        qmp_test_result_done(&t);
+
+        /* Drop our ref explicitly (out of order w.r.t. cleanup) to exercise the
+         * already-disconnected path in qmp_slot_free(). */
+        slot = qmp_slot_unref(slot);
+        ASSERT_NULL(slot);
+}
+
+/* Verify that dropping the only reference on a pending slot before the reply arrives cancels
+ * the callback. The command is already enqueued on the stream at that point, so the server
+ * still sees it and replies — but the reply lands on an unknown id and is discarded. */
+TEST(qmp_client_invoke_slot_cancel) {
+        _cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
+        _cleanup_(pidref_done_sigkill_wait) PidRef pid = PIDREF_NULL;
+        QmpTestResult t_cancelled = {};
+        QmpSlot *slot = NULL;
+        int qmp_fds[2];
+        int r;
+
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
+
+        r = ASSERT_OK(pidref_safe_fork("(mock-qmp-slot-cancel)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
+        if (r == 0) {
+                safe_close(qmp_fds[0]);
+                mock_qmp_server_slot(qmp_fds[1]);
+        }
+        safe_close(qmp_fds[1]);
+
+        /* Drive without an event loop so the subsequent qmp_client_call() owns all pumping;
+         * it serializes write→read round-trips, which avoids the mock server seeing the
+         * cancelled query-status and the follow-up stop concatenated into a single recv(). */
+        ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
+
+        ASSERT_OK(qmp_client_invoke(client, &slot, "query-status", NULL, on_test_result, &t_cancelled));
+        ASSERT_NOT_NULL(slot);
+
+        /* Drop our sole ref → slot disconnects itself from the client's pending set. The
+         * enqueued query-status is still on the wire; when its reply arrives, dispatch_reply
+         * won't find a matching slot and will log-and-discard it. */
+        slot = qmp_slot_unref(slot);
+        ASSERT_NULL(slot);
+
+        /* Synchronous call drives its own process+wait pump: it first drains the already-
+         * enqueued query-status write, consumes (and discards) its reply, then sends stop
+         * and waits for that reply. Any improper fire of the cancelled callback would have
+         * happened during that process() pass. */
+        ASSERT_EQ(qmp_client_call(client, "stop", NULL, NULL, NULL), 1);
+
+        /* The cancelled callback must never have fired. */
+        ASSERT_FALSE(t_cancelled.done);
+        ASSERT_NULL(t_cancelled.result);
+        ASSERT_NULL(t_cancelled.error_desc);
+}
+
 /* Drives a small wire dance for the sync call test: greeting, capabilities, one successful
  * command reply, and two error replies (one for the ret_error_desc path, one for the -EIO
  * path). */
index 4171772ee7c844b9a69fc9a575ecccc4376ef576..f69d2a5e7c64c5107fb3adc5e5d9ee58b11292b5 100644 (file)
@@ -126,7 +126,7 @@ static int qmp_fdset_add(QmpClient *qmp, int fd_consume, char **ret_path) {
         if (asprintf(&path, "/dev/fdset/%u", id) < 0)
                 return -ENOMEM;
 
-        r = qmp_client_invoke(qmp, "add-fd", QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd)),
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "add-fd", QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd)),
                               on_qmp_setup_complete, (void*) "add-fd");
         if (r < 0)
                 return r;
@@ -221,7 +221,7 @@ static int qmp_add_file_node(QmpClient *qmp, const QmpFileNodeParams *p) {
         if (r < 0)
                 return r;
 
-        return qmp_client_invoke(qmp, "blockdev-add", QMP_CLIENT_ARGS(args), on_qmp_setup_complete, (void*) "blockdev-add");
+        return qmp_client_invoke(qmp, /* ret_slot= */ NULL, "blockdev-add", QMP_CLIENT_ARGS(args), on_qmp_setup_complete, (void*) "blockdev-add");
 }
 
 /* Get the virtual size of an image from the fd directly. For raw images the virtual size
@@ -320,7 +320,7 @@ static int on_ephemeral_create_concluded(QmpClient *qmp, void *userdata) {
         if (r < 0)
                 return log_error_errno(r, "Failed to build overlay format JSON for '%s': %m", ctx->node_name);
 
-        r = qmp_client_invoke(qmp, "blockdev-add", QMP_CLIENT_ARGS(fmt_args), on_qmp_setup_complete, (void*) "blockdev-add");
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "blockdev-add", QMP_CLIENT_ARGS(fmt_args), on_qmp_setup_complete, (void*) "blockdev-add");
         if (r < 0)
                 return r;
 
@@ -339,7 +339,7 @@ static int on_ephemeral_create_concluded(QmpClient *qmp, void *userdata) {
         if (r < 0)
                 return log_error_errno(r, "Failed to build device_add JSON for '%s': %m", ctx->node_name);
 
-        r = qmp_client_invoke(qmp, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add");
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add");
         if (r < 0)
                 return r;
 
@@ -408,7 +408,7 @@ static int qmp_setup_ephemeral_drive(VmspawnQmpBridge *bridge, QmpClient *qmp, D
         if (r < 0)
                 return r;
 
-        r = qmp_client_invoke(qmp, "blockdev-add", QMP_CLIENT_ARGS(base_fmt_args), on_qmp_setup_complete, (void*) "blockdev-add");
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "blockdev-add", QMP_CLIENT_ARGS(base_fmt_args), on_qmp_setup_complete, (void*) "blockdev-add");
         if (r < 0)
                 return log_error_errno(r, "Failed to send blockdev-add for base format '%s': %m", drive->path);
 
@@ -424,7 +424,7 @@ static int qmp_setup_ephemeral_drive(VmspawnQmpBridge *bridge, QmpClient *qmp, D
         if (r < 0)
                 return r;
 
-        r = qmp_client_invoke(qmp, "blockdev-add", QMP_CLIENT_ARGS(overlay_file_args), on_qmp_setup_complete, (void*) "blockdev-add");
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "blockdev-add", QMP_CLIENT_ARGS(overlay_file_args), on_qmp_setup_complete, (void*) "blockdev-add");
         if (r < 0)
                 return log_error_errno(r, "Failed to send blockdev-add for overlay file '%s': %m", drive->path);
 
@@ -482,7 +482,7 @@ static int qmp_setup_ephemeral_drive(VmspawnQmpBridge *bridge, QmpClient *qmp, D
 
         TAKE_PTR(ectx);
 
-        r = qmp_client_invoke(qmp, "blockdev-create", QMP_CLIENT_ARGS(cmd_args), on_qmp_setup_complete, (void*) "blockdev-create");
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "blockdev-create", QMP_CLIENT_ARGS(cmd_args), on_qmp_setup_complete, (void*) "blockdev-create");
         if (r < 0)
                 return log_error_errno(r, "Failed to send blockdev-create for '%s': %m", drive->path);
 
@@ -532,7 +532,7 @@ static int qmp_setup_regular_drive(VmspawnQmpBridge *bridge, QmpClient *qmp, Dri
         if (r < 0)
                 return r;
 
-        r = qmp_client_invoke(qmp, "blockdev-add", QMP_CLIENT_ARGS(fmt_args), on_qmp_setup_complete, (void*) "blockdev-add");
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "blockdev-add", QMP_CLIENT_ARGS(fmt_args), on_qmp_setup_complete, (void*) "blockdev-add");
         if (r < 0)
                 return log_error_errno(r, "Failed to send blockdev-add format for '%s': %m", drive->path);
 
@@ -542,7 +542,7 @@ static int qmp_setup_regular_drive(VmspawnQmpBridge *bridge, QmpClient *qmp, Dri
         if (r < 0)
                 return r;
 
-        r = qmp_client_invoke(qmp, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add");
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add");
         if (r < 0)
                 return log_error_errno(r, "Failed to send device_add for '%s': %m", drive->path);
 
@@ -584,7 +584,7 @@ int vmspawn_qmp_setup_network(VmspawnQmpBridge *bridge, NetworkInfo *network) {
                 if (r < 0)
                         return log_error_errno(r, "Failed to build getfd JSON: %m");
 
-                r = qmp_client_invoke(qmp, "getfd", QMP_CLIENT_ARGS_FD(getfd_args, TAKE_FD(network->fd)),
+                r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "getfd", QMP_CLIENT_ARGS_FD(getfd_args, TAKE_FD(network->fd)),
                                       on_qmp_setup_complete, (void*) "getfd");
                 if (r < 0)
                         return log_error_errno(r, "Failed to send getfd for TAP fd: %m");
@@ -606,7 +606,7 @@ int vmspawn_qmp_setup_network(VmspawnQmpBridge *bridge, NetworkInfo *network) {
         if (r < 0)
                 return log_error_errno(r, "Failed to build netdev_add JSON: %m");
 
-        r = qmp_client_invoke(qmp, "netdev_add", QMP_CLIENT_ARGS(netdev_args), on_qmp_setup_complete, (void*) "netdev_add");
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "netdev_add", QMP_CLIENT_ARGS(netdev_args), on_qmp_setup_complete, (void*) "netdev_add");
         if (r < 0)
                 return log_error_errno(r, "Failed to send netdev_add: %m");
 
@@ -623,7 +623,7 @@ int vmspawn_qmp_setup_network(VmspawnQmpBridge *bridge, NetworkInfo *network) {
         if (r < 0)
                 return log_error_errno(r, "Failed to build NIC device_add JSON: %m");
 
-        r = qmp_client_invoke(qmp, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add");
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add");
         if (r < 0)
                 return log_error_errno(r, "Failed to send NIC device_add: %m");
 
@@ -658,7 +658,7 @@ static int vmspawn_qmp_setup_one_virtiofs(QmpClient *qmp, const VirtiofsInfo *vf
         if (r < 0)
                 return log_error_errno(r, "Failed to build chardev-add JSON for '%s': %m", vfs->id);
 
-        r = qmp_client_invoke(qmp, "chardev-add", QMP_CLIENT_ARGS(chardev_args), on_qmp_setup_complete, (void*) "chardev-add");
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "chardev-add", QMP_CLIENT_ARGS(chardev_args), on_qmp_setup_complete, (void*) "chardev-add");
         if (r < 0)
                 return log_error_errno(r, "Failed to send chardev-add '%s': %m", vfs->id);
 
@@ -675,7 +675,7 @@ static int vmspawn_qmp_setup_one_virtiofs(QmpClient *qmp, const VirtiofsInfo *vf
         if (r < 0)
                 return log_error_errno(r, "Failed to build virtiofs device_add JSON for '%s': %m", vfs->id);
 
-        r = qmp_client_invoke(qmp, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add");
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add");
         if (r < 0)
                 return log_error_errno(r, "Failed to send virtiofs device_add '%s': %m", vfs->id);
 
@@ -719,7 +719,7 @@ int vmspawn_qmp_setup_vsock(VmspawnQmpBridge *bridge, VsockInfo *vsock) {
         if (r < 0)
                 return log_error_errno(r, "Failed to build getfd JSON for VSOCK: %m");
 
-        r = qmp_client_invoke(qmp, "getfd", QMP_CLIENT_ARGS_FD(getfd_args, TAKE_FD(vsock->fd)),
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "getfd", QMP_CLIENT_ARGS_FD(getfd_args, TAKE_FD(vsock->fd)),
                               on_qmp_setup_complete, (void*) "getfd");
         if (r < 0)
                 return log_error_errno(r, "Failed to send getfd for VSOCK fd: %m");
@@ -736,7 +736,7 @@ int vmspawn_qmp_setup_vsock(VmspawnQmpBridge *bridge, VsockInfo *vsock) {
         if (r < 0)
                 return log_error_errno(r, "Failed to build VSOCK device_add JSON: %m");
 
-        r = qmp_client_invoke(qmp, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add");
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add");
         if (r < 0)
                 return log_error_errno(r, "Failed to send VSOCK device_add: %m");
 
@@ -766,7 +766,7 @@ static int qmp_setup_scsi_controller(QmpClient *qmp, const char *pcie_port) {
         if (r < 0)
                 return log_error_errno(r, "Failed to build SCSI controller JSON: %m");
 
-        r = qmp_client_invoke(qmp, "device_add", QMP_CLIENT_ARGS(args), on_qmp_setup_complete, (void*) "device_add");
+        r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "device_add", QMP_CLIENT_ARGS(args), on_qmp_setup_complete, (void*) "device_add");
         if (r < 0)
                 return log_error_errno(r, "Failed to send SCSI controller device_add: %m");
 
@@ -909,8 +909,8 @@ static int on_io_uring_probe_add_reply(
         if (r < 0)
                 return r;
 
-        return qmp_client_invoke(c, "blockdev-del", QMP_CLIENT_ARGS(del_args),
-                        on_io_uring_probe_del_reply, bridge);
+        return qmp_client_invoke(c, /* ret_slot= */ NULL, "blockdev-del", QMP_CLIENT_ARGS(del_args),
+                                 on_io_uring_probe_del_reply, bridge);
 }
 
 static int probe_io_uring(QmpClient *c, VmspawnQmpBridge *bridge) {
@@ -930,8 +930,8 @@ static int probe_io_uring(QmpClient *c, VmspawnQmpBridge *bridge) {
         if (r < 0)
                 return r;
 
-        return qmp_client_invoke(c, "blockdev-add", QMP_CLIENT_ARGS(args),
-                        on_io_uring_probe_add_reply, bridge);
+        return qmp_client_invoke(c, /* ret_slot= */ NULL, "blockdev-add", QMP_CLIENT_ARGS(args),
+                                 on_io_uring_probe_add_reply, bridge);
 }
 
 static int on_probe_schema_reply(
@@ -965,8 +965,8 @@ static int probe_schema(QmpClient *c, VmspawnQmpBridge *bridge) {
         assert(c);
         assert(bridge);
 
-        return qmp_client_invoke(c, "query-qmp-schema", QMP_CLIENT_ARGS(NULL),
-                        on_probe_schema_reply, bridge);
+        return qmp_client_invoke(c, /* ret_slot= */ NULL, "query-qmp-schema", QMP_CLIENT_ARGS(NULL),
+                                 on_probe_schema_reply, bridge);
 }
 
 int vmspawn_qmp_init(VmspawnQmpBridge **ret, int fd, sd_event *event) {
@@ -1055,5 +1055,5 @@ static int on_cont_complete(
 int vmspawn_qmp_start(VmspawnQmpBridge *bridge) {
         assert(bridge);
 
-        return qmp_client_invoke(bridge->qmp, "cont", /* args= */ NULL, on_cont_complete, /* userdata= */ NULL);
+        return qmp_client_invoke(bridge->qmp, /* ret_slot= */ NULL, "cont", /* args= */ NULL, on_cont_complete, /* userdata= */ NULL);
 }
index c73372e7a1a64e223d6f126fd6202f59bbad3630..4a11b6fd4e103ae1d5963c18f75b6594d394666e 100644 (file)
@@ -71,7 +71,7 @@ static int qmp_execute_varlink_async(
 
         sd_varlink_ref(link);
 
-        r = qmp_client_invoke(ctx->bridge->qmp, command, QMP_CLIENT_ARGS(arguments), callback, link);
+        r = qmp_client_invoke(ctx->bridge->qmp, /* ret_slot= */ NULL, command, QMP_CLIENT_ARGS(arguments), callback, link);
         if (r < 0)
                 sd_varlink_unref(link);
 
@@ -241,7 +241,7 @@ static int dispatch_pending_job(VmspawnQmpBridge *bridge, sd_json_variant *data)
         if (r < 0)
                 return sd_event_exit(qmp_client_get_event(bridge->qmp), r);
 
-        r = qmp_client_invoke(bridge->qmp, "job-dismiss", QMP_CLIENT_ARGS(dismiss_args),
+        r = qmp_client_invoke(bridge->qmp, /* ret_slot= */ NULL, "job-dismiss", QMP_CLIENT_ARGS(dismiss_args),
                               on_job_dismiss_complete, /* userdata= */ NULL);
         if (r < 0)
                 return sd_event_exit(qmp_client_get_event(bridge->qmp), r);