]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
qmp-client: eagerly enqueue qmp_capabilities on connect, drop the handshake state...
authorDaan De Meyer <daan@amutable.com>
Fri, 24 Apr 2026 07:51:44 +0000 (07:51 +0000)
committerDaan De Meyer <daan.j.demeyer@gmail.com>
Fri, 24 Apr 2026 12:29:31 +0000 (14:29 +0200)
QEMU's QMP greeting is an unsolicited, informational, server-initiated advertisement
— it doesn't gate commands. The server accepts (and pipelines) commands the instant
the socket is open. We were using it as a trigger to build qmp_capabilities and
blocking callers via qmp_client_ensure_running() until the reply came back.

Build qmp_capabilities inside qmp_client_connect_fd() instead and let the JsonStream
output queue preserve FIFO ordering between it and any user command a subsequent
invoke() enqueues. That satisfies QEMU's only ordering requirement (cap must precede
other commands) without any blocking in the send path.

Fallout:
  - Collapse QmpClientState to {RUNNING, DISCONNECTED}. The three HANDSHAKE_* states
    and the QMP_CLIENT_STATE_IS_HANDSHAKE() macro go away.
  - Drop qmp_client_dispatch_handshake(); fold greeting-drop into dispatch_reply as
    a one-line shape check.
  - Drop qmp_client_ensure_running() and its qmp_client_send() call site. send()
    now only refuses when state == DISCONNECTED.
  - The qmp_capabilities reply lands on an ordinary slot whose callback logs a
    protocol-level error and force-disconnects if cap negotiation failed, matching
    the old EPROTO behaviour at the same observable boundary.
  - qmp_client_phase() no longer special-cases the old handshake states; it maps
    directly to READING / AWAITING_REPLY based on whether slots are in flight.

Test updates:
  - qmp_client_first_invoke_with_fd → qmp_client_invoke_with_fd. The scenario it
    was pinned to (push_fd+invoke staging order) has been structurally impossible
    since the QmpClientArgs rework in 8ad4adcb6f; eager-cap removes it a second
    way. The test now covers end-to-end fd-passing on the first invoke, accepting
    either recv carrying the single SCM_RIGHTS fd (AF_UNIX absorbs non-scm skbs
    forward into the next scm-bearing skb's recv, so the fd may surface with cap
    or add-fd depending on kernel scheduling — QEMU's FIFO fd queue handles either).
  - qmp_client_invoke_failure_closes_fds restructured around the new invariant:
    invoke no longer blocks and no longer returns ENOTCONN for a dead peer, so
    the fd-leak assertion moves to "still open while the JsonStream queue owns it,
    closed on client teardown" and the nested block is flattened into an explicit
    qmp_client_unref().

src/shared/qmp-client.c
src/test/test-qmp-client.c

index 45bc0d8dbd22e27d0fec8cc8a80ceb50a4e4ba8e..cc51259cd1290970827eb8831684104b18b30561 100644 (file)
 #include "string-util.h"
 
 typedef enum QmpClientState {
-        QMP_CLIENT_HANDSHAKE_INITIAL,           /* waiting for QMP greeting */
-        QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED, /* greeting received, sending qmp_capabilities */
-        QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT, /* waiting for qmp_capabilities response */
-        QMP_CLIENT_RUNNING,                     /* connected, ready for commands */
+        QMP_CLIENT_RUNNING,                     /* connection alive; qmp_capabilities may still be in flight */
         QMP_CLIENT_DISCONNECTED,                /* connection closed */
         _QMP_CLIENT_STATE_MAX,
         _QMP_CLIENT_STATE_INVALID = -EINVAL,
 } QmpClientState;
 
-/* States routed to dispatch_handshake. */
-#define QMP_CLIENT_STATE_IS_HANDSHAKE(s)               \
-        IN_SET(s,                                      \
-               QMP_CLIENT_HANDSHAKE_INITIAL,           \
-               QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED, \
-               QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT)
-
 struct QmpSlot {
         unsigned n_ref;
         QmpClient *client;  /* NULL once disconnected (reply dispatched, cancelled, or client died) */
@@ -280,7 +270,7 @@ static int qmp_client_build_command(
 }
 
 /* Route c->current to event callback or matching async slot. Returns 1 on dispatch. */
-static int qmp_client_dispatch_reply(QmpClient *c) {
+static int qmp_client_dispatch(QmpClient *c) {
         sd_json_variant *result = NULL;
         const char *desc = NULL;
         uint64_t id;
@@ -298,6 +288,14 @@ static int qmp_client_dispatch_reply(QmpClient *c) {
                 return 1;
         }
 
+        /* QEMU sends a one-shot greeting with a "QMP" key unsolicited on connect. We don't
+         * wait for it before sending qmp_capabilities (QEMU accepts commands the moment the
+         * socket is open), we detect it by the "QMP" key and drop it. */
+        if (sd_json_variant_by_key(c->current, "QMP")) {
+                qmp_client_clear_current(c);
+                return 1;
+        }
+
         /* Command responses carry an "id" matching a request we sent */
         r = qmp_extract_response_id(c->current, &id);
         if (r < 0) {
@@ -423,88 +421,6 @@ static bool qmp_client_test_disconnect(QmpClient *c) {
         return qmp_client_handle_disconnect(c);
 }
 
-/* INITIAL → greeting → GREETING_RECEIVED → qmp_capabilities → CAPABILITIES_SENT → response → RUNNING. */
-static int qmp_client_dispatch_handshake(QmpClient *c) {
-        int r;
-
-        assert(c);
-        assert(QMP_CLIENT_STATE_IS_HANDSHAKE(c->state));
-
-        if (!c->current)
-                return 0;
-
-        /* Defensive: QEMU shouldn't emit events during capability negotiation, but if one
-         * arrives, dispatch it as an event rather than mis-parsing it as a handshake reply. */
-        if (sd_json_variant_by_key(c->current, "event")) {
-                _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current);
-                qmp_client_dispatch_event(c, v);
-                return 1;
-        }
-
-        switch (c->state) {
-
-        case QMP_CLIENT_HANDSHAKE_INITIAL: {
-                /* Waiting for QMP greeting. Take ownership so by_key()'s borrowed pointer
-                 * stays valid through the case scope. */
-                _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current);
-                if (!sd_json_variant_by_key(v, "QMP"))
-                        return json_stream_log_errno(&c->stream, SYNTHETIC_ERRNO(EPROTO),
-                                                     "Expected QMP greeting, got something else");
-
-                c->state = QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED;
-
-                /* Fall through to immediately send capabilities */
-                _fallthrough_;
-        }
-
-        case QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED: {
-                /* Send qmp_capabilities command */
-                _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL;
-                r = sd_json_buildo(
-                                &cmd,
-                                SD_JSON_BUILD_PAIR_STRING("execute", "qmp_capabilities"),
-                                SD_JSON_BUILD_PAIR_UNSIGNED("id", c->next_id++));
-                if (r < 0)
-                        return r;
-
-                r = json_stream_enqueue(&c->stream, cmd);
-                if (r < 0)
-                        return r;
-
-                c->state = QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT;
-                return 1;
-        }
-
-        case QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT: {
-                /* Take ownership so desc (borrowed from v's "error.desc") survives the format string. */
-                _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current);
-                const char *desc = NULL;
-                r = qmp_parse_response(v, /* ret_result= */ NULL, &desc);
-                if (r < 0)
-                        return json_stream_log_errno(&c->stream, SYNTHETIC_ERRNO(EPROTO),
-                                                     "qmp_capabilities failed: %s", desc);
-
-                c->state = QMP_CLIENT_RUNNING;
-                return 1;
-        }
-
-        default:
-                assert_not_reached();
-        }
-}
-
-static int qmp_client_dispatch(QmpClient *c) {
-        assert(c);
-
-        if (!c->current)
-                return 0;
-
-        if (QMP_CLIENT_STATE_IS_HANDSHAKE(c->state))
-                return qmp_client_dispatch_handshake(c);
-
-        return qmp_client_dispatch_reply(c);
-}
-
 /* Single step: write → dispatch → parse → read → disconnect. Matches sd_varlink_process(). */
 int qmp_client_process(QmpClient *c) {
         int r;
@@ -524,7 +440,7 @@ int qmp_client_process(QmpClient *c) {
         if (r != 0)
                 goto finish;
 
-        /* 2. Dispatch — route based on state */
+        /* 2. Dispatch — dispatch incoming messages to slots */
         r = qmp_client_dispatch(c);
         if (r < 0)
                 json_stream_log_errno(&c->stream, r, "Failed to dispatch QMP message: %m");
@@ -600,19 +516,14 @@ static JsonStreamPhase qmp_client_phase(void *userdata) {
         if (c->current)
                 return JSON_STREAM_PHASE_OTHER;
 
-        /* During handshake we're waiting for the greeting or qmp_capabilities response. */
-        if (QMP_CLIENT_STATE_IS_HANDSHAKE(c->state))
-                return JSON_STREAM_PHASE_AWAITING_REPLY;
-
-        /* Running with pending async commands — waiting for their responses. */
-        if (c->state == QMP_CLIENT_RUNNING && !set_isempty(c->slots))
-                return JSON_STREAM_PHASE_AWAITING_REPLY;
-
-        /* Running with no pending commands — waiting for unsolicited events. */
-        if (c->state == QMP_CLIENT_RUNNING)
-                return JSON_STREAM_PHASE_READING;
+        if (c->state != QMP_CLIENT_RUNNING)
+                return JSON_STREAM_PHASE_OTHER;
 
-        return JSON_STREAM_PHASE_OTHER;
+        /* Pending slots (user commands or the initial qmp_capabilities) → awaiting reply.
+         * Otherwise we're idling for unsolicited events. */
+        return set_isempty(c->slots)
+                        ? JSON_STREAM_PHASE_READING
+                        : JSON_STREAM_PHASE_AWAITING_REPLY;
 }
 
 static int qmp_client_dispatch_cb(void *userdata) {
@@ -630,33 +541,6 @@ static int qmp_client_defer_callback(sd_event_source *source, void *userdata) {
         return 1;
 }
 
-/* Drive handshake to completion. Matches sd-bus's bus_ensure_running(). */
-static int qmp_client_ensure_running(QmpClient *c) {
-        int r;
-
-        assert(c);
-
-        if (c->state == QMP_CLIENT_RUNNING)
-                return 1;
-
-        for (;;) {
-                if (c->state < 0 || c->state == QMP_CLIENT_DISCONNECTED)
-                        return -ENOTCONN;
-
-                r = qmp_client_process(c);
-                if (r < 0)
-                        return r;
-                if (c->state == QMP_CLIENT_RUNNING)
-                        return 1;
-                if (r > 0)
-                        continue;
-
-                r = qmp_client_wait(c, USEC_INFINITY);
-                if (r < 0)
-                        return r;
-        }
-}
-
 static void qmp_client_detach_event(QmpClient *c) {
         if (!c)
                 return;
@@ -712,6 +596,37 @@ static int qmp_client_quit_callback(sd_event_source *source, void *userdata) {
         return 1;
 }
 
+static int qmp_client_send(
+                QmpClient *c,
+                const char *command,
+                QmpClientArgs *args,
+                qmp_command_callback_t callback,
+                void *userdata,
+                QmpSlot **ret_slot);
+
+/* Reply callback for the eagerly-enqueued qmp_capabilities command. Success → we stay in
+ * RUNNING. Failure → negotiation is unrecoverable, force-disconnect so the next user op gets
+ * -ENOTCONN rather than hanging. */
+static int qmp_client_capabilities_reply(
+                QmpClient *c,
+                sd_json_variant *result,
+                const char *error_desc,
+                int error,
+                void *userdata) {
+
+        /* qmp_client_handle_disconnect() below fails all pending slots, which re-enters this
+         * callback with -ECONNRESET on our own still-registered slot. Short-circuit that. */
+        if (c->state == QMP_CLIENT_DISCONNECTED)
+                return 0;
+
+        if (error >= 0)
+                return 0;
+
+        json_stream_log_errno(&c->stream, error, "qmp_capabilities failed: %s", strna(error_desc));
+        qmp_client_handle_disconnect(c);
+        return 0;
+}
+
 int qmp_client_connect_fd(QmpClient **ret, int fd) {
         _cleanup_(qmp_client_unrefp) QmpClient *c = NULL;
         int r;
@@ -725,7 +640,7 @@ int qmp_client_connect_fd(QmpClient **ret, int fd) {
 
         *c = (QmpClient) {
                 .n_ref = 1,
-                .state = QMP_CLIENT_HANDSHAKE_INITIAL,
+                .state = QMP_CLIENT_RUNNING,
                 .next_id = 1,
         };
 
@@ -744,6 +659,16 @@ int qmp_client_connect_fd(QmpClient **ret, int fd) {
         if (r < 0)
                 return r;
 
+        /* Eagerly queue qmp_capabilities. QEMU accepts commands as soon as the socket opens
+         * — its greeting is informational and doesn't gate writes on our side. FIFO ordering
+         * of the output queue guarantees cap precedes any user command a later invoke()
+         * enqueues, which is all QEMU actually requires. */
+        r = qmp_client_send(c, "qmp_capabilities", /* args= */ NULL,
+                            qmp_client_capabilities_reply, /* userdata= */ NULL,
+                            /* ret_slot= */ NULL);
+        if (r < 0)
+                return r;
+
         *ret = TAKE_PTR(c);
         return 0;
 }
@@ -825,9 +750,8 @@ static int qmp_client_send(
         assert(c);
         assert(command);
 
-        r = qmp_client_ensure_running(c);
-        if (r < 0)
-                return r;
+        if (c->state == QMP_CLIENT_DISCONNECTED)
+                return -ENOTCONN;
 
         r = qmp_client_build_command(c, command, args ? args->arguments : NULL, &cmd, &id);
         if (r < 0)
index bbc3f152869534f0a08c82c24e8eecd4415487a7..befee024845880476931bfddb2ac27c06d5bf5ef 100644 (file)
@@ -335,12 +335,13 @@ TEST(qmp_client_eof) {
         ASSERT_EQ(si.si_status, EXIT_SUCCESS);
 }
 
-/* Mock QMP server for the fd-on-first-invoke regression. Drives the wire dance:
- *   greeting → (recv qmp_capabilities, expect 0 fds) → reply →
- *   (recv add-fd, expect exactly 1 fd) → reply
- * Asserts the attached fd counts directly so a regression flips the child to
- * exit_failure and the parent test fails on the wait-for-terminate. */
-static _noreturn_ void mock_qmp_server_fd_first(int fd) {
+/* Mock QMP server for the fd-passing test. Drives the wire dance:
+ *   greeting → recv qmp_capabilities → reply → recv add-fd → reply
+ * Asserts that exactly one SCM_RIGHTS fd arrives total across the two recvs. We can't
+ * require the fd to come attached to add-fd specifically: AF_UNIX coalesces the client's
+ * non-SCM cap sendmsg forward into the SCM-bearing add-fd sendmsg, so the fd may surface
+ * with either recv depending on kernel scheduling. QEMU's FIFO fd queue doesn't care. */
+static _noreturn_ void mock_qmp_server_fd(int fd) {
         _cleanup_(json_stream_done) JsonStream s = {};
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *cap_cmd = NULL,
                                                           *addfd_cmd = NULL,
@@ -349,18 +350,17 @@ static _noreturn_ void mock_qmp_server_fd_first(int fd) {
                                                           *addfd_reply = NULL;
 
         mock_qmp_init(&s, fd);
-        /* Accept SCM_RIGHTS on incoming messages so we can count how many fds the client
-         * attaches to each sendmsg. */
         ASSERT_OK(json_stream_set_allow_fd_passing_input(&s, true, /* with_sockopt= */ true));
 
         /* Greeting */
         mock_qmp_send_literal(&s,
                 "{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}");
 
-        /* Receive qmp_capabilities — must arrive with NO fds attached. */
+        /* Receive qmp_capabilities (may or may not carry the fd depending on coalescing). */
         mock_qmp_recv(&s, &cap_cmd);
-        ASSERT_EQ(json_stream_get_n_input_fds(&s), (size_t) 0);
+        size_t n_fds_total = json_stream_get_n_input_fds(&s);
         ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(cap_cmd, "execute")), "qmp_capabilities");
+        json_stream_close_input_fds(&s);
 
         sd_json_variant *cap_id = ASSERT_NOT_NULL(sd_json_variant_by_key(cap_cmd, "id"));
         ASSERT_OK(sd_json_buildo(
@@ -369,12 +369,14 @@ static _noreturn_ void mock_qmp_server_fd_first(int fd) {
                         SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(cap_id))));
         mock_qmp_send(&s, cap_reply);
 
-        /* Receive add-fd — must arrive with EXACTLY ONE fd attached. */
+        /* Receive add-fd (fd may already have been consumed with cap's recv). */
         mock_qmp_recv(&s, &addfd_cmd);
-        ASSERT_EQ(json_stream_get_n_input_fds(&s), (size_t) 1);
+        n_fds_total += json_stream_get_n_input_fds(&s);
         ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(addfd_cmd, "execute")), "add-fd");
         json_stream_close_input_fds(&s);
 
+        ASSERT_EQ(n_fds_total, (size_t) 1);
+
         sd_json_variant *addfd_id = ASSERT_NOT_NULL(sd_json_variant_by_key(addfd_cmd, "id"));
         ASSERT_OK(sd_json_buildo(
                         &addfd_return,
@@ -389,13 +391,9 @@ static _noreturn_ void mock_qmp_server_fd_first(int fd) {
         _exit(EXIT_SUCCESS);
 }
 
-/* Regression: pass an fd in the very first qmp_client_invoke() against a fresh client
- * (lazy-bootstrap state, handshake not yet done). The previous push_fd+invoke split would
- * stage the fd on the stream BEFORE qmp_client_ensure_running() drove the handshake; the
- * handshake's qmp_capabilities enqueue would then steal the staged fd onto its own
- * sendmsg. The new QmpClientArgs API stages fds inside invoke AFTER ensure_running, so
- * the fd lands on add-fd's sendmsg as it should. */
-TEST(qmp_client_first_invoke_with_fd) {
+/* End-to-end fd-passing through qmp_client_invoke() with QMP_CLIENT_ARGS_FD(): open a real
+ * fd, send add-fd, confirm the mock received a single SCM_RIGHTS fd and replied successfully. */
+TEST(qmp_client_invoke_with_fd) {
         _cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
         _cleanup_(sd_event_unrefp) sd_event *event = NULL;
         _cleanup_(pidref_done) PidRef pid = PIDREF_NULL;
@@ -408,11 +406,11 @@ TEST(qmp_client_first_invoke_with_fd) {
         ASSERT_OK(sd_event_new(&event));
         ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
 
-        r = ASSERT_OK(pidref_safe_fork("(mock-qmp-fd-first)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
+        r = ASSERT_OK(pidref_safe_fork("(mock-qmp-fd)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
 
         if (r == 0) {
                 safe_close(qmp_fds[0]);
-                mock_qmp_server_fd_first(qmp_fds[1]);
+                mock_qmp_server_fd(qmp_fds[1]);
         }
 
         safe_close(qmp_fds[1]);
@@ -424,12 +422,8 @@ TEST(qmp_client_first_invoke_with_fd) {
         ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
         ASSERT_OK(qmp_client_attach_event(client, event, SD_EVENT_PRIORITY_NORMAL));
 
-        /* Build add-fd args. The fdset-id value is irrelevant — the mock server only
-         * cares that the fd arrived with the correct sendmsg. */
         ASSERT_OK(sd_json_buildo(&args, SD_JSON_BUILD_PAIR_UNSIGNED("fdset-id", 0)));
 
-        /* THIS is the previously-broken pattern: very first invoke against the client,
-         * carrying an fd, with the handshake still pending. */
         ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd",
                                     QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)),
                                     on_test_result, &t));
@@ -439,56 +433,50 @@ TEST(qmp_client_first_invoke_with_fd) {
         ASSERT_NOT_NULL(t.result);
         qmp_test_result_done(&t);
 
-        /* Wait for the mock server child. If it received fds in the wrong order it
-         * exited via the test-assertion failure path and si.si_status will be non-zero. */
+        /* Wait for the mock. If its fd-count assertion tripped, si.si_status is non-zero. */
         siginfo_t si = {};
         ASSERT_OK(pidref_wait_for_terminate(&pid, &si));
         ASSERT_EQ(si.si_code, CLD_EXITED);
         ASSERT_EQ(si.si_status, EXIT_SUCCESS);
 }
 
-/* Regression: when qmp_client_invoke() fails before stage_fds runs (e.g.
- * ensure_running() returns -ENOTCONN because the peer closed mid-handshake), the
- * caller-supplied fds — already TAKE_FD()'d through QMP_CLIENT_ARGS_FD() — must be
- * closed inside invoke. Otherwise they leak. */
+/* Regression: the caller-supplied fds — already TAKE_FD()'d through QMP_CLIENT_ARGS_FD() —
+ * must never leak, regardless of whether the invoke reaches the wire. Verified here via a
+ * dead peer: invoke enqueues (non-blocking), the queue item owns the fd, and client teardown
+ * must close it. */
 TEST(qmp_client_invoke_failure_closes_fds) {
-        _cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *args = NULL;
         _cleanup_close_ int fd_to_pass = -EBADF;
+        QmpClient *client = NULL;
         QmpTestResult t = {};
         int qmp_fds[2];
         int saved_fd_value;
 
         ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
 
-        /* Close the peer end immediately so ensure_running()'s read sees EOF and
-         * the client transitions straight to DISCONNECTED inside the first invoke. */
+        /* Close the peer end immediately so any write attempt sees EPIPE. */
         safe_close(qmp_fds[1]);
 
-        ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
-        /* Deliberately do NOT attach to an event loop — invoke uses ensure_running()'s
-         * synchronous process+wait pump for the handshake. */
-
         fd_to_pass = open("/dev/null", O_RDWR|O_CLOEXEC);
         ASSERT_OK(fd_to_pass);
         saved_fd_value = fd_to_pass;   /* remember the int value for the closed-check */
 
         ASSERT_OK(sd_json_buildo(&args, SD_JSON_BUILD_PAIR_UNSIGNED("fdset-id", 0)));
+        ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
 
-        /* invoke must fail because the peer is gone. The TAKE_FD inside the macro
-         * has already zeroed our local fd_to_pass; if invoke leaked the fd here,
-         * the fd would stay open in our process. */
-        int r = qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd",
-                                  QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)),
-                                  on_test_result, &t);
-        ASSERT_TRUE(r < 0);
-        ASSERT_TRUE(ERRNO_IS_NEG_DISCONNECT(r));
+        /* invoke no longer blocks on the handshake — it just enqueues. The fd is now
+         * owned by the underlying JsonStream output queue. */
+        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd",
+                                    QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)),
+                                    on_test_result, &t));
+        ASSERT_EQ(fd_to_pass, -EBADF);  /* TAKE_FD cleared our local handle */
+
+        /* The fd is still open here (held in JsonStream's queue). */
+        ASSERT_OK_ERRNO(fcntl(saved_fd_value, F_GETFD));
 
-        /* fd_to_pass should now be -EBADF (TAKE_FD'd) and the underlying kernel fd
-         * should have been closed by the qmp_client_args_close_fds cleanup in
-         * qmp_client_invoke(). fcntl on the old int returns EBADF only if the slot
-         * is genuinely free. */
-        ASSERT_EQ(fd_to_pass, -EBADF);
+        /* Client teardown (json_stream_done) must close queued output fds, otherwise the
+         * saved fd number would still be valid. */
+        client = qmp_client_unref(client);
         ASSERT_EQ(fcntl(saved_fd_value, F_GETFD), -1);
         ASSERT_EQ(errno, EBADF);
 }