#include "string-util.h"
typedef enum QmpClientState {
- QMP_CLIENT_HANDSHAKE_INITIAL, /* waiting for QMP greeting */
- QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED, /* greeting received, sending qmp_capabilities */
- QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT, /* waiting for qmp_capabilities response */
- QMP_CLIENT_RUNNING, /* connected, ready for commands */
+ QMP_CLIENT_RUNNING, /* connection alive; qmp_capabilities may still be in flight */
QMP_CLIENT_DISCONNECTED, /* connection closed */
_QMP_CLIENT_STATE_MAX,
_QMP_CLIENT_STATE_INVALID = -EINVAL,
} QmpClientState;
-/* States routed to dispatch_handshake. */
-#define QMP_CLIENT_STATE_IS_HANDSHAKE(s) \
- IN_SET(s, \
- QMP_CLIENT_HANDSHAKE_INITIAL, \
- QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED, \
- QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT)
-
struct QmpSlot {
unsigned n_ref;
QmpClient *client; /* NULL once disconnected (reply dispatched, cancelled, or client died) */
}
/* Route c->current to event callback or matching async slot. Returns 1 on dispatch. */
-static int qmp_client_dispatch_reply(QmpClient *c) {
+static int qmp_client_dispatch(QmpClient *c) {
sd_json_variant *result = NULL;
const char *desc = NULL;
uint64_t id;
return 1;
}
+ /* QEMU sends a one-shot greeting with a "QMP" key unsolicited on connect. We don't
+ * wait for it before sending qmp_capabilities (QEMU accepts commands the moment the
+ * socket is open), we detect it by the "QMP" key and drop it. */
+ if (sd_json_variant_by_key(c->current, "QMP")) {
+ qmp_client_clear_current(c);
+ return 1;
+ }
+
/* Command responses carry an "id" matching a request we sent */
r = qmp_extract_response_id(c->current, &id);
if (r < 0) {
return qmp_client_handle_disconnect(c);
}
-/* INITIAL → greeting → GREETING_RECEIVED → qmp_capabilities → CAPABILITIES_SENT → response → RUNNING. */
-static int qmp_client_dispatch_handshake(QmpClient *c) {
- int r;
-
- assert(c);
- assert(QMP_CLIENT_STATE_IS_HANDSHAKE(c->state));
-
- if (!c->current)
- return 0;
-
- /* Defensive: QEMU shouldn't emit events during capability negotiation, but if one
- * arrives, dispatch it as an event rather than mis-parsing it as a handshake reply. */
- if (sd_json_variant_by_key(c->current, "event")) {
- _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current);
- qmp_client_dispatch_event(c, v);
- return 1;
- }
-
- switch (c->state) {
-
- case QMP_CLIENT_HANDSHAKE_INITIAL: {
- /* Waiting for QMP greeting. Take ownership so by_key()'s borrowed pointer
- * stays valid through the case scope. */
- _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current);
- if (!sd_json_variant_by_key(v, "QMP"))
- return json_stream_log_errno(&c->stream, SYNTHETIC_ERRNO(EPROTO),
- "Expected QMP greeting, got something else");
-
- c->state = QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED;
-
- /* Fall through to immediately send capabilities */
- _fallthrough_;
- }
-
- case QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED: {
- /* Send qmp_capabilities command */
- _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL;
- r = sd_json_buildo(
- &cmd,
- SD_JSON_BUILD_PAIR_STRING("execute", "qmp_capabilities"),
- SD_JSON_BUILD_PAIR_UNSIGNED("id", c->next_id++));
- if (r < 0)
- return r;
-
- r = json_stream_enqueue(&c->stream, cmd);
- if (r < 0)
- return r;
-
- c->state = QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT;
- return 1;
- }
-
- case QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT: {
- /* Take ownership so desc (borrowed from v's "error.desc") survives the format string. */
- _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current);
- const char *desc = NULL;
- r = qmp_parse_response(v, /* ret_result= */ NULL, &desc);
- if (r < 0)
- return json_stream_log_errno(&c->stream, SYNTHETIC_ERRNO(EPROTO),
- "qmp_capabilities failed: %s", desc);
-
- c->state = QMP_CLIENT_RUNNING;
- return 1;
- }
-
- default:
- assert_not_reached();
- }
-}
-
-static int qmp_client_dispatch(QmpClient *c) {
- assert(c);
-
- if (!c->current)
- return 0;
-
- if (QMP_CLIENT_STATE_IS_HANDSHAKE(c->state))
- return qmp_client_dispatch_handshake(c);
-
- return qmp_client_dispatch_reply(c);
-}
-
/* Single step: write → dispatch → parse → read → disconnect. Matches sd_varlink_process(). */
int qmp_client_process(QmpClient *c) {
int r;
if (r != 0)
goto finish;
- /* 2. Dispatch — route based on state */
+ /* 2. Dispatch — dispatch incoming messages to slots */
r = qmp_client_dispatch(c);
if (r < 0)
json_stream_log_errno(&c->stream, r, "Failed to dispatch QMP message: %m");
if (c->current)
return JSON_STREAM_PHASE_OTHER;
- /* During handshake we're waiting for the greeting or qmp_capabilities response. */
- if (QMP_CLIENT_STATE_IS_HANDSHAKE(c->state))
- return JSON_STREAM_PHASE_AWAITING_REPLY;
-
- /* Running with pending async commands — waiting for their responses. */
- if (c->state == QMP_CLIENT_RUNNING && !set_isempty(c->slots))
- return JSON_STREAM_PHASE_AWAITING_REPLY;
-
- /* Running with no pending commands — waiting for unsolicited events. */
- if (c->state == QMP_CLIENT_RUNNING)
- return JSON_STREAM_PHASE_READING;
+ if (c->state != QMP_CLIENT_RUNNING)
+ return JSON_STREAM_PHASE_OTHER;
- return JSON_STREAM_PHASE_OTHER;
+ /* Pending slots (user commands or the initial qmp_capabilities) → awaiting reply.
+ * Otherwise we're idling for unsolicited events. */
+ return set_isempty(c->slots)
+ ? JSON_STREAM_PHASE_READING
+ : JSON_STREAM_PHASE_AWAITING_REPLY;
}
static int qmp_client_dispatch_cb(void *userdata) {
return 1;
}
-/* Drive handshake to completion. Matches sd-bus's bus_ensure_running(). */
-static int qmp_client_ensure_running(QmpClient *c) {
- int r;
-
- assert(c);
-
- if (c->state == QMP_CLIENT_RUNNING)
- return 1;
-
- for (;;) {
- if (c->state < 0 || c->state == QMP_CLIENT_DISCONNECTED)
- return -ENOTCONN;
-
- r = qmp_client_process(c);
- if (r < 0)
- return r;
- if (c->state == QMP_CLIENT_RUNNING)
- return 1;
- if (r > 0)
- continue;
-
- r = qmp_client_wait(c, USEC_INFINITY);
- if (r < 0)
- return r;
- }
-}
-
static void qmp_client_detach_event(QmpClient *c) {
if (!c)
return;
return 1;
}
+static int qmp_client_send(
+ QmpClient *c,
+ const char *command,
+ QmpClientArgs *args,
+ qmp_command_callback_t callback,
+ void *userdata,
+ QmpSlot **ret_slot);
+
+/* Reply callback for the eagerly-enqueued qmp_capabilities command. Success → we stay in
+ * RUNNING. Failure → negotiation is unrecoverable, force-disconnect so the next user op gets
+ * -ENOTCONN rather than hanging. */
+static int qmp_client_capabilities_reply(
+ QmpClient *c,
+ sd_json_variant *result,
+ const char *error_desc,
+ int error,
+ void *userdata) {
+
+ /* qmp_client_handle_disconnect() below fails all pending slots, which re-enters this
+ * callback with -ECONNRESET on our own still-registered slot. Short-circuit that. */
+ if (c->state == QMP_CLIENT_DISCONNECTED)
+ return 0;
+
+ if (error >= 0)
+ return 0;
+
+ json_stream_log_errno(&c->stream, error, "qmp_capabilities failed: %s", strna(error_desc));
+ qmp_client_handle_disconnect(c);
+ return 0;
+}
+
int qmp_client_connect_fd(QmpClient **ret, int fd) {
_cleanup_(qmp_client_unrefp) QmpClient *c = NULL;
int r;
*c = (QmpClient) {
.n_ref = 1,
- .state = QMP_CLIENT_HANDSHAKE_INITIAL,
+ .state = QMP_CLIENT_RUNNING,
.next_id = 1,
};
if (r < 0)
return r;
+ /* Eagerly queue qmp_capabilities. QEMU accepts commands as soon as the socket opens
+ * — its greeting is informational and doesn't gate writes on our side. FIFO ordering
+ * of the output queue guarantees cap precedes any user command a later invoke()
+ * enqueues, which is all QEMU actually requires. */
+ r = qmp_client_send(c, "qmp_capabilities", /* args= */ NULL,
+ qmp_client_capabilities_reply, /* userdata= */ NULL,
+ /* ret_slot= */ NULL);
+ if (r < 0)
+ return r;
+
*ret = TAKE_PTR(c);
return 0;
}
assert(c);
assert(command);
- r = qmp_client_ensure_running(c);
- if (r < 0)
- return r;
+ if (c->state == QMP_CLIENT_DISCONNECTED)
+ return -ENOTCONN;
r = qmp_client_build_command(c, command, args ? args->arguments : NULL, &cmd, &id);
if (r < 0)
ASSERT_EQ(si.si_status, EXIT_SUCCESS);
}
-/* Mock QMP server for the fd-on-first-invoke regression. Drives the wire dance:
- * greeting → (recv qmp_capabilities, expect 0 fds) → reply →
- * (recv add-fd, expect exactly 1 fd) → reply
- * Asserts the attached fd counts directly so a regression flips the child to
- * exit_failure and the parent test fails on the wait-for-terminate. */
-static _noreturn_ void mock_qmp_server_fd_first(int fd) {
+/* Mock QMP server for the fd-passing test. Drives the wire dance:
+ * greeting → recv qmp_capabilities → reply → recv add-fd → reply
+ * Asserts that exactly one SCM_RIGHTS fd arrives total across the two recvs. We can't
+ * require the fd to come attached to add-fd specifically: AF_UNIX coalesces the client's
+ * non-SCM cap sendmsg forward into the SCM-bearing add-fd sendmsg, so the fd may surface
+ * with either recv depending on kernel scheduling. QEMU's FIFO fd queue doesn't care. */
+static _noreturn_ void mock_qmp_server_fd(int fd) {
_cleanup_(json_stream_done) JsonStream s = {};
_cleanup_(sd_json_variant_unrefp) sd_json_variant *cap_cmd = NULL,
*addfd_cmd = NULL,
*addfd_reply = NULL;
mock_qmp_init(&s, fd);
- /* Accept SCM_RIGHTS on incoming messages so we can count how many fds the client
- * attaches to each sendmsg. */
ASSERT_OK(json_stream_set_allow_fd_passing_input(&s, true, /* with_sockopt= */ true));
/* Greeting */
mock_qmp_send_literal(&s,
"{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}");
- /* Receive qmp_capabilities — must arrive with NO fds attached. */
+ /* Receive qmp_capabilities (may or may not carry the fd depending on coalescing). */
mock_qmp_recv(&s, &cap_cmd);
- ASSERT_EQ(json_stream_get_n_input_fds(&s), (size_t) 0);
+ size_t n_fds_total = json_stream_get_n_input_fds(&s);
ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(cap_cmd, "execute")), "qmp_capabilities");
+ json_stream_close_input_fds(&s);
sd_json_variant *cap_id = ASSERT_NOT_NULL(sd_json_variant_by_key(cap_cmd, "id"));
ASSERT_OK(sd_json_buildo(
SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(cap_id))));
mock_qmp_send(&s, cap_reply);
- /* Receive add-fd — must arrive with EXACTLY ONE fd attached. */
+ /* Receive add-fd (fd may already have been consumed with cap's recv). */
mock_qmp_recv(&s, &addfd_cmd);
- ASSERT_EQ(json_stream_get_n_input_fds(&s), (size_t) 1);
+ n_fds_total += json_stream_get_n_input_fds(&s);
ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(addfd_cmd, "execute")), "add-fd");
json_stream_close_input_fds(&s);
+ ASSERT_EQ(n_fds_total, (size_t) 1);
+
sd_json_variant *addfd_id = ASSERT_NOT_NULL(sd_json_variant_by_key(addfd_cmd, "id"));
ASSERT_OK(sd_json_buildo(
&addfd_return,
_exit(EXIT_SUCCESS);
}
-/* Regression: pass an fd in the very first qmp_client_invoke() against a fresh client
- * (lazy-bootstrap state, handshake not yet done). The previous push_fd+invoke split would
- * stage the fd on the stream BEFORE qmp_client_ensure_running() drove the handshake; the
- * handshake's qmp_capabilities enqueue would then steal the staged fd onto its own
- * sendmsg. The new QmpClientArgs API stages fds inside invoke AFTER ensure_running, so
- * the fd lands on add-fd's sendmsg as it should. */
-TEST(qmp_client_first_invoke_with_fd) {
+/* End-to-end fd-passing through qmp_client_invoke() with QMP_CLIENT_ARGS_FD(): open a real
+ * fd, send add-fd, confirm the mock received a single SCM_RIGHTS fd and replied successfully. */
+TEST(qmp_client_invoke_with_fd) {
_cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
_cleanup_(sd_event_unrefp) sd_event *event = NULL;
_cleanup_(pidref_done) PidRef pid = PIDREF_NULL;
ASSERT_OK(sd_event_new(&event));
ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
- r = ASSERT_OK(pidref_safe_fork("(mock-qmp-fd-first)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
+ r = ASSERT_OK(pidref_safe_fork("(mock-qmp-fd)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
if (r == 0) {
safe_close(qmp_fds[0]);
- mock_qmp_server_fd_first(qmp_fds[1]);
+ mock_qmp_server_fd(qmp_fds[1]);
}
safe_close(qmp_fds[1]);
ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
ASSERT_OK(qmp_client_attach_event(client, event, SD_EVENT_PRIORITY_NORMAL));
- /* Build add-fd args. The fdset-id value is irrelevant — the mock server only
- * cares that the fd arrived with the correct sendmsg. */
ASSERT_OK(sd_json_buildo(&args, SD_JSON_BUILD_PAIR_UNSIGNED("fdset-id", 0)));
- /* THIS is the previously-broken pattern: very first invoke against the client,
- * carrying an fd, with the handshake still pending. */
ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd",
QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)),
on_test_result, &t));
ASSERT_NOT_NULL(t.result);
qmp_test_result_done(&t);
- /* Wait for the mock server child. If it received fds in the wrong order it
- * exited via the test-assertion failure path and si.si_status will be non-zero. */
+ /* Wait for the mock. If its fd-count assertion tripped, si.si_status is non-zero. */
siginfo_t si = {};
ASSERT_OK(pidref_wait_for_terminate(&pid, &si));
ASSERT_EQ(si.si_code, CLD_EXITED);
ASSERT_EQ(si.si_status, EXIT_SUCCESS);
}
-/* Regression: when qmp_client_invoke() fails before stage_fds runs (e.g.
- * ensure_running() returns -ENOTCONN because the peer closed mid-handshake), the
- * caller-supplied fds — already TAKE_FD()'d through QMP_CLIENT_ARGS_FD() — must be
- * closed inside invoke. Otherwise they leak. */
+/* Regression: the caller-supplied fds — already TAKE_FD()'d through QMP_CLIENT_ARGS_FD() —
+ * must never leak, regardless of whether the invoke reaches the wire. Verified here via a
+ * dead peer: invoke enqueues (non-blocking), the queue item owns the fd, and client teardown
+ * must close it. */
TEST(qmp_client_invoke_failure_closes_fds) {
- _cleanup_(qmp_client_unrefp) QmpClient *client = NULL;
_cleanup_(sd_json_variant_unrefp) sd_json_variant *args = NULL;
_cleanup_close_ int fd_to_pass = -EBADF;
+ QmpClient *client = NULL;
QmpTestResult t = {};
int qmp_fds[2];
int saved_fd_value;
ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
- /* Close the peer end immediately so ensure_running()'s read sees EOF and
- * the client transitions straight to DISCONNECTED inside the first invoke. */
+ /* Close the peer end immediately so any write attempt sees EPIPE. */
safe_close(qmp_fds[1]);
- ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
- /* Deliberately do NOT attach to an event loop — invoke uses ensure_running()'s
- * synchronous process+wait pump for the handshake. */
-
fd_to_pass = open("/dev/null", O_RDWR|O_CLOEXEC);
ASSERT_OK(fd_to_pass);
saved_fd_value = fd_to_pass; /* remember the int value for the closed-check */
ASSERT_OK(sd_json_buildo(&args, SD_JSON_BUILD_PAIR_UNSIGNED("fdset-id", 0)));
+ ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0]));
- /* invoke must fail because the peer is gone. The TAKE_FD inside the macro
- * has already zeroed our local fd_to_pass; if invoke leaked the fd here,
- * the fd would stay open in our process. */
- int r = qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd",
- QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)),
- on_test_result, &t);
- ASSERT_TRUE(r < 0);
- ASSERT_TRUE(ERRNO_IS_NEG_DISCONNECT(r));
+ /* invoke no longer blocks on the handshake — it just enqueues. The fd is now
+ * owned by the underlying JsonStream output queue. */
+ ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd",
+ QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)),
+ on_test_result, &t));
+ ASSERT_EQ(fd_to_pass, -EBADF); /* TAKE_FD cleared our local handle */
+
+ /* The fd is still open here (held in JsonStream's queue). */
+ ASSERT_OK_ERRNO(fcntl(saved_fd_value, F_GETFD));
- /* fd_to_pass should now be -EBADF (TAKE_FD'd) and the underlying kernel fd
- * should have been closed by the qmp_client_args_close_fds cleanup in
- * qmp_client_invoke(). fcntl on the old int returns EBADF only if the slot
- * is genuinely free. */
- ASSERT_EQ(fd_to_pass, -EBADF);
+ /* Client teardown (json_stream_done) must close queued output fds, otherwise the
+ * saved fd number would still be valid. */
+ client = qmp_client_unref(client);
ASSERT_EQ(fcntl(saved_fd_value, F_GETFD), -1);
ASSERT_EQ(errno, EBADF);
}