]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
test-qmp-client: run mock QMP servers as fibers on the shared event loop 39771/head
authorDaan De Meyer <daan@amutable.com>
Fri, 24 Apr 2026 09:49:10 +0000 (09:49 +0000)
committerDaan De Meyer <daan@amutable.com>
Thu, 21 May 2026 09:55:04 +0000 (09:55 +0000)
The mock servers used to be driven out-of-band: each test created a
socketpair, forked a child, ran a hand-coded request/response script
against the raw fd, and sent SIGTERM to tear it down. That worked but
required pidref/process-util/signal plumbing in every test, two
distinct execution contexts that couldn't share state, and a JsonStream
attached to the mock side that pretended to be event-loop-driven while
actually being driven manually via blocking reads.

Now that JsonStream suspends when on a fiber, the mocks can live
inside the same process and event loop as the client. Each mock is
rewritten as an sd-fiber that runs alongside the client fiber: so the
mock fiber yields on I/O and the event loop schedules the client in the
meantime. Both sides progress cooperatively, no fork/SIGTERM/PID tracking,
no manual phase tracking.

Two cleanups fall out of the rewrite:

- A QMP_TEST(name, mock_fn) { ... } macro encapsulates the per-test
  scaffolding (event loop, socketpair, mock fiber spawn, exit-on-idle
  shim) and injects an already-connected QmpClient *client into the
  test body. Each test now reads as a flat sequence of
  qmp_client_call() invocations against that client.

- Repeated mock command/reply scripting is factored into
  mock_qmp_expect(), mock_qmp_reply(), mock_qmp_expect_and_reply(),
  mock_qmp_handshake(), and mock_qmp_query_status_running(). The
  greeting JSON is built with sd_json_buildo() instead of being parsed
  from a literal.

The file shrinks from 756 to 494 lines, mostly through deletions.

src/test/test-qmp-client.c

index e5e7ed3b735b12306e0b93d180cb7f860d6df26f..948122720eb235e5c3d083c30a8e063c5d56b9ad 100644 (file)
@@ -1,33 +1,27 @@
 /* SPDX-License-Identifier: LGPL-2.1-or-later */
 
-#include <fcntl.h>
-#include <signal.h>
+#include <sys/eventfd.h>
 #include <sys/socket.h>
 
 #include "sd-event.h"
+#include "sd-future.h"
 #include "sd-json.h"
 
 #include "errno-util.h"
 #include "fd-util.h"
 #include "json-stream.h"
-#include "pidref.h"
-#include "process-util.h"
 #include "qmp-client.h"
 #include "string-util.h"
 #include "tests.h"
 
-/* Mock QMP server: runs in the child process of a fork, communicates via one end of a socketpair.
- * Uses JsonStream as the transport so framing (CRLF delimiter, message queuing, SCM_RIGHTS) is
- * handled the same way as on the client side — individual recv() syscalls may coalesce multiple
- * messages, and the parser must re-emit each one on its own. */
+/* Mock QMP server runs as an sd-fiber alongside the client on the same event loop. Its
+ * JsonStream uses the suspending json_stream_wait()/json_stream_flush() helpers, so the mock
+ * fiber yields whenever it's blocked on I/O and the client makes progress in the meantime. */
 
-/* We drive the stream manually via read/parse/wait; always report READING so json_stream_wait()
- * asks for POLLIN. */
 static JsonStreamPhase mock_qmp_phase(void *userdata) {
         return JSON_STREAM_PHASE_READING;
 }
 
-/* Never reached — we don't wire the mock stream up to sd-event — but required at init. */
 static int mock_qmp_dispatch(void *userdata) {
         return 0;
 }
@@ -43,9 +37,6 @@ static void mock_qmp_init(JsonStream *s, int fd) {
         ASSERT_OK(json_stream_connect_fd_pair(s, fd, fd));
 }
 
-/* Read one complete JSON message, blocking until available. Handles the case where multiple
- * client messages arrived coalesced into a single recv(): the parser walks the input buffer
- * one CRLF-delimited message at a time. */
 static void mock_qmp_recv(JsonStream *s, sd_json_variant **ret) {
         int r;
 
@@ -62,142 +53,137 @@ static void mock_qmp_recv(JsonStream *s, sd_json_variant **ret) {
         }
 }
 
-/* Enqueue one JSON variant and block until it has been fully written. */
 static void mock_qmp_send(JsonStream *s, sd_json_variant *v) {
         ASSERT_OK(json_stream_enqueue(s, v));
         ASSERT_OK(json_stream_flush(s));
 }
 
-/* Parse a literal JSON string and send it. Used for fixed greetings and unsolicited events. */
-static void mock_qmp_send_literal(JsonStream *s, const char *msg) {
+static void mock_qmp_send_greeting(JsonStream *s) {
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = NULL;
 
-        ASSERT_OK(sd_json_parse(msg, 0, &v, NULL, NULL));
+        ASSERT_OK(sd_json_buildo(&v,
+                SD_JSON_BUILD_PAIR("QMP", SD_JSON_BUILD_OBJECT(
+                        SD_JSON_BUILD_PAIR("version", SD_JSON_BUILD_OBJECT(
+                                SD_JSON_BUILD_PAIR("qemu", SD_JSON_BUILD_OBJECT(
+                                        SD_JSON_BUILD_PAIR_UNSIGNED("micro", 0),
+                                        SD_JSON_BUILD_PAIR_UNSIGNED("minor", 2),
+                                        SD_JSON_BUILD_PAIR_UNSIGNED("major", 9))))),
+                        SD_JSON_BUILD_PAIR("capabilities", SD_JSON_BUILD_STRV(STRV_MAKE("oob")))))));
         mock_qmp_send(s, v);
 }
 
-/* Read a command from the client, verify it contains the expected command name, and send a
- * reply carrying the same id. If reply_data is NULL, an empty return object is sent. */
-static void mock_qmp_expect_and_reply(JsonStream *s, const char *expected_command, sd_json_variant *reply_data) {
-        _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL, *reply_obj = NULL, *response = NULL;
-
-        mock_qmp_recv(s, &cmd);
-
-        sd_json_variant *execute = ASSERT_NOT_NULL(sd_json_variant_by_key(cmd, "execute"));
+/* Receive one command, assert it matches `expected_command`, return its id (borrowed from *cmd). */
+static sd_json_variant* mock_qmp_expect(JsonStream *s, const char *expected_command, sd_json_variant **cmd) {
+        mock_qmp_recv(s, cmd);
+        sd_json_variant *execute = ASSERT_NOT_NULL(sd_json_variant_by_key(*cmd, "execute"));
         ASSERT_STREQ(sd_json_variant_string(execute), expected_command);
+        return ASSERT_NOT_NULL(sd_json_variant_by_key(*cmd, "id"));
+}
 
-        sd_json_variant *id = ASSERT_NOT_NULL(sd_json_variant_by_key(cmd, "id"));
+/* Send a reply for a previously-received command id. Passing NULL reply_data sends {}. */
+static void mock_qmp_reply(JsonStream *s, sd_json_variant *id, sd_json_variant *reply_data) {
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *empty = NULL, *response = NULL;
 
-        if (!reply_data)
-                ASSERT_OK(sd_json_variant_new_object(&reply_obj, NULL, 0));
+        if (!reply_data) {
+                ASSERT_OK(sd_json_build(&empty, SD_JSON_BUILD_EMPTY_OBJECT));
+                reply_data = empty;
+        }
 
-        ASSERT_OK(sd_json_buildo(
-                        &response,
-                        SD_JSON_BUILD_PAIR("return", SD_JSON_BUILD_VARIANT(reply_data ?: reply_obj)),
-                        SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(id))));
+        ASSERT_OK(sd_json_buildo(&response,
+                SD_JSON_BUILD_PAIR("return", SD_JSON_BUILD_VARIANT(reply_data)),
+                SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(id))));
 
         mock_qmp_send(s, response);
 }
 
-/* Same shape as mock_qmp_expect_and_reply() but replies with a QMP error object. */
-static void mock_qmp_expect_and_reply_error(JsonStream *s, const char *expected_command, const char *error_desc) {
-        _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL, *error_obj = NULL, *response = NULL;
-
-        mock_qmp_recv(s, &cmd);
-
-        sd_json_variant *execute = ASSERT_NOT_NULL(sd_json_variant_by_key(cmd, "execute"));
-        ASSERT_STREQ(sd_json_variant_string(execute), expected_command);
+static void mock_qmp_expect_and_reply(JsonStream *s, const char *expected_command, sd_json_variant *reply_data) {
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL;
+        mock_qmp_reply(s, mock_qmp_expect(s, expected_command, &cmd), reply_data);
+}
 
-        sd_json_variant *id = ASSERT_NOT_NULL(sd_json_variant_by_key(cmd, "id"));
+static void mock_qmp_expect_and_reply_error(JsonStream *s, const char *expected_command, const char *error_desc) {
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL, *response = NULL;
+        sd_json_variant *id = mock_qmp_expect(s, expected_command, &cmd);
 
-        ASSERT_OK(sd_json_buildo(
-                        &error_obj,
+        ASSERT_OK(sd_json_buildo(&response,
+                SD_JSON_BUILD_PAIR("error", SD_JSON_BUILD_OBJECT(
                         SD_JSON_BUILD_PAIR_STRING("class", "GenericError"),
-                        SD_JSON_BUILD_PAIR_STRING("desc", error_desc)));
-
-        ASSERT_OK(sd_json_buildo(
-                        &response,
-                        SD_JSON_BUILD_PAIR("error", SD_JSON_BUILD_VARIANT(error_obj)),
-                        SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(id))));
+                        SD_JSON_BUILD_PAIR_STRING("desc", error_desc))),
+                SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(id))));
 
         mock_qmp_send(s, response);
 }
 
-static _noreturn_ void mock_qmp_server(int fd) {
-        _cleanup_(json_stream_done) JsonStream s = {};
-        _cleanup_(sd_json_variant_unrefp) sd_json_variant *status_return = NULL;
-
-        mock_qmp_init(&s, fd);
+static void mock_qmp_handshake(JsonStream *s) {
+        mock_qmp_send_greeting(s);
+        mock_qmp_expect_and_reply(s, "qmp_capabilities", NULL);
+}
 
-        /* Send QMP greeting */
-        mock_qmp_send_literal(&s,
-                "{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 2, \"major\": 9}}, \"capabilities\": [\"oob\"]}}");
+/* Reply to query-status with a running=true/status="running" payload. */
+static void mock_qmp_query_status_running(JsonStream *s) {
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = NULL;
 
-        /* Accept qmp_capabilities */
-        mock_qmp_expect_and_reply(&s, "qmp_capabilities", NULL);
+        ASSERT_OK(sd_json_buildo(&v,
+                SD_JSON_BUILD_PAIR_BOOLEAN("running", true),
+                SD_JSON_BUILD_PAIR_STRING("status", "running")));
+        mock_qmp_expect_and_reply(s, "query-status", v);
+}
 
-        /* Accept query-status, reply with running state */
-        ASSERT_OK(sd_json_buildo(
-                        &status_return,
-                        SD_JSON_BUILD_PAIR_BOOLEAN("running", true),
-                        SD_JSON_BUILD_PAIR_STRING("status", "running")));
-        mock_qmp_expect_and_reply(&s, "query-status", status_return);
+/* Drive a mock+client pair on a single event loop. The client fiber runs as userdata=client,
+ * the mock fiber as userdata=fd (the server-side socket). */
+static void run_qmp_test(sd_fiber_func_t mock_fn, sd_fiber_func_t client_fn) {
+        _cleanup_(sd_event_unrefp) sd_event *event = NULL;
+        _cleanup_(sd_future_unrefp) sd_future *client_f = NULL, *mock_f = NULL;
+        _cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
+        _cleanup_close_pair_ int qmp_fds[2] = EBADF_PAIR;
 
-        /* Accept stop */
-        mock_qmp_expect_and_reply(&s, "stop", NULL);
+        ASSERT_OK(sd_event_new(&event));
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
 
-        /* Send a STOP event */
-        mock_qmp_send_literal(&s,
-                "{\"event\": \"STOP\", \"timestamp\": {\"seconds\": 1234, \"microseconds\": 5678}}");
+        ASSERT_OK(qmp_client_connect_fd(&client, TAKE_FD(qmp_fds[0])));
+        ASSERT_OK(qmp_client_attach_event(client, event, SD_EVENT_PRIORITY_NORMAL));
 
-        /* Accept cont */
-        mock_qmp_expect_and_reply(&s, "cont", NULL);
+        ASSERT_OK(sd_fiber_new(event, "mock", mock_fn, FD_TO_PTR(TAKE_FD(qmp_fds[1])), NULL, &mock_f));
+        ASSERT_OK(sd_fiber_new(event, "client", client_fn, client, NULL, &client_f));
 
-        /* json_stream_done() on cleanup closes our fd and signals EOF. */
-        _exit(EXIT_SUCCESS);
+        ASSERT_OK(sd_event_loop(event));
+        ASSERT_OK(sd_future_result(client_f));
+        ASSERT_OK(sd_future_result(mock_f));
 }
 
-/* Test helper: tracks an async QMP command result and signals completion. */
-typedef struct {
-        sd_json_variant *result;
-        char *error_desc;
-        int error;
-        bool done;
-} QmpTestResult;
-
-static int on_test_result(
-                QmpClient *client,
-                sd_json_variant *result,
-                const char *error_desc,
-                int error,
-                void *userdata) {
-
-        QmpTestResult *t = ASSERT_PTR(userdata);
-
-        t->error = error;
-        if (result)
-                t->result = sd_json_variant_ref(result);
-        if (error_desc)
-                t->error_desc = strdup(error_desc);
-        t->done = true;
-        return 0;
-}
+/* Define a test whose body runs as the client fiber on an event loop shared with `mock_fn`.
+ * The body receives `QmpClient *client` as its argument. */
+#define QMP_TEST(name, mock_fn)                                                \
+        static int test_##name##_body(QmpClient *client);                      \
+        static int test_##name##_fiber(void *userdata) {                       \
+                int r = test_##name##_body(userdata);                          \
+                ASSERT_OK(sd_event_exit(sd_fiber_get_event(), 0));             \
+                return r;                                                      \
+        }                                                                      \
+        TEST(name) {                                                           \
+                run_qmp_test(mock_fn, test_##name##_fiber);                    \
+        }                                                                      \
+        static int test_##name##_body(QmpClient *client)
+
+static int mock_qmp_basic_fiber(void *userdata) {
+        _cleanup_(json_stream_done) JsonStream s = {};
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *stop_event = NULL;
 
-/* Run the event loop until the test result callback fires. */
-static void qmp_test_wait(sd_event *event, QmpTestResult *t) {
-        assert(event);
-        assert(t);
+        mock_qmp_init(&s, PTR_TO_FD(userdata));
+        mock_qmp_handshake(&s);
 
-        while (!t->done)
-                ASSERT_OK(sd_event_run(event, UINT64_MAX));
-}
+        mock_qmp_query_status_running(&s);
+        mock_qmp_expect_and_reply(&s, "stop", NULL);
 
-static void qmp_test_result_done(QmpTestResult *t) {
-        assert(t);
+        ASSERT_OK(sd_json_buildo(&stop_event,
+                SD_JSON_BUILD_PAIR_STRING("event", "STOP"),
+                SD_JSON_BUILD_PAIR("timestamp", SD_JSON_BUILD_OBJECT(
+                        SD_JSON_BUILD_PAIR_UNSIGNED("seconds", 1234),
+                        SD_JSON_BUILD_PAIR_UNSIGNED("microseconds", 5678)))));
+        mock_qmp_send(&s, stop_event);
 
-        sd_json_variant_unref(t->result);
-        free(t->error_desc);
-        *t = (QmpTestResult) {};
+        mock_qmp_expect_and_reply(&s, "cont", NULL);
+        return 0;
 }
 
 static int test_event_callback(
@@ -208,516 +194,283 @@ static int test_event_callback(
 
         bool *event_received = ASSERT_PTR(userdata);
 
-        /* We may also receive a synthetic SHUTDOWN event when the mock server closes the connection;
-         * only validate the STOP event we actually care about. */
+        /* Ignore the synthetic SHUTDOWN emitted when the mock closes the connection. */
         if (streq(event, "STOP"))
                 *event_received = true;
 
         return 0;
 }
 
-TEST(qmp_client_basic) {
-        _cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
-        _cleanup_(sd_event_unrefp) sd_event *event = NULL;
-        _cleanup_(pidref_done) PidRef pid = PIDREF_NULL;
-        QmpTestResult t = {};
-        sd_json_variant *running, *status;
-        int qmp_fds[2];
-        int r;
-
-        ASSERT_OK(sd_event_new(&event));
-
-        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
-
-        r = ASSERT_OK(pidref_safe_fork("(mock-qmp)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
-
-        if (r == 0) {
-                safe_close(qmp_fds[0]);
-                mock_qmp_server(qmp_fds[1]);
-        }
-
-        safe_close(qmp_fds[1]);
-
-        /* Connect then attach to event loop — handshake completes transparently
-         * inside the first call()/invoke(). */
-        ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
-        ASSERT_OK(qmp_client_attach_event(client, event, SD_EVENT_PRIORITY_NORMAL));
-
-        /* Set event callback to catch STOP event during cont */
+QMP_TEST(qmp_client_basic, mock_qmp_basic_fiber) {
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *result = NULL;
+        _cleanup_free_ char *error_desc = NULL;
         bool event_received = false;
+
         qmp_client_bind_event(client, test_event_callback, &event_received);
 
-        /* Execute query-status */
-        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "query-status", NULL, on_test_result, &t));
-        qmp_test_wait(event, &t);
-        ASSERT_EQ(t.error, 0);
-        ASSERT_NOT_NULL(t.result);
+        ASSERT_OK_POSITIVE(qmp_client_call(client, "query-status", NULL, &result, &error_desc));
+        ASSERT_NULL(error_desc);
 
-        running = ASSERT_NOT_NULL(sd_json_variant_by_key(t.result, "running"));
+        sd_json_variant *running = ASSERT_NOT_NULL(sd_json_variant_by_key(result, "running"));
         ASSERT_TRUE(sd_json_variant_boolean(running));
-
-        status = ASSERT_NOT_NULL(sd_json_variant_by_key(t.result, "status"));
+        sd_json_variant *status = ASSERT_NOT_NULL(sd_json_variant_by_key(result, "status"));
         ASSERT_STREQ(sd_json_variant_string(status), "running");
 
-        qmp_test_result_done(&t);
+        ASSERT_OK_POSITIVE(qmp_client_call(client, "stop", NULL, NULL, NULL));
+        ASSERT_OK_POSITIVE(qmp_client_call(client, "cont", NULL, NULL, NULL));
 
-        /* Execute stop */
-        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "stop", NULL, on_test_result, &t));
-        qmp_test_wait(event, &t);
-        ASSERT_EQ(t.error, 0);
-        qmp_test_result_done(&t);
+        ASSERT_TRUE(event_received);
+        return 0;
+}
 
-        /* Execute cont -- the STOP event should be dispatched by the IO callback */
-        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "cont", NULL, on_test_result, &t));
-        qmp_test_wait(event, &t);
-        ASSERT_EQ(t.error, 0);
-        qmp_test_result_done(&t);
+static int mock_qmp_eof_fiber(void *userdata) {
+        _cleanup_(json_stream_done) JsonStream s = {};
 
-        /* Verify the STOP event was received */
-        ASSERT_TRUE(event_received);
+        mock_qmp_init(&s, PTR_TO_FD(userdata));
+        mock_qmp_handshake(&s);
+        /* Return; _cleanup_ closes the fd → client sees EOF. */
+        return 0;
+}
 
-        /* Wait for child and verify clean exit */
-        siginfo_t si = {};
-        ASSERT_OK(pidref_wait_for_terminate(&pid, &si));
-        ASSERT_EQ(si.si_code, CLD_EXITED);
-        ASSERT_EQ(si.si_status, EXIT_SUCCESS);
+QMP_TEST(qmp_client_eof, mock_qmp_eof_fiber) {
+        int r = qmp_client_call(client, "query-status", NULL, NULL, NULL);
+        ASSERT_TRUE(ERRNO_IS_NEG_DISCONNECT(r));
+        return 0;
 }
 
-TEST(qmp_client_eof) {
-        _cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
-        _cleanup_(sd_event_unrefp) sd_event *event = NULL;
-        _cleanup_(pidref_done) PidRef pid = PIDREF_NULL;
-        QmpTestResult t = {};
-        int qmp_fds[2];
-        int r;
+static int mock_qmp_call_fiber(void *userdata) {
+        _cleanup_(json_stream_done) JsonStream s = {};
 
-        ASSERT_OK(sd_event_new(&event));
-        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
+        mock_qmp_init(&s, PTR_TO_FD(userdata));
+        mock_qmp_handshake(&s);
+
+        mock_qmp_query_status_running(&s);
+        mock_qmp_expect_and_reply_error(&s, "stop", "not running");
+        mock_qmp_expect_and_reply_error(&s, "stop", "still not running");
+        return 0;
+}
+
+QMP_TEST(qmp_client_call, mock_qmp_call_fiber) {
+        _cleanup_(sd_future_cancel_wait_unrefp) sd_future *f = NULL;
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *result = NULL;
+        _cleanup_free_ char *error_desc = NULL;
 
-        r = ASSERT_OK(pidref_safe_fork("(mock-qmp-eof)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
+        /* Exercise qmp_client_call_future() + sd_fiber_await() + future_get_qmp_reply()
+         * directly — success path. */
+        ASSERT_OK(qmp_client_call_future(client, "query-status", NULL, &f));
+        ASSERT_OK(sd_fiber_await(f));
+        ASSERT_OK(sd_future_result(f));
+        ASSERT_OK(future_get_qmp_reply(f, &result, &error_desc));
 
-        if (r == 0) {
-                _cleanup_(json_stream_done) JsonStream s = {};
+        ASSERT_NULL(error_desc);
+        sd_json_variant *running = ASSERT_NOT_NULL(sd_json_variant_by_key(result, "running"));
+        ASSERT_TRUE(sd_json_variant_boolean(running));
+        sd_json_variant *status = ASSERT_NOT_NULL(sd_json_variant_by_key(result, "status"));
+        ASSERT_STREQ(sd_json_variant_string(status), "running");
 
-                safe_close(qmp_fds[0]);
-                mock_qmp_init(&s, qmp_fds[1]);
+        /* QMP-level error: future resolves with -EIO, sd_fiber_await() returns -EIO, and
+         * future_get_qmp_reply() also returns -EIO (mirroring future_get_bus_reply()) — with the
+         * detailed description captured via error_desc on top, and result left NULL. */
+        f = sd_future_unref(f);
+        result = sd_json_variant_unref(result);
+        error_desc = mfree(error_desc);
 
-                /* Send greeting and accept capabilities, then die */
-                mock_qmp_send_literal(&s,
-                        "{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}");
+        ASSERT_OK(qmp_client_call_future(client, "stop", NULL, &f));
+        ASSERT_ERROR(sd_fiber_await(f), EIO);
+        ASSERT_ERROR(sd_future_result(f), EIO);
+        ASSERT_ERROR(future_get_qmp_reply(f, &result, &error_desc), EIO);
 
-                mock_qmp_expect_and_reply(&s, "qmp_capabilities", NULL);
+        ASSERT_NULL(result);
+        ASSERT_STREQ(error_desc, "not running");
 
-                /* _exit() closes our fd via kernel teardown, signalling EOF to the peer. */
-                _exit(EXIT_SUCCESS);
-        }
+        /* qmp_client_call() also surfaces QMP errors as -EIO, regardless of whether the caller
+         * passed ret_error_desc. */
+        ASSERT_ERROR(qmp_client_call(client, "stop", NULL, NULL, NULL), EIO);
+        return 0;
+}
 
-        safe_close(qmp_fds[1]);
+static int mock_qmp_call_disconnect_fiber(void *userdata) {
+        _cleanup_(json_stream_done) JsonStream s = {};
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *stop_cmd = NULL;
 
-        ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
-        ASSERT_OK(qmp_client_attach_event(client, event, SD_EVENT_PRIORITY_NORMAL));
+        mock_qmp_init(&s, PTR_TO_FD(userdata));
+        mock_qmp_handshake(&s);
 
-        /* Executing a command should fail with a disconnect error because the server
-         * closed. The handshake may succeed or fail inside invoke() — either way the
-         * invoke itself or the async callback should report a disconnect. */
-        r = qmp_client_invoke(client, /* ret_slot= */ NULL, "query-status", NULL, on_test_result, &t);
-        if (r < 0)
-                ASSERT_TRUE(ERRNO_IS_NEG_DISCONNECT(r));
-        else {
-                qmp_test_wait(event, &t);
-                ASSERT_TRUE(ERRNO_IS_NEG_DISCONNECT(t.error));
-                qmp_test_result_done(&t);
-        }
+        /* Consume the stop command but don't reply — cleanup closes the fd and the client
+         * sees a disconnect while suspended. */
+        mock_qmp_recv(&s, &stop_cmd);
+        return 0;
+}
 
-        siginfo_t si = {};
-        ASSERT_OK(pidref_wait_for_terminate(&pid, &si));
-        ASSERT_EQ(si.si_code, CLD_EXITED);
-        ASSERT_EQ(si.si_status, EXIT_SUCCESS);
+QMP_TEST(qmp_client_call_disconnect, mock_qmp_call_disconnect_fiber) {
+        int r = qmp_client_call(client, "stop", NULL, NULL, NULL);
+        ASSERT_TRUE(ERRNO_IS_NEG_DISCONNECT(r));
+        return 0;
 }
 
-/* Mock QMP server for the fd-passing test. Drives the wire dance:
- *   greeting → recv qmp_capabilities → reply → recv add-fd → reply
- * Asserts that exactly one SCM_RIGHTS fd arrives total across the two recvs. We can't
- * require the fd to come attached to add-fd specifically: AF_UNIX coalesces the client's
- * non-SCM cap sendmsg forward into the SCM-bearing add-fd sendmsg, so the fd may surface
- * with either recv depending on kernel scheduling. QEMU's FIFO fd queue doesn't care. */
-static _noreturn_ void mock_qmp_server_fd(int fd) {
+static int mock_qmp_fd_fiber(void *userdata) {
         _cleanup_(json_stream_done) JsonStream s = {};
-        _cleanup_(sd_json_variant_unrefp) sd_json_variant *cap_cmd = NULL,
-                                                          *addfd_cmd = NULL,
-                                                          *cap_reply = NULL,
-                                                          *addfd_return = NULL,
-                                                          *addfd_reply = NULL;
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *cap_cmd = NULL, *addfd_cmd = NULL,
+                                                          *addfd_return = NULL;
 
-        mock_qmp_init(&s, fd);
-        ASSERT_OK(json_stream_set_allow_fd_passing_input(&s, true, /* with_sockopt= */ true));
+        mock_qmp_init(&s, PTR_TO_FD(userdata));
+        ASSERT_OK(json_stream_set_allow_fd_passing_input(&s, true, true));
 
-        /* Greeting */
-        mock_qmp_send_literal(&s,
-                "{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}");
+        mock_qmp_send_greeting(&s);
 
-        /* Receive qmp_capabilities (may or may not carry the fd depending on coalescing). */
-        mock_qmp_recv(&s, &cap_cmd);
+        /* The fd may ride with either command depending on AF_UNIX coalescing; count across both. */
+        sd_json_variant *cap_id = mock_qmp_expect(&s, "qmp_capabilities", &cap_cmd);
         size_t n_fds_total = json_stream_get_n_input_fds(&s);
-        ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(cap_cmd, "execute")), "qmp_capabilities");
         json_stream_close_input_fds(&s);
+        mock_qmp_reply(&s, cap_id, NULL);
 
-        sd_json_variant *cap_id = ASSERT_NOT_NULL(sd_json_variant_by_key(cap_cmd, "id"));
-        ASSERT_OK(sd_json_buildo(
-                        &cap_reply,
-                        SD_JSON_BUILD_PAIR("return", SD_JSON_BUILD_EMPTY_OBJECT),
-                        SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(cap_id))));
-        mock_qmp_send(&s, cap_reply);
-
-        /* Receive add-fd (fd may already have been consumed with cap's recv). */
-        mock_qmp_recv(&s, &addfd_cmd);
+        sd_json_variant *addfd_id = mock_qmp_expect(&s, "add-fd", &addfd_cmd);
         n_fds_total += json_stream_get_n_input_fds(&s);
-        ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(addfd_cmd, "execute")), "add-fd");
         json_stream_close_input_fds(&s);
-
         ASSERT_EQ(n_fds_total, (size_t) 1);
 
-        sd_json_variant *addfd_id = ASSERT_NOT_NULL(sd_json_variant_by_key(addfd_cmd, "id"));
-        ASSERT_OK(sd_json_buildo(
-                        &addfd_return,
-                        SD_JSON_BUILD_PAIR_UNSIGNED("fdset-id", 0),
-                        SD_JSON_BUILD_PAIR_UNSIGNED("fd", 42)));
-        ASSERT_OK(sd_json_buildo(
-                        &addfd_reply,
-                        SD_JSON_BUILD_PAIR("return", SD_JSON_BUILD_VARIANT(addfd_return)),
-                        SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(addfd_id))));
-        mock_qmp_send(&s, addfd_reply);
-
-        _exit(EXIT_SUCCESS);
+        ASSERT_OK(sd_json_buildo(&addfd_return,
+                SD_JSON_BUILD_PAIR_UNSIGNED("fdset-id", 0),
+                SD_JSON_BUILD_PAIR_UNSIGNED("fd", 42)));
+        mock_qmp_reply(&s, addfd_id, addfd_return);
+        return 0;
 }
 
-/* End-to-end fd-passing through qmp_client_invoke() with QMP_CLIENT_ARGS_FD(): open a real
- * fd, send add-fd, confirm the mock received a single SCM_RIGHTS fd and replied successfully. */
-TEST(qmp_client_invoke_with_fd) {
-        _cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
-        _cleanup_(sd_event_unrefp) sd_event *event = NULL;
-        _cleanup_(pidref_done) PidRef pid = PIDREF_NULL;
+QMP_TEST(qmp_client_invoke_with_fd, mock_qmp_fd_fiber) {
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *args = NULL;
         _cleanup_close_ int fd_to_pass = -EBADF;
-        QmpTestResult t = {};
-        int qmp_fds[2];
-        int r;
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *result = NULL;
 
-        ASSERT_OK(sd_event_new(&event));
-        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
-
-        r = ASSERT_OK(pidref_safe_fork("(mock-qmp-fd)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
-
-        if (r == 0) {
-                safe_close(qmp_fds[0]);
-                mock_qmp_server_fd(qmp_fds[1]);
-        }
-
-        safe_close(qmp_fds[1]);
-
-        /* Open a real fd to pass — /dev/null is universally available. */
-        fd_to_pass = open("/dev/null", O_RDWR|O_CLOEXEC);
-        ASSERT_OK(fd_to_pass);
-
-        ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
-        ASSERT_OK(qmp_client_attach_event(client, event, SD_EVENT_PRIORITY_NORMAL));
+        fd_to_pass = ASSERT_OK_ERRNO(eventfd(0, EFD_CLOEXEC));
 
         ASSERT_OK(sd_json_buildo(&args, SD_JSON_BUILD_PAIR_UNSIGNED("fdset-id", 0)));
 
-        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd",
-                                    QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)),
-                                    on_test_result, &t));
+        ASSERT_OK_POSITIVE(qmp_client_call(client, "add-fd",
+                                           QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)),
+                                           &result, NULL));
+        ASSERT_NOT_NULL(result);
+        return 0;
+}
+
+static int on_dead_peer_reply(
+                QmpClient *client,
+                sd_json_variant *result,
+                const char *error_desc,
+                int error,
+                void *userdata) {
 
-        qmp_test_wait(event, &t);
-        ASSERT_EQ(t.error, 0);
-        ASSERT_NOT_NULL(t.result);
-        qmp_test_result_done(&t);
+        bool *fired = ASSERT_PTR(userdata);
 
-        /* Wait for the mock. If its fd-count assertion tripped, si.si_status is non-zero. */
-        siginfo_t si = {};
-        ASSERT_OK(pidref_wait_for_terminate(&pid, &si));
-        ASSERT_EQ(si.si_code, CLD_EXITED);
-        ASSERT_EQ(si.si_status, EXIT_SUCCESS);
+        /* Peer was closed before the write hit the wire; expect a disconnect. */
+        ASSERT_TRUE(ERRNO_IS_NEG_DISCONNECT(error));
+        *fired = true;
+        return 0;
 }
 
-/* Regression: the caller-supplied fds — already TAKE_FD()'d through QMP_CLIENT_ARGS_FD() —
- * must never leak, regardless of whether the invoke reaches the wire. Verified here via a
- * dead peer: invoke enqueues (non-blocking), the queue item owns the fd, and client teardown
- * must close it. */
+/* Verify caller-supplied fds passed through QMP_CLIENT_ARGS_FD() are closed on client teardown
+ * even when the peer is already dead: invoke enqueues, the queue item owns the fd, unref closes. */
 TEST(qmp_client_invoke_failure_closes_fds) {
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *args = NULL;
         _cleanup_close_ int fd_to_pass = -EBADF;
         QmpClient *client = NULL;
-        QmpTestResult t = {};
-        int qmp_fds[2];
+        _cleanup_close_pair_ int qmp_fds[2] = EBADF_PAIR;
         int saved_fd_value;
+        bool callback_fired = false;
 
         ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
+        qmp_fds[1] = safe_close(qmp_fds[1]);
 
-        /* Close the peer end immediately so any write attempt sees EPIPE. */
-        safe_close(qmp_fds[1]);
-
-        fd_to_pass = open("/dev/null", O_RDWR|O_CLOEXEC);
-        ASSERT_OK(fd_to_pass);
-        saved_fd_value = fd_to_pass;   /* remember the int value for the closed-check */
+        fd_to_pass = ASSERT_OK_ERRNO(eventfd(0, EFD_CLOEXEC));
+        saved_fd_value = fd_to_pass;
 
         ASSERT_OK(sd_json_buildo(&args, SD_JSON_BUILD_PAIR_UNSIGNED("fdset-id", 0)));
-        ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
+        ASSERT_OK(qmp_client_connect_fd(&client, TAKE_FD(qmp_fds[0])));
 
-        /* invoke no longer blocks on the handshake — it just enqueues. The fd is now
-         * owned by the underlying JsonStream output queue. */
-        ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd",
+        ASSERT_OK(qmp_client_invoke(client, NULL, "add-fd",
                                     QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)),
-                                    on_test_result, &t));
-        ASSERT_EQ(fd_to_pass, -EBADF);  /* TAKE_FD cleared our local handle */
-
-        /* The fd is still open here (held in JsonStream's queue). */
+                                    on_dead_peer_reply, &callback_fired));
+        ASSERT_EQ(fd_to_pass, -EBADF);
         ASSERT_OK_ERRNO(fcntl(saved_fd_value, F_GETFD));
 
-        /* Client teardown (json_stream_done) must close queued output fds, otherwise the
-         * saved fd number would still be valid. */
         client = qmp_client_unref(client);
-        ASSERT_EQ(fcntl(saved_fd_value, F_GETFD), -1);
-        ASSERT_EQ(errno, EBADF);
+        ASSERT_ERROR_ERRNO(fcntl(saved_fd_value, F_GETFD), EBADF);
+        ASSERT_TRUE(callback_fired);
 }
 
-/* Mock for the slot lifecycle + cancel tests: greets, accepts capabilities, then accepts
- * query-status and stop, replying with dummy returns. A cancelled query-status still gets
- * sent on the wire (cancel merely removes the pending slot), so the server must be prepared
- * to read and reply to it. */
-static _noreturn_ void mock_qmp_server_slot(int fd) {
+/* Shared mock for the two slot tests: the follow-up stop is what drives the event loop long
+ * enough to dispatch the query-status reply. */
+static int mock_qmp_slot_fiber(void *userdata) {
         _cleanup_(json_stream_done) JsonStream s = {};
-        _cleanup_(sd_json_variant_unrefp) sd_json_variant *status_return = NULL;
-
-        mock_qmp_init(&s, fd);
 
-        mock_qmp_send_literal(&s,
-                "{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}");
-
-        mock_qmp_expect_and_reply(&s, "qmp_capabilities", NULL);
-
-        ASSERT_OK(sd_json_buildo(
-                        &status_return,
-                        SD_JSON_BUILD_PAIR_BOOLEAN("running", true),
-                        SD_JSON_BUILD_PAIR_STRING("status", "running")));
-        mock_qmp_expect_and_reply(&s, "query-status", status_return);
+        mock_qmp_init(&s, PTR_TO_FD(userdata));
+        mock_qmp_handshake(&s);
 
+        mock_qmp_query_status_running(&s);
         mock_qmp_expect_and_reply(&s, "stop", NULL);
-
-        _exit(EXIT_SUCCESS);
+        return 0;
 }
 
-/* Verify that when qmp_client_invoke() returns a slot, qmp_slot_get_client() tracks the
- * connection state: the client pointer is reported while the call is in flight, and flipped
- * back to NULL once the reply has been dispatched. The caller must still be able to drop its
- * ref safely after that. */
-TEST(qmp_client_invoke_slot_lifecycle) {
-        _cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
-        _cleanup_(sd_event_unrefp) sd_event *event = NULL;
-        _cleanup_(pidref_done_sigkill_wait) PidRef pid = PIDREF_NULL;
-        _cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL;
-        QmpTestResult t = {};
-        int qmp_fds[2];
-        int r;
+static int nop_callback(
+                QmpClient *client,
+                sd_json_variant *result,
+                const char *error_desc,
+                int error,
+                void *userdata) {
 
-        ASSERT_OK(sd_event_new(&event));
-        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
+        return 0;
+}
 
-        r = ASSERT_OK(pidref_safe_fork("(mock-qmp-slot-life)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
-        if (r == 0) {
-                safe_close(qmp_fds[0]);
-                mock_qmp_server_slot(qmp_fds[1]);
-        }
-        safe_close(qmp_fds[1]);
+/* Tripwire for the cancel test: if it fires, the cancel didn't do its job. */
+static int tripwire_callback(
+                QmpClient *client,
+                sd_json_variant *result,
+                const char *error_desc,
+                int error,
+                void *userdata) {
 
-        ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
-        ASSERT_OK(qmp_client_attach_event(client, event, SD_EVENT_PRIORITY_NORMAL));
+        bool *fired = ASSERT_PTR(userdata);
+        *fired = true;
+        return 0;
+}
 
-        ASSERT_OK(qmp_client_invoke(client, &slot, "query-status", NULL, on_test_result, &t));
+QMP_TEST(qmp_client_invoke_slot_lifecycle, mock_qmp_slot_fiber) {
+        _cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL;
 
-        /* While in flight the slot still references its client. */
-        ASSERT_NOT_NULL(slot);
+        ASSERT_OK(qmp_client_invoke(client, &slot, "query-status", NULL, nop_callback, NULL));
         ASSERT_PTR_EQ(qmp_slot_get_client(slot), client);
 
-        qmp_test_wait(event, &t);
-        ASSERT_EQ(t.error, 0);
-        ASSERT_NOT_NULL(t.result);
+        /* Drive the loop via a follow-up stop; its suspending call lets both replies dispatch. */
+        ASSERT_OK_POSITIVE(qmp_client_call(client, "stop", NULL, NULL, NULL));
 
-        /* Once dispatched, the slot is disconnected from the client but still owned by us. */
+        /* After dispatch the slot is disconnected from the client but still owned by us. */
         ASSERT_NULL(qmp_slot_get_client(slot));
 
-        qmp_test_result_done(&t);
-
-        /* Drop our ref explicitly (out of order w.r.t. cleanup) to exercise the
-         * already-disconnected path in qmp_slot_free(). */
+        /* Explicit out-of-order unref exercises the already-disconnected path in qmp_slot_free(). */
         slot = qmp_slot_unref(slot);
-        ASSERT_NULL(slot);
+        return 0;
 }
 
-/* Verify that dropping the only reference on a pending slot before the reply arrives cancels
- * the callback. The command is already enqueued on the stream at that point, so the server
- * still sees it and replies — but the reply lands on an unknown id and is discarded. */
-TEST(qmp_client_invoke_slot_cancel) {
-        _cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
-        _cleanup_(pidref_done_sigkill_wait) PidRef pid = PIDREF_NULL;
-        QmpTestResult t_cancelled = {};
+QMP_TEST(qmp_client_invoke_slot_cancel, mock_qmp_slot_fiber) {
         QmpSlot *slot = NULL;
-        int qmp_fds[2];
-        int r;
+        bool fired = false;
 
-        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
-
-        r = ASSERT_OK(pidref_safe_fork("(mock-qmp-slot-cancel)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
-        if (r == 0) {
-                safe_close(qmp_fds[0]);
-                mock_qmp_server_slot(qmp_fds[1]);
-        }
-        safe_close(qmp_fds[1]);
-
-        /* Drive without an event loop so the subsequent qmp_client_call() owns all pumping;
-         * it serializes write→read round-trips, which avoids the mock server seeing the
-         * cancelled query-status and the follow-up stop concatenated into a single recv(). */
-        ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
+        ASSERT_OK(qmp_client_invoke(client, &slot, "query-status", NULL, tripwire_callback, &fired));
 
-        ASSERT_OK(qmp_client_invoke(client, &slot, "query-status", NULL, on_test_result, &t_cancelled));
-        ASSERT_NOT_NULL(slot);
-
-        /* Drop our sole ref → slot disconnects itself from the client's pending set. The
-         * enqueued query-status is still on the wire; when its reply arrives, dispatch_reply
-         * won't find a matching slot and will log-and-discard it. */
+        /* Drop our sole ref → slot disconnects from the client's pending set. The enqueued
+         * query-status is still on the wire; its reply lands on an unknown id and is discarded. */
         slot = qmp_slot_unref(slot);
-        ASSERT_NULL(slot);
-
-        /* Synchronous call drives its own process+wait pump: it first drains the already-
-         * enqueued query-status write, consumes (and discards) its reply, then sends stop
-         * and waits for that reply. Any improper fire of the cancelled callback would have
-         * happened during that process() pass. */
-        ASSERT_EQ(qmp_client_call(client, "stop", NULL, NULL, NULL), 1);
-
-        /* The cancelled callback must never have fired. */
-        ASSERT_FALSE(t_cancelled.done);
-        ASSERT_NULL(t_cancelled.result);
-        ASSERT_NULL(t_cancelled.error_desc);
-}
-
-/* Drives a small wire dance for the sync call test: greeting, capabilities, one successful
- * command reply, and two error replies (one for the ret_error_desc path, one for the -EIO
- * path). */
-static _noreturn_ void mock_qmp_server_call(int fd) {
-        _cleanup_(json_stream_done) JsonStream s = {};
-        _cleanup_(sd_json_variant_unrefp) sd_json_variant *status_return = NULL;
-
-        mock_qmp_init(&s, fd);
-
-        mock_qmp_send_literal(&s,
-                "{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}");
-
-        mock_qmp_expect_and_reply(&s, "qmp_capabilities", NULL);
-
-        ASSERT_OK(sd_json_buildo(
-                        &status_return,
-                        SD_JSON_BUILD_PAIR_BOOLEAN("running", true),
-                        SD_JSON_BUILD_PAIR_STRING("status", "running")));
-        mock_qmp_expect_and_reply(&s, "query-status", status_return);
-
-        mock_qmp_expect_and_reply_error(&s, "stop", "not running");
-        mock_qmp_expect_and_reply_error(&s, "stop", "still not running");
-
-        _exit(EXIT_SUCCESS);
-}
-
-TEST(qmp_client_call) {
-        _cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
-        _cleanup_(pidref_done_sigkill_wait) PidRef pid = PIDREF_NULL;
-        int qmp_fds[2];
-        int r;
-
-        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
-
-        r = ASSERT_OK(pidref_safe_fork("(mock-qmp-call)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
-        if (r == 0) {
-                safe_close(qmp_fds[0]);
-                mock_qmp_server_call(qmp_fds[1]);
-        }
-        safe_close(qmp_fds[1]);
-
-        /* qmp_client_call() drives its own process()+wait() pump, so no event loop needed. */
-        ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
-
-        /* Successful call: borrowed result pointer is valid until the next call. */
-        sd_json_variant *result = NULL;
-        _cleanup_free_ char *error_desc = NULL;
-        ASSERT_EQ(qmp_client_call(client, "query-status", NULL, &result, &error_desc), 1);
-        ASSERT_NULL(error_desc);
-        ASSERT_NOT_NULL(result);
-
-        sd_json_variant *running = ASSERT_NOT_NULL(sd_json_variant_by_key(result, "running"));
-        ASSERT_TRUE(sd_json_variant_boolean(running));
-        sd_json_variant *status = ASSERT_NOT_NULL(sd_json_variant_by_key(result, "status"));
-        ASSERT_STREQ(sd_json_variant_string(status), "running");
-
-        /* QMP error with ret_error_desc provided: returns 1, result NULL, desc set. */
-        result = (sd_json_variant*) 0x1;  /* poison to catch lack-of-write */
-        free(error_desc);
-        ASSERT_EQ(qmp_client_call(client, "stop", NULL, &result, &error_desc), 1);
-        ASSERT_NULL(result);
-        ASSERT_STREQ(error_desc, "not running");
-
-        /* QMP error without ret_error_desc: surfaces as -EIO. */
-        ASSERT_EQ(qmp_client_call(client, "stop", NULL, NULL, NULL), -EIO);
-}
-
-/* Server variant for the sync-call disconnect test: greets, accepts capabilities, reads one
- * command without replying, then closes the socket so the client sees EOF mid-wait. */
-static _noreturn_ void mock_qmp_server_call_disconnect(int fd) {
-        _cleanup_(json_stream_done) JsonStream s = {};
-        _cleanup_(sd_json_variant_unrefp) sd_json_variant *stop_cmd = NULL;
-
-        mock_qmp_init(&s, fd);
-
-        mock_qmp_send_literal(&s,
-                "{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}");
-
-        mock_qmp_expect_and_reply(&s, "qmp_capabilities", NULL);
-
-        /* Consume the stop command but don't reply — json_stream_done() on cleanup closes
-         * our fd, triggering EOF while the client is blocked in qmp_client_call()'s
-         * process+wait pump. */
-        mock_qmp_recv(&s, &stop_cmd);
 
-        _exit(EXIT_SUCCESS);
-}
-
-TEST(qmp_client_call_disconnect) {
-        _cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
-        _cleanup_(pidref_done_sigkill_wait) PidRef pid = PIDREF_NULL;
-        int qmp_fds[2];
-        int r;
-
-        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
-
-        r = ASSERT_OK(pidref_safe_fork("(mock-qmp-call-disc)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
-        if (r == 0) {
-                safe_close(qmp_fds[0]);
-                mock_qmp_server_call_disconnect(qmp_fds[1]);
-        }
-        safe_close(qmp_fds[1]);
-
-        ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
+        ASSERT_OK_POSITIVE(qmp_client_call(client, "stop", NULL, NULL, NULL));
 
-        /* The server reads our stop command and closes without replying. qmp_client_call()
-         * is driving its own pump, so it must notice the EOF, transition to DISCONNECTED,
-         * and return a disconnect error rather than hanging. */
-        r = qmp_client_call(client, "stop", NULL, NULL, NULL);
-        ASSERT_TRUE(r < 0);
-        ASSERT_TRUE(ERRNO_IS_NEG_DISCONNECT(r));
+        ASSERT_FALSE(fired);
+        return 0;
 }
 
 TEST(qmp_schema_has_member) {
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *schema = NULL;
 
-        /* QEMU introspection uses opaque numeric type ids ("0", "1", ...) — only member names are
-         * the actual QAPI strings. Verify we walk all object entries and find the member by name. */
+        /* QEMU introspection uses opaque numeric type ids ("0", "1", ...); only member names
+         * are the real QAPI strings. Verify we walk all object entries to find members by name. */
         ASSERT_OK(sd_json_build(&schema,
                 SD_JSON_BUILD_ARRAY(
                         SD_JSON_BUILD_OBJECT(
@@ -747,10 +500,4 @@ TEST(qmp_schema_has_member) {
         ASSERT_FALSE(qmp_schema_has_member(NULL, "discard-no-unref"));
 }
 
-static int intro(void) {
-        /* Ignore SIGPIPE so that write() to a closed socket returns EPIPE instead of killing us */
-        ASSERT_TRUE(signal(SIGPIPE, SIG_IGN) != SIG_ERR);
-        return 0;
-}
-
-DEFINE_TEST_MAIN_FULL(LOG_DEBUG, intro, NULL);
+DEFINE_TEST_MAIN(LOG_DEBUG);