From: Daan De Meyer Date: Fri, 24 Apr 2026 07:51:44 +0000 (+0000) Subject: qmp-client: eagerly enqueue qmp_capabilities on connect, drop the handshake state... X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=466662c8bd360e96aabd8325afba82a710c4e02e;p=thirdparty%2Fsystemd.git qmp-client: eagerly enqueue qmp_capabilities on connect, drop the handshake state machine QEMU's QMP greeting is an unsolicited, informational, server-initiated advertisement — it doesn't gate commands. The server accepts (and pipelines) commands the instant the socket is open. We were using it as a trigger to build qmp_capabilities and blocking callers via qmp_client_ensure_running() until the reply came back. Build qmp_capabilities inside qmp_client_connect_fd() instead and let the JsonStream output queue preserve FIFO ordering between it and any user command a subsequent invoke() enqueues. That satisfies QEMU's only ordering requirement (cap must precede other commands) without any blocking in the send path. Fallout: - Collapse QmpClientState to {RUNNING, DISCONNECTED}. The three HANDSHAKE_* states and the QMP_CLIENT_STATE_IS_HANDSHAKE() macro go away. - Drop qmp_client_dispatch_handshake(); fold greeting-drop into dispatch_reply as a one-line shape check. - Drop qmp_client_ensure_running() and its qmp_client_send() call site. send() now only refuses when state == DISCONNECTED. - The qmp_capabilities reply lands on an ordinary slot whose callback logs a protocol-level error and force-disconnects if cap negotiation failed, matching the old EPROTO behaviour at the same observable boundary. - qmp_client_phase() no longer special-cases the old handshake states; it maps directly to READING / AWAITING_REPLY based on whether slots are in flight. Test updates: - qmp_client_first_invoke_with_fd → qmp_client_invoke_with_fd. 
The scenario it was pinned to (push_fd+invoke staging order) has been structurally impossible since the QmpClientArgs rework in 8ad4adcb6f; eager-cap removes it a second way. The test now covers end-to-end fd-passing on the first invoke, accepting either recv carrying the single SCM_RIGHTS fd (AF_UNIX absorbs non-scm skbs forward into the next scm-bearing skb's recv, so the fd may surface with cap or add-fd depending on kernel scheduling — QEMU's FIFO fd queue handles either). - qmp_client_invoke_failure_closes_fds restructured around the new invariant: invoke no longer blocks and no longer returns ENOTCONN for a dead peer, so the fd-leak assertion moves to "still open while the JsonStream queue owns it, closed on client teardown" and the nested block is flattened into an explicit qmp_client_unref(). --- diff --git a/src/shared/qmp-client.c b/src/shared/qmp-client.c index 45bc0d8dbd2..cc51259cd12 100644 --- a/src/shared/qmp-client.c +++ b/src/shared/qmp-client.c @@ -14,22 +14,12 @@ #include "string-util.h" typedef enum QmpClientState { - QMP_CLIENT_HANDSHAKE_INITIAL, /* waiting for QMP greeting */ - QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED, /* greeting received, sending qmp_capabilities */ - QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT, /* waiting for qmp_capabilities response */ - QMP_CLIENT_RUNNING, /* connected, ready for commands */ + QMP_CLIENT_RUNNING, /* connection alive; qmp_capabilities may still be in flight */ QMP_CLIENT_DISCONNECTED, /* connection closed */ _QMP_CLIENT_STATE_MAX, _QMP_CLIENT_STATE_INVALID = -EINVAL, } QmpClientState; -/* States routed to dispatch_handshake. 
*/ -#define QMP_CLIENT_STATE_IS_HANDSHAKE(s) \ - IN_SET(s, \ - QMP_CLIENT_HANDSHAKE_INITIAL, \ - QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED, \ - QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT) - struct QmpSlot { unsigned n_ref; QmpClient *client; /* NULL once disconnected (reply dispatched, cancelled, or client died) */ @@ -280,7 +270,7 @@ static int qmp_client_build_command( } /* Route c->current to event callback or matching async slot. Returns 1 on dispatch. */ -static int qmp_client_dispatch_reply(QmpClient *c) { +static int qmp_client_dispatch(QmpClient *c) { sd_json_variant *result = NULL; const char *desc = NULL; uint64_t id; @@ -298,6 +288,14 @@ static int qmp_client_dispatch_reply(QmpClient *c) { return 1; } + /* QEMU sends a one-shot greeting with a "QMP" key unsolicited on connect. We don't + * wait for it before sending qmp_capabilities (QEMU accepts commands the moment the + * socket is open); we just detect it by the "QMP" key and drop it. */ + if (sd_json_variant_by_key(c->current, "QMP")) { + qmp_client_clear_current(c); + return 1; + } + /* Command responses carry an "id" matching a request we sent */ r = qmp_extract_response_id(c->current, &id); if (r < 0) { @@ -423,88 +421,6 @@ static bool qmp_client_test_disconnect(QmpClient *c) { return qmp_client_handle_disconnect(c); } -/* INITIAL → greeting → GREETING_RECEIVED → qmp_capabilities → CAPABILITIES_SENT → response → RUNNING. */ -static int qmp_client_dispatch_handshake(QmpClient *c) { - int r; - - assert(c); - assert(QMP_CLIENT_STATE_IS_HANDSHAKE(c->state)); - - if (!c->current) - return 0; - - /* Defensive: QEMU shouldn't emit events during capability negotiation, but if one - * arrives, dispatch it as an event rather than mis-parsing it as a handshake reply. 
*/ - if (sd_json_variant_by_key(c->current, "event")) { - _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current); - qmp_client_dispatch_event(c, v); - return 1; - } - - switch (c->state) { - - case QMP_CLIENT_HANDSHAKE_INITIAL: { - /* Waiting for QMP greeting. Take ownership so by_key()'s borrowed pointer - * stays valid through the case scope. */ - _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current); - if (!sd_json_variant_by_key(v, "QMP")) - return json_stream_log_errno(&c->stream, SYNTHETIC_ERRNO(EPROTO), - "Expected QMP greeting, got something else"); - - c->state = QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED; - - /* Fall through to immediately send capabilities */ - _fallthrough_; - } - - case QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED: { - /* Send qmp_capabilities command */ - _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL; - r = sd_json_buildo( - &cmd, - SD_JSON_BUILD_PAIR_STRING("execute", "qmp_capabilities"), - SD_JSON_BUILD_PAIR_UNSIGNED("id", c->next_id++)); - if (r < 0) - return r; - - r = json_stream_enqueue(&c->stream, cmd); - if (r < 0) - return r; - - c->state = QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT; - return 1; - } - - case QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT: { - /* Take ownership so desc (borrowed from v's "error.desc") survives the format string. 
*/ - _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current); - const char *desc = NULL; - r = qmp_parse_response(v, /* ret_result= */ NULL, &desc); - if (r < 0) - return json_stream_log_errno(&c->stream, SYNTHETIC_ERRNO(EPROTO), - "qmp_capabilities failed: %s", desc); - - c->state = QMP_CLIENT_RUNNING; - return 1; - } - - default: - assert_not_reached(); - } -} - -static int qmp_client_dispatch(QmpClient *c) { - assert(c); - - if (!c->current) - return 0; - - if (QMP_CLIENT_STATE_IS_HANDSHAKE(c->state)) - return qmp_client_dispatch_handshake(c); - - return qmp_client_dispatch_reply(c); -} - /* Single step: write → dispatch → parse → read → disconnect. Matches sd_varlink_process(). */ int qmp_client_process(QmpClient *c) { int r; @@ -524,7 +440,7 @@ int qmp_client_process(QmpClient *c) { if (r != 0) goto finish; - /* 2. Dispatch — route based on state */ + /* 2. Dispatch — dispatch incoming messages to slots */ r = qmp_client_dispatch(c); if (r < 0) json_stream_log_errno(&c->stream, r, "Failed to dispatch QMP message: %m"); @@ -600,19 +516,14 @@ static JsonStreamPhase qmp_client_phase(void *userdata) { if (c->current) return JSON_STREAM_PHASE_OTHER; - /* During handshake we're waiting for the greeting or qmp_capabilities response. */ - if (QMP_CLIENT_STATE_IS_HANDSHAKE(c->state)) - return JSON_STREAM_PHASE_AWAITING_REPLY; - - /* Running with pending async commands — waiting for their responses. */ - if (c->state == QMP_CLIENT_RUNNING && !set_isempty(c->slots)) - return JSON_STREAM_PHASE_AWAITING_REPLY; - - /* Running with no pending commands — waiting for unsolicited events. */ - if (c->state == QMP_CLIENT_RUNNING) - return JSON_STREAM_PHASE_READING; + if (c->state != QMP_CLIENT_RUNNING) + return JSON_STREAM_PHASE_OTHER; - return JSON_STREAM_PHASE_OTHER; + /* Pending slots (user commands or the initial qmp_capabilities) → awaiting reply. + * Otherwise we're idling for unsolicited events. */ + return set_isempty(c->slots) + ? 
JSON_STREAM_PHASE_READING + : JSON_STREAM_PHASE_AWAITING_REPLY; } static int qmp_client_dispatch_cb(void *userdata) { @@ -630,33 +541,6 @@ static int qmp_client_defer_callback(sd_event_source *source, void *userdata) { return 1; } -/* Drive handshake to completion. Matches sd-bus's bus_ensure_running(). */ -static int qmp_client_ensure_running(QmpClient *c) { - int r; - - assert(c); - - if (c->state == QMP_CLIENT_RUNNING) - return 1; - - for (;;) { - if (c->state < 0 || c->state == QMP_CLIENT_DISCONNECTED) - return -ENOTCONN; - - r = qmp_client_process(c); - if (r < 0) - return r; - if (c->state == QMP_CLIENT_RUNNING) - return 1; - if (r > 0) - continue; - - r = qmp_client_wait(c, USEC_INFINITY); - if (r < 0) - return r; - } -} - static void qmp_client_detach_event(QmpClient *c) { if (!c) return; @@ -712,6 +596,37 @@ static int qmp_client_quit_callback(sd_event_source *source, void *userdata) { return 1; } +static int qmp_client_send( + QmpClient *c, + const char *command, + QmpClientArgs *args, + qmp_command_callback_t callback, + void *userdata, + QmpSlot **ret_slot); + +/* Reply callback for the eagerly-enqueued qmp_capabilities command. Success → we stay in + * RUNNING. Failure → negotiation is unrecoverable, force-disconnect so the next user op gets + * -ENOTCONN rather than hanging. */ +static int qmp_client_capabilities_reply( + QmpClient *c, + sd_json_variant *result, + const char *error_desc, + int error, + void *userdata) { + + /* qmp_client_handle_disconnect() below fails all pending slots, which re-enters this + * callback with -ECONNRESET on our own still-registered slot. Short-circuit that. 
*/ + if (c->state == QMP_CLIENT_DISCONNECTED) + return 0; + + if (error >= 0) + return 0; + + json_stream_log_errno(&c->stream, error, "qmp_capabilities failed: %s", strna(error_desc)); + qmp_client_handle_disconnect(c); + return 0; +} + int qmp_client_connect_fd(QmpClient **ret, int fd) { _cleanup_(qmp_client_unrefp) QmpClient *c = NULL; int r; @@ -725,7 +640,7 @@ int qmp_client_connect_fd(QmpClient **ret, int fd) { *c = (QmpClient) { .n_ref = 1, - .state = QMP_CLIENT_HANDSHAKE_INITIAL, + .state = QMP_CLIENT_RUNNING, .next_id = 1, }; @@ -744,6 +659,16 @@ int qmp_client_connect_fd(QmpClient **ret, int fd) { if (r < 0) return r; + /* Eagerly queue qmp_capabilities. QEMU accepts commands as soon as the socket opens + * — its greeting is informational and doesn't gate writes on our side. FIFO ordering + * of the output queue guarantees cap precedes any user command a later invoke() + * enqueues, which is all QEMU actually requires. */ + r = qmp_client_send(c, "qmp_capabilities", /* args= */ NULL, + qmp_client_capabilities_reply, /* userdata= */ NULL, + /* ret_slot= */ NULL); + if (r < 0) + return r; + *ret = TAKE_PTR(c); return 0; } @@ -825,9 +750,8 @@ static int qmp_client_send( assert(c); assert(command); - r = qmp_client_ensure_running(c); - if (r < 0) - return r; + if (c->state == QMP_CLIENT_DISCONNECTED) + return -ENOTCONN; r = qmp_client_build_command(c, command, args ? args->arguments : NULL, &cmd, &id); if (r < 0) diff --git a/src/test/test-qmp-client.c b/src/test/test-qmp-client.c index bbc3f152869..befee024845 100644 --- a/src/test/test-qmp-client.c +++ b/src/test/test-qmp-client.c @@ -335,12 +335,13 @@ TEST(qmp_client_eof) { ASSERT_EQ(si.si_status, EXIT_SUCCESS); } -/* Mock QMP server for the fd-on-first-invoke regression. 
Drives the wire dance: - * greeting → (recv qmp_capabilities, expect 0 fds) → reply → - * (recv add-fd, expect exactly 1 fd) → reply - * Asserts the attached fd counts directly so a regression flips the child to - * exit_failure and the parent test fails on the wait-for-terminate. */ -static _noreturn_ void mock_qmp_server_fd_first(int fd) { +/* Mock QMP server for the fd-passing test. Drives the wire dance: + * greeting → recv qmp_capabilities → reply → recv add-fd → reply + * Asserts that exactly one SCM_RIGHTS fd arrives total across the two recvs. We can't + * require the fd to come attached to add-fd specifically: AF_UNIX coalesces the client's + * non-SCM cap sendmsg forward into the SCM-bearing add-fd sendmsg, so the fd may surface + * with either recv depending on kernel scheduling. QEMU's FIFO fd queue doesn't care. */ +static _noreturn_ void mock_qmp_server_fd(int fd) { _cleanup_(json_stream_done) JsonStream s = {}; _cleanup_(sd_json_variant_unrefp) sd_json_variant *cap_cmd = NULL, *addfd_cmd = NULL, @@ -349,18 +350,17 @@ static _noreturn_ void mock_qmp_server_fd_first(int fd) { *addfd_reply = NULL; mock_qmp_init(&s, fd); - /* Accept SCM_RIGHTS on incoming messages so we can count how many fds the client - * attaches to each sendmsg. */ ASSERT_OK(json_stream_set_allow_fd_passing_input(&s, true, /* with_sockopt= */ true)); /* Greeting */ mock_qmp_send_literal(&s, "{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}"); - /* Receive qmp_capabilities — must arrive with NO fds attached. */ + /* Receive qmp_capabilities (may or may not carry the fd depending on coalescing). 
*/ mock_qmp_recv(&s, &cap_cmd); - ASSERT_EQ(json_stream_get_n_input_fds(&s), (size_t) 0); + size_t n_fds_total = json_stream_get_n_input_fds(&s); ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(cap_cmd, "execute")), "qmp_capabilities"); + json_stream_close_input_fds(&s); sd_json_variant *cap_id = ASSERT_NOT_NULL(sd_json_variant_by_key(cap_cmd, "id")); ASSERT_OK(sd_json_buildo( @@ -369,12 +369,14 @@ static _noreturn_ void mock_qmp_server_fd_first(int fd) { SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(cap_id)))); mock_qmp_send(&s, cap_reply); - /* Receive add-fd — must arrive with EXACTLY ONE fd attached. */ + /* Receive add-fd (fd may already have been consumed with cap's recv). */ mock_qmp_recv(&s, &addfd_cmd); - ASSERT_EQ(json_stream_get_n_input_fds(&s), (size_t) 1); + n_fds_total += json_stream_get_n_input_fds(&s); ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(addfd_cmd, "execute")), "add-fd"); json_stream_close_input_fds(&s); + ASSERT_EQ(n_fds_total, (size_t) 1); + sd_json_variant *addfd_id = ASSERT_NOT_NULL(sd_json_variant_by_key(addfd_cmd, "id")); ASSERT_OK(sd_json_buildo( &addfd_return, @@ -389,13 +391,9 @@ static _noreturn_ void mock_qmp_server_fd_first(int fd) { _exit(EXIT_SUCCESS); } -/* Regression: pass an fd in the very first qmp_client_invoke() against a fresh client - * (lazy-bootstrap state, handshake not yet done). The previous push_fd+invoke split would - * stage the fd on the stream BEFORE qmp_client_ensure_running() drove the handshake; the - * handshake's qmp_capabilities enqueue would then steal the staged fd onto its own - * sendmsg. The new QmpClientArgs API stages fds inside invoke AFTER ensure_running, so - * the fd lands on add-fd's sendmsg as it should. */ -TEST(qmp_client_first_invoke_with_fd) { +/* End-to-end fd-passing through qmp_client_invoke() with QMP_CLIENT_ARGS_FD(): open a real + * fd, send add-fd, confirm the mock received a single SCM_RIGHTS fd and replied successfully. 
*/ +TEST(qmp_client_invoke_with_fd) { _cleanup_(qmp_client_unrefp) QmpClient *client = NULL; _cleanup_(sd_event_unrefp) sd_event *event = NULL; _cleanup_(pidref_done) PidRef pid = PIDREF_NULL; @@ -408,11 +406,11 @@ TEST(qmp_client_first_invoke_with_fd) { ASSERT_OK(sd_event_new(&event)); ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds)); - r = ASSERT_OK(pidref_safe_fork("(mock-qmp-fd-first)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid)); + r = ASSERT_OK(pidref_safe_fork("(mock-qmp-fd)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid)); if (r == 0) { safe_close(qmp_fds[0]); - mock_qmp_server_fd_first(qmp_fds[1]); + mock_qmp_server_fd(qmp_fds[1]); } safe_close(qmp_fds[1]); @@ -424,12 +422,8 @@ TEST(qmp_client_first_invoke_with_fd) { ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0])); ASSERT_OK(qmp_client_attach_event(client, event, SD_EVENT_PRIORITY_NORMAL)); - /* Build add-fd args. The fdset-id value is irrelevant — the mock server only - * cares that the fd arrived with the correct sendmsg. */ ASSERT_OK(sd_json_buildo(&args, SD_JSON_BUILD_PAIR_UNSIGNED("fdset-id", 0))); - /* THIS is the previously-broken pattern: very first invoke against the client, - * carrying an fd, with the handshake still pending. */ ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd", QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)), on_test_result, &t)); @@ -439,56 +433,50 @@ TEST(qmp_client_first_invoke_with_fd) { ASSERT_NOT_NULL(t.result); qmp_test_result_done(&t); - /* Wait for the mock server child. If it received fds in the wrong order it - * exited via the test-assertion failure path and si.si_status will be non-zero. */ + /* Wait for the mock. If its fd-count assertion tripped, si.si_status is non-zero. */ siginfo_t si = {}; ASSERT_OK(pidref_wait_for_terminate(&pid, &si)); ASSERT_EQ(si.si_code, CLD_EXITED); ASSERT_EQ(si.si_status, EXIT_SUCCESS); } -/* Regression: when qmp_client_invoke() fails before stage_fds runs (e.g. 
- * ensure_running() returns -ENOTCONN because the peer closed mid-handshake), the - * caller-supplied fds — already TAKE_FD()'d through QMP_CLIENT_ARGS_FD() — must be - * closed inside invoke. Otherwise they leak. */ +/* Regression: the caller-supplied fds — already TAKE_FD()'d through QMP_CLIENT_ARGS_FD() — + * must never leak, regardless of whether the invoke reaches the wire. Verified here via a + * dead peer: invoke enqueues (non-blocking), the queue item owns the fd, and client teardown + * must close it. */ TEST(qmp_client_invoke_failure_closes_fds) { - _cleanup_(qmp_client_unrefp) QmpClient *client = NULL; _cleanup_(sd_json_variant_unrefp) sd_json_variant *args = NULL; _cleanup_close_ int fd_to_pass = -EBADF; + QmpClient *client = NULL; QmpTestResult t = {}; int qmp_fds[2]; int saved_fd_value; ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds)); - /* Close the peer end immediately so ensure_running()'s read sees EOF and - * the client transitions straight to DISCONNECTED inside the first invoke. */ + /* Close the peer end immediately so any write attempt sees EPIPE. */ safe_close(qmp_fds[1]); - ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0])); - /* Deliberately do NOT attach to an event loop — invoke uses ensure_running()'s - * synchronous process+wait pump for the handshake. */ - fd_to_pass = open("/dev/null", O_RDWR|O_CLOEXEC); ASSERT_OK(fd_to_pass); saved_fd_value = fd_to_pass; /* remember the int value for the closed-check */ ASSERT_OK(sd_json_buildo(&args, SD_JSON_BUILD_PAIR_UNSIGNED("fdset-id", 0))); + ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0])); - /* invoke must fail because the peer is gone. The TAKE_FD inside the macro - * has already zeroed our local fd_to_pass; if invoke leaked the fd here, - * the fd would stay open in our process. 
*/ - int r = qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd", - QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)), - on_test_result, &t); - ASSERT_TRUE(r < 0); - ASSERT_TRUE(ERRNO_IS_NEG_DISCONNECT(r)); + /* invoke no longer blocks on the handshake — it just enqueues. The fd is now + * owned by the underlying JsonStream output queue. */ + ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd", + QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)), + on_test_result, &t)); + ASSERT_EQ(fd_to_pass, -EBADF); /* TAKE_FD cleared our local handle */ + + /* The fd is still open here (held in JsonStream's queue). */ + ASSERT_OK_ERRNO(fcntl(saved_fd_value, F_GETFD)); - /* fd_to_pass should now be -EBADF (TAKE_FD'd) and the underlying kernel fd - * should have been closed by the qmp_client_args_close_fds cleanup in - * qmp_client_invoke(). fcntl on the old int returns EBADF only if the slot - * is genuinely free. */ - ASSERT_EQ(fd_to_pass, -EBADF); + /* Client teardown (json_stream_done) must close queued output fds, otherwise the + * saved fd number would still be valid. */ + client = qmp_client_unref(client); ASSERT_EQ(fcntl(saved_fd_value, F_GETFD), -1); ASSERT_EQ(errno, EBADF); }