From: Daan De Meyer Date: Fri, 24 Apr 2026 06:07:31 +0000 (+0000) Subject: qmp-client: make QmpSlot a public, refcounted, cancellable handle X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=54c124eb6cf8a5d8430b491a0171a1dd62506182;p=thirdparty%2Fsystemd.git qmp-client: make QmpSlot a public, refcounted, cancellable handle QmpSlot now stores a back-reference to its QmpClient (mirroring sd_bus_slot) and is exposed as a public refcounted type via qmp_slot_ref/qmp_slot_unref. qmp_client_invoke() gains an optional QmpSlot **ret_slot out-parameter matching sd_bus_call_async(): passing non-NULL hands back a reference whose unref cancels the pending call (the callback is deregistered; a late reply is logged and discarded as unknown-id). Internally slots come in two flavours, following sd_bus's model: floating (owned by the client's pending set, used when ret_slot is NULL) and non-floating (ref held by the caller, slot holds a ref on the client). qmp_slot_disconnect() centralizes the teardown so the reply-dispatched, explicit-cancel, and client-teardown paths all converge on the same idempotent cleanup. qmp_client_call()'s sync slot is now non-floating and observes completion by watching slot->client go NULL instead of set_contains() on an id. --- diff --git a/src/shared/qmp-client.c b/src/shared/qmp-client.c index 53a30abe5a9..45bc0d8dbd2 100644 --- a/src/shared/qmp-client.c +++ b/src/shared/qmp-client.c @@ -30,11 +30,14 @@ typedef enum QmpClientState { QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED, \ QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT) -typedef struct QmpSlot { +struct QmpSlot { + unsigned n_ref; + QmpClient *client; /* NULL once disconnected (reply dispatched, cancelled, or client died) */ uint64_t id; + bool floating; qmp_command_callback_t callback; void *userdata; -} QmpSlot; +}; struct QmpClient { unsigned n_ref; @@ -69,9 +72,92 @@ static int qmp_slot_compare_func(const QmpSlot *a, const QmpSlot *b) { DEFINE_PRIVATE_HASH_OPS(qmp_slot_hash_ops, QmpSlot, qmp_slot_hash_func, qmp_slot_compare_func); +/* Break the slot's connection to the client: remove from the lookup set, drop whichever reference + * is implied by the slot's floating-ness. For floating slots, the set is the sole owner, so with + * unref=true we also drop the slot's n_ref (usually dropping it to zero and freeing). For + * non-floating slots, we release the back-reference the slot holds on the client. + * + * Safe to call multiple times: once slot->client is NULL, subsequent calls are no-ops. */ +static void qmp_slot_disconnect(QmpSlot *slot, bool unref) { + assert(slot); + + if (!slot->client) + return; + + QmpClient *client = slot->client; + + set_remove(client->slots, slot); + slot->client = NULL; + + if (!slot->floating) + qmp_client_unref(client); + else if (unref) + /* May re-enter via qmp_slot_free→qmp_slot_disconnect(,false) if this drops the + * last ref, but the early return above makes that recursion a no-op. */ + qmp_slot_unref(slot); +} + +static QmpSlot* qmp_slot_free(QmpSlot *slot) { + if (!slot) + return NULL; + + /* Idempotent: if the slot was already disconnected (reply dispatched, explicit cancel, + * or client-side teardown), this is a no-op. Otherwise it removes us from the set and + * drops our client reference (for non-floating slots). */ + qmp_slot_disconnect(slot, /* unref= */ false); + + return mfree(slot); +} + +DEFINE_TRIVIAL_REF_UNREF_FUNC(QmpSlot, qmp_slot, qmp_slot_free); + +QmpClient* qmp_slot_get_client(QmpSlot *slot) { + assert(slot); + return slot->client; +} + +static int qmp_slot_new( + QmpClient *client, + bool floating, + uint64_t id, + qmp_command_callback_t callback, + void *userdata, + QmpSlot **ret) { + + int r; + + assert(client); + assert(ret); + + _cleanup_(qmp_slot_unrefp) QmpSlot *slot = new(QmpSlot, 1); + if (!slot) + return -ENOMEM; + + *slot = (QmpSlot) { + .n_ref = 1, + .client = NULL, /* wired up below, after set_put succeeds */ + .id = id, + .floating = floating, + .callback = callback, + .userdata = userdata, + }; + + r = set_ensure_put(&client->slots, &qmp_slot_hash_ops, slot); + if (r < 0) + return r; + assert(r > 0); + + slot->client = client; + if (!floating) + qmp_client_ref(client); + + *ret = TAKE_PTR(slot); + return 0; +} + static void qmp_client_clear(QmpClient *c); -static QmpClient* qmp_client_destroy(QmpClient *c) { +static QmpClient* qmp_client_free(QmpClient *c) { if (!c) return NULL; @@ -80,8 +166,7 @@ static QmpClient* qmp_client_destroy(QmpClient *c) { return mfree(c); } -DEFINE_PRIVATE_TRIVIAL_REF_FUNC(QmpClient, qmp_client); -DEFINE_TRIVIAL_UNREF_FUNC(QmpClient, qmp_client, qmp_client_destroy); +DEFINE_TRIVIAL_REF_UNREF_FUNC(QmpClient, qmp_client, qmp_client_free); static void qmp_client_clear_current(QmpClient *c) { assert(c); @@ -225,42 +310,59 @@ static int qmp_client_dispatch_reply(QmpClient *c) { return 1; } - _cleanup_free_ QmpSlot *pending = set_remove(c->slots, &(QmpSlot) { .id = id }); - if (!pending) { + QmpSlot *slot = set_get(c->slots, &(QmpSlot) { .id = id }); + if (!slot) { qmp_client_clear_current(c); json_stream_log(&c->stream, "Discarding QMP response with unknown id %" PRIu64, id); return 1; } /* Synchronous slot (no callback): leave c->current pinned so qmp_client_call() can - * pick up the reply and hand out borrowed pointers into it. */ - if (!pending->callback) + * pick up the reply and hand out borrowed pointers into it. The sync caller owns a + * ref on the slot and detects completion by observing slot->client turning NULL. */ + if (!slot->callback) { + qmp_slot_disconnect(slot, /* unref= */ true); return 1; + } _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current); error = qmp_parse_response(v, &result, &desc); - r = pending->callback(c, result, desc, error, pending->userdata); + /* Pin the slot across the callback regardless of floating-ness. For a floating slot, + * disconnect(unref=true) drops the set's implicit ref which would otherwise free it + * out from under the callback. */ + qmp_slot_ref(slot); + + r = slot->callback(c, result, desc, error, slot->userdata); if (r < 0) json_stream_log_errno(&c->stream, r, "Command callback returned error, ignoring: %m"); + qmp_slot_disconnect(slot, /* unref= */ true); + qmp_slot_unref(slot); + return 1; } -/* Fail all pending async commands with the given error. Called on disconnect. */ +/* Fail all pending commands with the given error. Called on disconnect. */ static void qmp_client_fail_pending(QmpClient *c, int error) { - QmpSlot *p; + QmpSlot *slot; int r; assert(c); - while ((p = set_steal_first(c->slots))) { - if (p->callback) { - r = p->callback(c, /* result= */ NULL, /* error_desc= */ NULL, error, p->userdata); + while ((slot = set_first(c->slots))) { + /* Keep alive across the callback and past disconnect (which may unref it for + * floating slots). */ + qmp_slot_ref(slot); + + if (slot->callback) { + r = slot->callback(c, /* result= */ NULL, /* error_desc= */ NULL, error, slot->userdata); if (r < 0) json_stream_log_errno(&c->stream, r, "Command callback returned error, ignoring: %m"); } - free(p); + + qmp_slot_disconnect(slot, /* unref= */ true); + qmp_slot_unref(slot); } } @@ -701,17 +803,19 @@ DEFINE_TRIVIAL_CLEANUP_FUNC(QmpClientArgs*, qmp_client_args_close_fds); /* Shared send path for qmp_client_invoke() and qmp_client_call(). A NULL callback registers * a "synchronous" slot: dispatch_reply leaves c->current pinned on match instead of invoking - * a callback, so qmp_client_call() can hand out borrowed pointers into the reply. */ + * a callback, so qmp_client_call() can hand out borrowed pointers into the reply. If ret_slot + * is NULL the slot is allocated as floating (owned by c->slots); otherwise a reference is + * handed back to the caller. */ static int qmp_client_send( QmpClient *c, const char *command, QmpClientArgs *args, qmp_command_callback_t callback, void *userdata, - uint64_t *ret_id) { + QmpSlot **ret_slot) { _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL; - _cleanup_free_ QmpSlot *pending = NULL; + _cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL; /* Closes any fds in args on every early-return path; TAKE_PTR()'d on the success path * below once json_stream_enqueue_full() has taken ownership of them. */ _cleanup_(qmp_client_args_close_fdsp) QmpClientArgs *fds_owner = args; @@ -729,50 +833,40 @@ static int qmp_client_send( if (r < 0) return r; - pending = new(QmpSlot, 1); - if (!pending) - return -ENOMEM; - - *pending = (QmpSlot) { - .id = id, - .callback = callback, - .userdata = userdata, - }; - - r = set_ensure_put(&c->slots, &qmp_slot_hash_ops, pending); + r = qmp_slot_new(c, /* floating= */ !ret_slot, id, callback, userdata, &slot); if (r < 0) return r; - assert(r > 0); r = json_stream_enqueue_full(&c->stream, cmd, args ? args->fds_consume : NULL, args ? args->n_fds : 0); - if (r < 0) { - set_remove(c->slots, pending); - return r; - } + if (r < 0) + return r; /* slot cleanup disconnects it */ /* Arm defer so process() drains the output on the next iteration. */ if (c->defer_event_source) (void) sd_event_source_set_enabled(c->defer_event_source, SD_EVENT_ON); - TAKE_PTR(pending); TAKE_PTR(fds_owner); - if (ret_id) - *ret_id = id; + if (ret_slot) + *ret_slot = TAKE_PTR(slot); + else + TAKE_PTR(slot); /* floating: c->slots keeps it alive until dispatch */ + return 0; } int qmp_client_invoke( QmpClient *c, + QmpSlot **ret_slot, const char *command, QmpClientArgs *args, qmp_command_callback_t callback, void *userdata) { assert(callback); - return qmp_client_send(c, command, args, callback, userdata, /* ret_id= */ NULL); + return qmp_client_send(c, command, args, callback, userdata, ret_slot); } int qmp_client_call( @@ -782,7 +876,7 @@ int qmp_client_call( sd_json_variant **ret_result, const char **ret_error_desc) { - uint64_t id; + _cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL; int r; assert_return(c, -EINVAL); @@ -793,17 +887,18 @@ int qmp_client_call( /* NULL callback marks this as a synchronous slot: dispatch_reply matches on id like * any other slot (so stray unknown-id replies still get logged and dropped), but - * pins c->current for us instead of invoking a callback. */ - r = qmp_client_send(c, command, args, /* callback= */ NULL, /* userdata= */ NULL, &id); + * pins c->current for us instead of invoking a callback. The slot is non-floating so + * we can observe dispatch by watching slot->client go NULL. */ + r = qmp_client_send(c, command, args, /* callback= */ NULL, /* userdata= */ NULL, &slot); if (r < 0) return r; - /* Pump the loop until our sync slot fires (removed from c->slots, c->current pinned). */ + /* Pump the loop until our sync slot fires (disconnected by dispatch, c->current pinned). */ for (;;) { if (c->state == QMP_CLIENT_DISCONNECTED) return -ECONNRESET; - if (!set_contains(c->slots, &(QmpSlot) { .id = id })) { + if (!slot->client) { assert(c->current); break; } diff --git a/src/shared/qmp-client.h b/src/shared/qmp-client.h index b3ee8262e01..7a477f7e4fa 100644 --- a/src/shared/qmp-client.h +++ b/src/shared/qmp-client.h @@ -54,9 +54,12 @@ bool qmp_client_is_idle(QmpClient *c); /* True iff the connection is dead. Stable terminal state — once set, it stays set. */ bool qmp_client_is_disconnected(QmpClient *c); -/* Async send. Returns 0 on send (callback will fire later), negative errno on failure. */ +/* Async send. Returns 0 on send (callback will fire later), negative errno on failure. If + * ret_slot is non-NULL, returns a reference to a QmpSlot which can be used to cancel the call + * (by unreffing it before the reply arrives). */ int qmp_client_invoke( QmpClient *client, + QmpSlot **ret_slot, const char *command, QmpClientArgs *args, qmp_command_callback_t callback, @@ -78,10 +81,14 @@ int qmp_client_set_description(QmpClient *c, const char *description); sd_event* qmp_client_get_event(QmpClient *c); unsigned qmp_client_next_fdset_id(QmpClient *client); -QmpClient* qmp_client_unref(QmpClient *p); - +DECLARE_TRIVIAL_REF_UNREF_FUNC(QmpClient, qmp_client); DEFINE_TRIVIAL_CLEANUP_FUNC(QmpClient *, qmp_client_unref); +DECLARE_TRIVIAL_REF_UNREF_FUNC(QmpSlot, qmp_slot); +DEFINE_TRIVIAL_CLEANUP_FUNC(QmpSlot *, qmp_slot_unref); + +QmpClient* qmp_slot_get_client(QmpSlot *slot); + /* Returns true iff any object entry in schema (result of query-qmp-schema) has a member with this * name. QEMU's introspection replaces type names with opaque numeric ids, so lookup-by-type-name is * impossible — but member names are real. Use only when the member name is unique in the schema. */ diff --git a/src/shared/shared-forward.h b/src/shared/shared-forward.h index 1a19b42499c..1207fe8a258 100644 --- a/src/shared/shared-forward.h +++ b/src/shared/shared-forward.h @@ -80,6 +80,7 @@ typedef struct MStack MStack; typedef struct OpenFile OpenFile; typedef struct Pkcs11EncryptedKey Pkcs11EncryptedKey; typedef struct QmpClient QmpClient; +typedef struct QmpSlot QmpSlot; typedef struct Table Table; typedef struct Tpm2Context Tpm2Context; typedef struct Tpm2Handle Tpm2Handle; diff --git a/src/test/test-qmp-client-qemu.c b/src/test/test-qmp-client-qemu.c index ec520cc270a..df8d3c6e215 100644 --- a/src/test/test-qmp-client-qemu.c +++ b/src/test/test-qmp-client-qemu.c @@ -133,7 +133,7 @@ TEST(qmp_client_qemu_handshake_and_schema) { /* query-qmp-schema returns ~200KB -- validates the buffered reader handles large multi-read() * responses correctly. The handshake completes transparently inside invoke(). */ - r = qmp_client_invoke(client, "query-qmp-schema", NULL, on_test_result, &t); + r = qmp_client_invoke(client, /* ret_slot= */ NULL, "query-qmp-schema", NULL, on_test_result, &t); if (r < 0) { log_tests_skipped_errno(r, "QMP invoke failed (handshake or send)"); return; @@ -153,7 +153,7 @@ TEST(qmp_client_qemu_handshake_and_schema) { qmp_test_result_done(&t); /* Clean shutdown */ - ASSERT_OK(qmp_client_invoke(client, "quit", NULL, on_test_result, &t)); + ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "quit", NULL, on_test_result, &t)); qmp_test_wait(event, &t); ASSERT_EQ(t.error, 0); qmp_test_result_done(&t); @@ -197,7 +197,7 @@ TEST(qmp_client_qemu_query_status) { /* query-status validates response parsing against real QEMU output format. * The handshake completes transparently inside invoke(). */ - r = qmp_client_invoke(client, "query-status", NULL, on_test_result, &t); + r = qmp_client_invoke(client, /* ret_slot= */ NULL, "query-status", NULL, on_test_result, &t); if (r < 0) { log_tests_skipped_errno(r, "QMP invoke failed (handshake or send)"); return; @@ -219,13 +219,13 @@ TEST(qmp_client_qemu_query_status) { qmp_test_result_done(&t); /* Test stop + cont to exercise command sequencing and id correlation */ - ASSERT_OK(qmp_client_invoke(client, "stop", NULL, on_test_result, &t)); + ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "stop", NULL, on_test_result, &t)); qmp_test_wait(event, &t); ASSERT_EQ(t.error, 0); qmp_test_result_done(&t); /* Verify status changed */ - ASSERT_OK(qmp_client_invoke(client, "query-status", NULL, on_test_result, &t)); + ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "query-status", NULL, on_test_result, &t)); qmp_test_wait(event, &t); ASSERT_EQ(t.error, 0); ASSERT_NOT_NULL(t.result); @@ -236,13 +236,13 @@ TEST(qmp_client_qemu_query_status) { qmp_test_result_done(&t); - ASSERT_OK(qmp_client_invoke(client, "cont", NULL, on_test_result, &t)); + ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "cont", NULL, on_test_result, &t)); qmp_test_wait(event, &t); ASSERT_EQ(t.error, 0); qmp_test_result_done(&t); /* Clean shutdown */ - ASSERT_OK(qmp_client_invoke(client, "quit", NULL, on_test_result, &t)); + ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "quit", NULL, on_test_result, &t)); qmp_test_wait(event, &t); ASSERT_EQ(t.error, 0); qmp_test_result_done(&t); diff --git a/src/test/test-qmp-client.c b/src/test/test-qmp-client.c index c70be2c0f4e..bbc3f152869 100644 --- a/src/test/test-qmp-client.c +++ b/src/test/test-qmp-client.c @@ -248,7 +248,7 @@ TEST(qmp_client_basic) { qmp_client_bind_event(client, test_event_callback, &event_received); /* Execute query-status */ - ASSERT_OK(qmp_client_invoke(client, "query-status", NULL, on_test_result, &t)); + ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "query-status", NULL, on_test_result, &t)); qmp_test_wait(event, &t); ASSERT_EQ(t.error, 0); ASSERT_NOT_NULL(t.result); @@ -262,13 +262,13 @@ TEST(qmp_client_basic) { qmp_test_result_done(&t); /* Execute stop */ - ASSERT_OK(qmp_client_invoke(client, "stop", NULL, on_test_result, &t)); + ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "stop", NULL, on_test_result, &t)); qmp_test_wait(event, &t); ASSERT_EQ(t.error, 0); qmp_test_result_done(&t); /* Execute cont -- the STOP event should be dispatched by the IO callback */ - ASSERT_OK(qmp_client_invoke(client, "cont", NULL, on_test_result, &t)); + ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "cont", NULL, on_test_result, &t)); qmp_test_wait(event, &t); ASSERT_EQ(t.error, 0); qmp_test_result_done(&t); @@ -320,7 +320,7 @@ TEST(qmp_client_eof) { /* Executing a command should fail with a disconnect error because the server * closed. The handshake may succeed or fail inside invoke() — either way the * invoke itself or the async callback should report a disconnect. */ - r = qmp_client_invoke(client, "query-status", NULL, on_test_result, &t); + r = qmp_client_invoke(client, /* ret_slot= */ NULL, "query-status", NULL, on_test_result, &t); if (r < 0) ASSERT_TRUE(ERRNO_IS_NEG_DISCONNECT(r)); else { @@ -430,7 +430,7 @@ TEST(qmp_client_first_invoke_with_fd) { /* THIS is the previously-broken pattern: very first invoke against the client, * carrying an fd, with the handshake still pending. */ - ASSERT_OK(qmp_client_invoke(client, "add-fd", + ASSERT_OK(qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd", QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)), on_test_result, &t)); @@ -478,7 +478,7 @@ TEST(qmp_client_invoke_failure_closes_fds) { /* invoke must fail because the peer is gone. The TAKE_FD inside the macro * has already zeroed our local fd_to_pass; if invoke leaked the fd here, * the fd would stay open in our process. */ - int r = qmp_client_invoke(client, "add-fd", + int r = qmp_client_invoke(client, /* ret_slot= */ NULL, "add-fd", QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd_to_pass)), on_test_result, &t); ASSERT_TRUE(r < 0); @@ -493,6 +493,125 @@ TEST(qmp_client_invoke_failure_closes_fds) { ASSERT_EQ(errno, EBADF); } +/* Mock for the slot lifecycle + cancel tests: greets, accepts capabilities, then accepts + * query-status and stop, replying with dummy returns. A cancelled query-status still gets + * sent on the wire (cancel merely removes the pending slot), so the server must be prepared + * to read and reply to it. */ +static _noreturn_ void mock_qmp_server_slot(int fd) { + _cleanup_(json_stream_done) JsonStream s = {}; + _cleanup_(sd_json_variant_unrefp) sd_json_variant *status_return = NULL; + + mock_qmp_init(&s, fd); + + mock_qmp_send_literal(&s, + "{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}"); + + mock_qmp_expect_and_reply(&s, "qmp_capabilities", NULL); + + ASSERT_OK(sd_json_buildo( + &status_return, + SD_JSON_BUILD_PAIR_BOOLEAN("running", true), + SD_JSON_BUILD_PAIR_STRING("status", "running"))); + mock_qmp_expect_and_reply(&s, "query-status", status_return); + + mock_qmp_expect_and_reply(&s, "stop", NULL); + + _exit(EXIT_SUCCESS); +} + +/* Verify that when qmp_client_invoke() returns a slot, qmp_slot_get_client() tracks the + * connection state: the client pointer is reported while the call is in flight, and flipped + * back to NULL once the reply has been dispatched. The caller must still be able to drop its + * ref safely after that. */ +TEST(qmp_client_invoke_slot_lifecycle) { + _cleanup_(qmp_client_unrefp) QmpClient *client = NULL; + _cleanup_(sd_event_unrefp) sd_event *event = NULL; + _cleanup_(pidref_done_sigkill_wait) PidRef pid = PIDREF_NULL; + _cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL; + QmpTestResult t = {}; + int qmp_fds[2]; + int r; + + ASSERT_OK(sd_event_new(&event)); + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds)); + + r = ASSERT_OK(pidref_safe_fork("(mock-qmp-slot-life)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid)); + if (r == 0) { + safe_close(qmp_fds[0]); + mock_qmp_server_slot(qmp_fds[1]); + } + safe_close(qmp_fds[1]); + + ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0])); + ASSERT_OK(qmp_client_attach_event(client, event, SD_EVENT_PRIORITY_NORMAL)); + + ASSERT_OK(qmp_client_invoke(client, &slot, "query-status", NULL, on_test_result, &t)); + + /* While in flight the slot still references its client. */ + ASSERT_NOT_NULL(slot); + ASSERT_PTR_EQ(qmp_slot_get_client(slot), client); + + qmp_test_wait(event, &t); + ASSERT_EQ(t.error, 0); + ASSERT_NOT_NULL(t.result); + + /* Once dispatched, the slot is disconnected from the client but still owned by us. */ + ASSERT_NULL(qmp_slot_get_client(slot)); + + qmp_test_result_done(&t); + + /* Drop our ref explicitly (out of order w.r.t. cleanup) to exercise the + * already-disconnected path in qmp_slot_free(). */ + slot = qmp_slot_unref(slot); + ASSERT_NULL(slot); +} + +/* Verify that dropping the only reference on a pending slot before the reply arrives cancels + * the callback. The command is already enqueued on the stream at that point, so the server + * still sees it and replies — but the reply lands on an unknown id and is discarded. */ +TEST(qmp_client_invoke_slot_cancel) { + _cleanup_(qmp_client_unrefp) QmpClient *client = NULL; + _cleanup_(pidref_done_sigkill_wait) PidRef pid = PIDREF_NULL; + QmpTestResult t_cancelled = {}; + QmpSlot *slot = NULL; + int qmp_fds[2]; + int r; + + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds)); + + r = ASSERT_OK(pidref_safe_fork("(mock-qmp-slot-cancel)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid)); + if (r == 0) { + safe_close(qmp_fds[0]); + mock_qmp_server_slot(qmp_fds[1]); + } + safe_close(qmp_fds[1]); + + /* Drive without an event loop so the subsequent qmp_client_call() owns all pumping; + * it serializes write→read round-trips, which avoids the mock server seeing the + * cancelled query-status and the follow-up stop concatenated into a single recv(). */ + ASSERT_OK(qmp_client_connect_fd(&client, qmp_fds[0])); + + ASSERT_OK(qmp_client_invoke(client, &slot, "query-status", NULL, on_test_result, &t_cancelled)); + ASSERT_NOT_NULL(slot); + + /* Drop our sole ref → slot disconnects itself from the client's pending set. The + * enqueued query-status is still on the wire; when its reply arrives, dispatch_reply + * won't find a matching slot and will log-and-discard it. */ + slot = qmp_slot_unref(slot); + ASSERT_NULL(slot); + + /* Synchronous call drives its own process+wait pump: it first drains the already- + * enqueued query-status write, consumes (and discards) its reply, then sends stop + * and waits for that reply. Any improper fire of the cancelled callback would have + * happened during that process() pass. */ + ASSERT_EQ(qmp_client_call(client, "stop", NULL, NULL, NULL), 1); + + /* The cancelled callback must never have fired. */ + ASSERT_FALSE(t_cancelled.done); + ASSERT_NULL(t_cancelled.result); + ASSERT_NULL(t_cancelled.error_desc); +} + /* Drives a small wire dance for the sync call test: greeting, capabilities, one successful * command reply, and two error replies (one for the ret_error_desc path, one for the -EIO * path). */ diff --git a/src/vmspawn/vmspawn-qmp.c b/src/vmspawn/vmspawn-qmp.c index 4171772ee7c..f69d2a5e7c6 100644 --- a/src/vmspawn/vmspawn-qmp.c +++ b/src/vmspawn/vmspawn-qmp.c @@ -126,7 +126,7 @@ static int qmp_fdset_add(QmpClient *qmp, int fd_consume, char **ret_path) { if (asprintf(&path, "/dev/fdset/%u", id) < 0) return -ENOMEM; - r = qmp_client_invoke(qmp, "add-fd", QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd)), + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "add-fd", QMP_CLIENT_ARGS_FD(args, TAKE_FD(fd)), on_qmp_setup_complete, (void*) "add-fd"); if (r < 0) return r; @@ -221,7 +221,7 @@ static int qmp_add_file_node(QmpClient *qmp, const QmpFileNodeParams *p) { if (r < 0) return r; - return qmp_client_invoke(qmp, "blockdev-add", QMP_CLIENT_ARGS(args), on_qmp_setup_complete, (void*) "blockdev-add"); + return qmp_client_invoke(qmp, /* ret_slot= */ NULL, "blockdev-add", QMP_CLIENT_ARGS(args), on_qmp_setup_complete, (void*) "blockdev-add"); } /* Get the virtual size of an image from the fd directly. For raw images the virtual size @@ -320,7 +320,7 @@ static int on_ephemeral_create_concluded(QmpClient *qmp, void *userdata) { if (r < 0) return log_error_errno(r, "Failed to build overlay format JSON for '%s': %m", ctx->node_name); - r = qmp_client_invoke(qmp, "blockdev-add", QMP_CLIENT_ARGS(fmt_args), on_qmp_setup_complete, (void*) "blockdev-add"); + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "blockdev-add", QMP_CLIENT_ARGS(fmt_args), on_qmp_setup_complete, (void*) "blockdev-add"); if (r < 0) return r; @@ -339,7 +339,7 @@ static int on_ephemeral_create_concluded(QmpClient *qmp, void *userdata) { if (r < 0) return log_error_errno(r, "Failed to build device_add JSON for '%s': %m", ctx->node_name); - r = qmp_client_invoke(qmp, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add"); + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add"); if (r < 0) return r; @@ -408,7 +408,7 @@ static int qmp_setup_ephemeral_drive(VmspawnQmpBridge *bridge, QmpClient *qmp, D if (r < 0) return r; - r = qmp_client_invoke(qmp, "blockdev-add", QMP_CLIENT_ARGS(base_fmt_args), on_qmp_setup_complete, (void*) "blockdev-add"); + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "blockdev-add", QMP_CLIENT_ARGS(base_fmt_args), on_qmp_setup_complete, (void*) "blockdev-add"); if (r < 0) return log_error_errno(r, "Failed to send blockdev-add for base format '%s': %m", drive->path); @@ -424,7 +424,7 @@ static int qmp_setup_ephemeral_drive(VmspawnQmpBridge *bridge, QmpClient *qmp, D if (r < 0) return r; - r = qmp_client_invoke(qmp, "blockdev-add", QMP_CLIENT_ARGS(overlay_file_args), on_qmp_setup_complete, (void*) "blockdev-add"); + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "blockdev-add", QMP_CLIENT_ARGS(overlay_file_args), on_qmp_setup_complete, (void*) "blockdev-add"); if (r < 0) return log_error_errno(r, "Failed to send blockdev-add for overlay file '%s': %m", drive->path); @@ -482,7 +482,7 @@ static int qmp_setup_ephemeral_drive(VmspawnQmpBridge *bridge, QmpClient *qmp, D TAKE_PTR(ectx); - r = qmp_client_invoke(qmp, "blockdev-create", QMP_CLIENT_ARGS(cmd_args), on_qmp_setup_complete, (void*) "blockdev-create"); + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "blockdev-create", QMP_CLIENT_ARGS(cmd_args), on_qmp_setup_complete, (void*) "blockdev-create"); if (r < 0) return log_error_errno(r, "Failed to send blockdev-create for '%s': %m", drive->path); @@ -532,7 +532,7 @@ static int qmp_setup_regular_drive(VmspawnQmpBridge *bridge, QmpClient *qmp, Dri if (r < 0) return r; - r = qmp_client_invoke(qmp, "blockdev-add", QMP_CLIENT_ARGS(fmt_args), on_qmp_setup_complete, (void*) "blockdev-add"); + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "blockdev-add", QMP_CLIENT_ARGS(fmt_args), on_qmp_setup_complete, (void*) "blockdev-add"); if (r < 0) return log_error_errno(r, "Failed to send blockdev-add format for '%s': %m", drive->path); @@ -542,7 +542,7 @@ static int qmp_setup_regular_drive(VmspawnQmpBridge *bridge, QmpClient *qmp, Dri if (r < 0) return r; - r = qmp_client_invoke(qmp, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add"); + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add"); if (r < 0) return log_error_errno(r, "Failed to send device_add for '%s': %m", drive->path); @@ -584,7 +584,7 @@ int vmspawn_qmp_setup_network(VmspawnQmpBridge *bridge, NetworkInfo *network) { if (r < 0) return log_error_errno(r, "Failed to build getfd JSON: %m"); - r = qmp_client_invoke(qmp, "getfd", QMP_CLIENT_ARGS_FD(getfd_args, TAKE_FD(network->fd)), + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "getfd", QMP_CLIENT_ARGS_FD(getfd_args, TAKE_FD(network->fd)), on_qmp_setup_complete, (void*) "getfd"); if (r < 0) return log_error_errno(r, "Failed to send getfd for TAP fd: %m"); @@ -606,7 +606,7 @@ int vmspawn_qmp_setup_network(VmspawnQmpBridge *bridge, NetworkInfo *network) { if (r < 0) return log_error_errno(r, "Failed to build netdev_add JSON: %m"); - r = qmp_client_invoke(qmp, "netdev_add", QMP_CLIENT_ARGS(netdev_args), on_qmp_setup_complete, (void*) "netdev_add"); + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "netdev_add", QMP_CLIENT_ARGS(netdev_args), on_qmp_setup_complete, (void*) "netdev_add"); if (r < 0) return log_error_errno(r, "Failed to send netdev_add: %m"); @@ -623,7 +623,7 @@ int vmspawn_qmp_setup_network(VmspawnQmpBridge *bridge, NetworkInfo *network) { if (r < 0) return log_error_errno(r, "Failed to build NIC device_add JSON: %m"); - r = qmp_client_invoke(qmp, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add"); + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add"); if (r < 0) return log_error_errno(r, "Failed to send NIC device_add: %m"); @@ -658,7 +658,7 @@ static int vmspawn_qmp_setup_one_virtiofs(QmpClient *qmp, const VirtiofsInfo *vf if (r < 0) return log_error_errno(r, "Failed to build chardev-add JSON for '%s': %m", vfs->id); - r = qmp_client_invoke(qmp, "chardev-add", QMP_CLIENT_ARGS(chardev_args), on_qmp_setup_complete, (void*) "chardev-add"); + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "chardev-add", QMP_CLIENT_ARGS(chardev_args), on_qmp_setup_complete, (void*) "chardev-add"); if (r < 0) return log_error_errno(r, "Failed to send chardev-add '%s': %m", vfs->id); @@ -675,7 +675,7 @@ static int vmspawn_qmp_setup_one_virtiofs(QmpClient *qmp, const VirtiofsInfo *vf if (r < 0) return log_error_errno(r, "Failed to build virtiofs device_add JSON for '%s': %m", vfs->id); - r = qmp_client_invoke(qmp, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add"); + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add"); if (r < 0) return log_error_errno(r, "Failed to send virtiofs device_add '%s': %m", vfs->id); @@ -719,7 +719,7 @@ int vmspawn_qmp_setup_vsock(VmspawnQmpBridge *bridge, VsockInfo *vsock) { if (r < 0) return log_error_errno(r, "Failed to build getfd JSON for VSOCK: %m"); - r = qmp_client_invoke(qmp, "getfd", QMP_CLIENT_ARGS_FD(getfd_args, TAKE_FD(vsock->fd)), + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "getfd", QMP_CLIENT_ARGS_FD(getfd_args, TAKE_FD(vsock->fd)), on_qmp_setup_complete, (void*) "getfd"); if (r < 0) return log_error_errno(r, "Failed to send getfd for VSOCK fd: %m"); @@ -736,7 +736,7 @@ int vmspawn_qmp_setup_vsock(VmspawnQmpBridge *bridge, VsockInfo *vsock) { if (r < 0) return log_error_errno(r, "Failed to build VSOCK device_add JSON: %m"); - r = qmp_client_invoke(qmp, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add"); + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "device_add", QMP_CLIENT_ARGS(device_args), on_qmp_setup_complete, (void*) "device_add"); if (r < 0) return log_error_errno(r, "Failed to send VSOCK device_add: %m"); @@ -766,7 +766,7 @@ static int qmp_setup_scsi_controller(QmpClient *qmp, const char *pcie_port) { if (r < 0) return log_error_errno(r, "Failed to build SCSI controller JSON: %m"); - r = qmp_client_invoke(qmp, "device_add", QMP_CLIENT_ARGS(args), on_qmp_setup_complete, (void*) "device_add"); + r = qmp_client_invoke(qmp, /* ret_slot= */ NULL, "device_add", QMP_CLIENT_ARGS(args), on_qmp_setup_complete, (void*) "device_add"); if (r < 0) return log_error_errno(r, "Failed to send SCSI controller device_add: %m"); @@ -909,8 +909,8 @@ static int on_io_uring_probe_add_reply( if (r < 0) return r; - return qmp_client_invoke(c, "blockdev-del", QMP_CLIENT_ARGS(del_args), - on_io_uring_probe_del_reply, bridge); + return qmp_client_invoke(c, /* ret_slot= */ NULL, "blockdev-del", QMP_CLIENT_ARGS(del_args), + on_io_uring_probe_del_reply, bridge); } static int probe_io_uring(QmpClient *c, VmspawnQmpBridge *bridge) { @@ -930,8 +930,8 @@ static int probe_io_uring(QmpClient *c, VmspawnQmpBridge *bridge) { if (r < 0) return r; - return qmp_client_invoke(c, "blockdev-add", QMP_CLIENT_ARGS(args), - on_io_uring_probe_add_reply, bridge); + return qmp_client_invoke(c, /* ret_slot= */ NULL, "blockdev-add", QMP_CLIENT_ARGS(args), + on_io_uring_probe_add_reply, bridge); } static int on_probe_schema_reply( @@ -965,8 +965,8 @@ static int probe_schema(QmpClient *c, VmspawnQmpBridge *bridge) { assert(c); assert(bridge); - return qmp_client_invoke(c, "query-qmp-schema", QMP_CLIENT_ARGS(NULL), - on_probe_schema_reply, bridge); + return qmp_client_invoke(c, /* ret_slot= */ NULL, "query-qmp-schema", QMP_CLIENT_ARGS(NULL), + on_probe_schema_reply, bridge); } int vmspawn_qmp_init(VmspawnQmpBridge **ret, int fd, sd_event *event) { @@ -1055,5 +1055,5 @@ static int on_cont_complete( int vmspawn_qmp_start(VmspawnQmpBridge *bridge) { assert(bridge); - return qmp_client_invoke(bridge->qmp, "cont", /* args= */ NULL, on_cont_complete, /* userdata= */ NULL); + return qmp_client_invoke(bridge->qmp, /* ret_slot= */ NULL, "cont", /* args= */ NULL, on_cont_complete, /* userdata= */ NULL); } diff --git a/src/vmspawn/vmspawn-varlink.c b/src/vmspawn/vmspawn-varlink.c index c73372e7a1a..4a11b6fd4e1 100644 --- a/src/vmspawn/vmspawn-varlink.c +++ b/src/vmspawn/vmspawn-varlink.c @@ -71,7 +71,7 @@ static int qmp_execute_varlink_async( sd_varlink_ref(link); - r = qmp_client_invoke(ctx->bridge->qmp, command, QMP_CLIENT_ARGS(arguments), callback, link); + r = qmp_client_invoke(ctx->bridge->qmp, /* ret_slot= */ NULL, command, QMP_CLIENT_ARGS(arguments), callback, link); if (r < 0) sd_varlink_unref(link); @@ -241,7 +241,7 @@ static int dispatch_pending_job(VmspawnQmpBridge *bridge, sd_json_variant *data) if (r < 0) return sd_event_exit(qmp_client_get_event(bridge->qmp), r); - r = qmp_client_invoke(bridge->qmp, "job-dismiss", QMP_CLIENT_ARGS(dismiss_args), + r = qmp_client_invoke(bridge->qmp, /* ret_slot= */ NULL, "job-dismiss", QMP_CLIENT_ARGS(dismiss_args), on_job_dismiss_complete, /* userdata= */ NULL); if (r < 0) return sd_event_exit(qmp_client_get_event(bridge->qmp), r);