From: Christian Brauner Date: Mon, 30 Mar 2026 12:02:10 +0000 (+0200) Subject: shared: add QMP client library X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=50dd55bb6ebcf8ae43490b8db2a917330048f1ba;p=thirdparty%2Fsystemd.git shared: add QMP client library Async QMP client for talking to QEMU's machine monitor from libsystemd-shared. The I/O core (buffered read/write, output queue, event source management) follows sd-varlink's patterns. State machine: INITIAL -> GREETING_RECEIVED -> CAPABILITIES_SENT -> RUNNING | v DISCONNECTED The QMP handshake (greeting + qmp_capabilities) is driven transparently by qmp_client_invoke() through an internal qmp_client_ensure_running() helper, matching sd-bus's bus_ensure_running() pattern. Callers never wait for it explicitly. qmp_client_invoke() is the only command interface: asynchronous, with per-command callback. Slots are tracked in a Set keyed by id; replies are dispatched by id match. SCM_RIGHTS fd passing is bundled through QmpClientArgs and the QMP_CLIENT_ARGS_FD macro. qmp_client_process() and qmp_client_wait() are exposed publicly, mirroring sd_varlink_process() and sd_varlink_wait(). Callers that need to drive the client synchronously — e.g. feature probing before entering the sd_event main loop — can loop on them exactly like varlink_call_internal() does on its varlink equivalents. Other features: - Buffered stream reader for QMP's \r\n-delimited JSON, handling multi-read responses (query-qmp-schema is ~200 KiB). - Fdset id allocation via qmp_client_next_fdset_id(). - Synthetic SHUTDOWN event on unexpected disconnect. - Disconnect detection with callback notification and pending-command cleanup. - -ENOBUFS from the 16 MiB input-buffer cap is treated as recoverable (not a transport error). Signed-off-by: Christian Brauner (Amutable) --- diff --git a/src/shared/meson.build b/src/shared/meson.build index 924987b024c..705cb955bd6 100644 --- a/src/shared/meson.build +++ b/src/shared/meson.build @@ -165,6 +165,7 @@ shared_sources = files( 'printk-util.c', 'prompt-util.c', 'ptyfwd.c', + 'qmp-client.c', 'qrcode-util.c', 'quota-util.c', 'reboot-util.c', diff --git a/src/shared/qmp-client.c b/src/shared/qmp-client.c new file mode 100644 index 00000000000..6a92550e727 --- /dev/null +++ b/src/shared/qmp-client.c @@ -0,0 +1,845 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include "sd-event.h" +#include "sd-json.h" + +#include "alloc-util.h" +#include "fd-util.h" +#include "hash-funcs.h" +#include "json-stream.h" +#include "json-util.h" +#include "qmp-client.h" +#include "set.h" +#include "siphash24.h" +#include "string-util.h" + +typedef enum QmpClientState { + QMP_CLIENT_HANDSHAKE_INITIAL, /* waiting for QMP greeting */ + QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED, /* greeting received, sending qmp_capabilities */ + QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT, /* waiting for qmp_capabilities response */ + QMP_CLIENT_RUNNING, /* connected, ready for commands */ + QMP_CLIENT_DISCONNECTED, /* connection closed */ + _QMP_CLIENT_STATE_MAX, + _QMP_CLIENT_STATE_INVALID = -EINVAL, +} QmpClientState; + +/* States routed to dispatch_handshake. */ +#define QMP_CLIENT_STATE_IS_HANDSHAKE(s) \ + IN_SET(s, \ + QMP_CLIENT_HANDSHAKE_INITIAL, \ + QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED, \ + QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT) + +typedef struct QmpSlot { + uint64_t id; + qmp_command_callback_t callback; + void *userdata; +} QmpSlot; + +struct QmpClient { + unsigned n_ref; + + JsonStream stream; + + sd_event_source *quit_event_source; + sd_event_source *defer_event_source; + + uint64_t next_id; + Set *slots; /* QmpSlot* entries indexed by id, for async dispatch */ + + qmp_event_callback_t event_callback; + void *event_userdata; + qmp_disconnect_callback_t disconnect_callback; + void *disconnect_userdata; + + unsigned next_fdset_id; /* monotonic fdset-id allocator for add-fd */ + + QmpClientState state; + sd_json_variant *current; /* most recently parsed message, pending dispatch */ +}; + +static void qmp_slot_hash_func(const QmpSlot *p, struct siphash *state) { + siphash24_compress_typesafe(p->id, state); +} + +static int qmp_slot_compare_func(const QmpSlot *a, const QmpSlot *b) { + return CMP(a->id, b->id); +} + +DEFINE_PRIVATE_HASH_OPS(qmp_slot_hash_ops, + QmpSlot, qmp_slot_hash_func, qmp_slot_compare_func); + +static void qmp_client_clear(QmpClient *c); + +static QmpClient* qmp_client_destroy(QmpClient *c) { + if (!c) + return NULL; + + qmp_client_clear(c); + + return mfree(c); +} + +DEFINE_PRIVATE_TRIVIAL_REF_FUNC(QmpClient, qmp_client); +DEFINE_TRIVIAL_UNREF_FUNC(QmpClient, qmp_client, qmp_client_destroy); + +static void qmp_client_clear_current(QmpClient *c) { + assert(c); + + c->current = sd_json_variant_unref(c->current); +} + +static void qmp_client_dispatch_event(QmpClient *c, sd_json_variant *v) { + int r; + + assert(c); + assert(v); + + if (!c->event_callback) + return; + + struct { + const char *event; + sd_json_variant *data; + } p = {}; + + static const sd_json_dispatch_field table[] = { + { "event", SD_JSON_VARIANT_STRING, sd_json_dispatch_const_string, voffsetof(p, event), SD_JSON_MANDATORY }, + { "data", SD_JSON_VARIANT_OBJECT, sd_json_dispatch_variant_noref, voffsetof(p, data), 0 }, + {}, + }; + + r = sd_json_dispatch(v, table, SD_JSON_ALLOW_EXTENSIONS|SD_JSON_LOG|SD_JSON_DEBUG, &p); + if (r < 0) + return; + + r = c->event_callback(c, p.event, p.data, c->event_userdata); + if (r < 0) + json_stream_log_errno(&c->stream, r, "Event callback returned error, ignoring: %m"); +} + +/* QEMU's error "class" is effectively always "GenericError"; only "desc" carries useful info. */ +static const char* qmp_extract_error_description(sd_json_variant *v) { + sd_json_variant *error = sd_json_variant_by_key(v, "error"); + if (!error) + return NULL; + sd_json_variant *desc = sd_json_variant_by_key(error, "desc"); + if (desc) + return sd_json_variant_string(desc); + return "unspecified error"; +} + +/* Returns 1 with id set; 0 if absent (e.g. pre-parse error responses); -EBADMSG on wrong type. */ +static int qmp_extract_response_id(sd_json_variant *v, uint64_t *ret) { + sd_json_variant *id_variant; + + assert(v); + assert(ret); + + id_variant = sd_json_variant_by_key(v, "id"); + if (!id_variant) { + *ret = 0; + return 0; + } + if (!sd_json_variant_is_unsigned(id_variant)) + return log_debug_errno(SYNTHETIC_ERRNO(EBADMSG), + "QMP response 'id' field is not an unsigned integer."); + + *ret = sd_json_variant_unsigned(id_variant); + return 1; +} + +/* Returns 0 on success (ret_result = "return" value), -EIO on QMP error (reterr_desc set). */ +static int qmp_parse_response(sd_json_variant *v, sd_json_variant **ret_result, const char **reterr_desc) { + const char *desc; + + desc = qmp_extract_error_description(v); + if (desc) { + if (reterr_desc) + *reterr_desc = desc; + return -EIO; + } + + if (ret_result) + *ret_result = sd_json_variant_by_key(v, "return"); + return 0; +} + +static int qmp_client_build_command( + QmpClient *c, + const char *command, + sd_json_variant *arguments, + sd_json_variant **ret, + uint64_t *ret_id) { + + uint64_t id; + int r; + + assert(c); + assert(command); + assert(ret); + assert(ret_id); + + id = c->next_id++; + + r = sd_json_buildo( + ret, + SD_JSON_BUILD_PAIR_STRING("execute", command), + SD_JSON_BUILD_PAIR_CONDITION(!!arguments, "arguments", SD_JSON_BUILD_VARIANT(arguments)), + SD_JSON_BUILD_PAIR_UNSIGNED("id", id)); + if (r < 0) + return r; + + *ret_id = id; + return 0; +} + +/* Route c->current to event callback or matching async slot. Returns 1 on dispatch. */ +static int qmp_client_dispatch_reply(QmpClient *c) { + sd_json_variant *result = NULL; + const char *desc = NULL; + uint64_t id; + int error, r; + + assert(c); + + if (!c->current) + return 0; + + /* Events have an "event" key */ + if (sd_json_variant_by_key(c->current, "event")) { + _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current); + qmp_client_dispatch_event(c, v); + return 1; + } + + /* Command responses carry an "id" matching a request we sent */ + r = qmp_extract_response_id(c->current, &id); + if (r < 0) { + qmp_client_clear_current(c); + return json_stream_log_errno(&c->stream, r, "Discarding QMP response with malformed id: %m"); + } + if (r == 0) { + qmp_client_clear_current(c); + json_stream_log(&c->stream, "Discarding unrecognized QMP message"); + return 1; + } + + _cleanup_free_ QmpSlot *pending = set_remove(c->slots, &(QmpSlot) { .id = id }); + if (!pending) { + qmp_client_clear_current(c); + json_stream_log(&c->stream, "Discarding QMP response with unknown id %" PRIu64, id); + return 1; + } + + _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current); + error = qmp_parse_response(v, &result, &desc); + + r = pending->callback(c, result, desc, error, pending->userdata); + if (r < 0) + json_stream_log_errno(&c->stream, r, "Command callback returned error, ignoring: %m"); + + return 1; +} + +/* Fail all pending async commands with the given error. Called on disconnect. */ +static void qmp_client_fail_pending(QmpClient *c, int error) { + QmpSlot *p; + int r; + + assert(c); + + while ((p = set_steal_first(c->slots))) { + r = p->callback(c, /* result= */ NULL, /* error_desc= */ NULL, error, p->userdata); + if (r < 0) + json_stream_log_errno(&c->stream, r, "Command callback returned error, ignoring: %m"); + free(p); + } +} + +/* Synthetic SHUTDOWN on unexpected disconnect so subscribers learn the VM is gone. */ +static void qmp_client_emit_synthetic_shutdown(QmpClient *c) { + _cleanup_(sd_json_variant_unrefp) sd_json_variant *data = NULL; + int r; + + assert(c); + + if (!c->event_callback) + return; + + r = sd_json_buildo( + &data, + SD_JSON_BUILD_PAIR_BOOLEAN("guest", false), + SD_JSON_BUILD_PAIR_STRING("reason", "disconnected")); + if (r < 0) { + json_stream_log_errno(&c->stream, r, "Failed to build synthetic SHUTDOWN event data, skipping: %m"); + return; + } + + r = c->event_callback(c, "SHUTDOWN", data, c->event_userdata); + if (r < 0) + json_stream_log_errno(&c->stream, r, "Event callback returned error, ignoring: %m"); +} + +static bool qmp_client_handle_disconnect(QmpClient *c) { + assert(c); + + if (c->state == QMP_CLIENT_DISCONNECTED) + return false; + + c->state = QMP_CLIENT_DISCONNECTED; + + /* Disable defer event source so we don't busy-loop on the EOF condition. */ + if (c->defer_event_source) + (void) sd_event_source_set_enabled(c->defer_event_source, SD_EVENT_OFF); + + qmp_client_fail_pending(c, -ECONNRESET); + qmp_client_emit_synthetic_shutdown(c); + if (c->disconnect_callback) + c->disconnect_callback(c, c->disconnect_userdata); + + return true; +} + +static bool qmp_client_test_disconnect(QmpClient *c) { + assert(c); + + /* Already disconnected? */ + if (c->state == QMP_CLIENT_DISCONNECTED) + return false; + + if (!json_stream_should_disconnect(&c->stream)) + return false; + + return qmp_client_handle_disconnect(c); +} + +/* INITIAL → greeting → GREETING_RECEIVED → qmp_capabilities → CAPABILITIES_SENT → response → RUNNING. */ +static int qmp_client_dispatch_handshake(QmpClient *c) { + int r; + + assert(c); + assert(QMP_CLIENT_STATE_IS_HANDSHAKE(c->state)); + + if (!c->current) + return 0; + + /* Defensive: QEMU shouldn't emit events during capability negotiation, but if one + * arrives, dispatch it as an event rather than mis-parsing it as a handshake reply. */ + if (sd_json_variant_by_key(c->current, "event")) { + _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current); + qmp_client_dispatch_event(c, v); + return 1; + } + + switch (c->state) { + + case QMP_CLIENT_HANDSHAKE_INITIAL: { + /* Waiting for QMP greeting. Take ownership so by_key()'s borrowed pointer + * stays valid through the case scope. */ + _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current); + if (!sd_json_variant_by_key(v, "QMP")) + return json_stream_log_errno(&c->stream, SYNTHETIC_ERRNO(EPROTO), + "Expected QMP greeting, got something else"); + + c->state = QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED; + + /* Fall through to immediately send capabilities */ + _fallthrough_; + } + + case QMP_CLIENT_HANDSHAKE_GREETING_RECEIVED: { + /* Send qmp_capabilities command */ + _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL; + r = sd_json_buildo( + &cmd, + SD_JSON_BUILD_PAIR_STRING("execute", "qmp_capabilities"), + SD_JSON_BUILD_PAIR_UNSIGNED("id", c->next_id++)); + if (r < 0) + return r; + + r = json_stream_enqueue(&c->stream, cmd); + if (r < 0) + return r; + + c->state = QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT; + return 1; + } + + case QMP_CLIENT_HANDSHAKE_CAPABILITIES_SENT: { + /* Take ownership so desc (borrowed from v's "error.desc") survives the format string. */ + _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = TAKE_PTR(c->current); + const char *desc = NULL; + r = qmp_parse_response(v, /* ret_result= */ NULL, &desc); + if (r < 0) + return json_stream_log_errno(&c->stream, SYNTHETIC_ERRNO(EPROTO), + "qmp_capabilities failed: %s", desc); + + c->state = QMP_CLIENT_RUNNING; + return 1; + } + + default: + assert_not_reached(); + } +} + +static int qmp_client_dispatch(QmpClient *c) { + assert(c); + + if (!c->current) + return 0; + + if (QMP_CLIENT_STATE_IS_HANDSHAKE(c->state)) + return qmp_client_dispatch_handshake(c); + + return qmp_client_dispatch_reply(c); +} + +/* Single step: write → dispatch → parse → read → disconnect. Matches sd_varlink_process(). */ +int qmp_client_process(QmpClient *c) { + int r; + + assert(c); + + if (c->state < 0 || c->state == QMP_CLIENT_DISCONNECTED) + return -ENOTCONN; + + /* Pin against a callback dropping the last ref mid-dispatch. Matches sd_varlink_process(). */ + qmp_client_ref(c); + + /* 1. Write — drain output buffer */ + r = json_stream_write(&c->stream); + if (r < 0) + json_stream_log_errno(&c->stream, r, "Failed to write to QMP socket: %m"); + if (r != 0) + goto finish; + + /* 2. Dispatch — route based on state */ + r = qmp_client_dispatch(c); + if (r < 0) + json_stream_log_errno(&c->stream, r, "Failed to dispatch QMP message: %m"); + if (r != 0) + goto finish; + + /* 3. Parse — extract one complete message into c->current */ + if (!c->current) { + r = json_stream_parse(&c->stream, &c->current); + if (r < 0) + json_stream_log_errno(&c->stream, r, "Failed to parse QMP message: %m"); + if (r != 0) + goto finish; + } + + /* 4. Read — fill input buffer from fd */ + if (!c->current) { + r = json_stream_read(&c->stream); + if (r < 0) + json_stream_log_errno(&c->stream, r, "Failed to read from QMP socket: %m"); + if (r != 0) + goto finish; + } + + /* 5. Test disconnect */ + if (qmp_client_test_disconnect(c)) { + r = 1; + goto finish; + } + +finish: + /* Re-arm defer source on progress so we get called again next iteration. */ + if (r >= 0 && c->defer_event_source) { + int q; + + q = sd_event_source_set_enabled(c->defer_event_source, r > 0 ? SD_EVENT_ON : SD_EVENT_OFF); + if (q < 0) + r = json_stream_log_errno(&c->stream, q, "Failed to enable deferred event source: %m"); + } + + /* -ENOBUFS is the buffered stream's 16 MiB cap, not a transport error — propagate without disconnecting. */ + if (r < 0 && r != -ENOBUFS && c->state != QMP_CLIENT_DISCONNECTED) + qmp_client_handle_disconnect(c); + + qmp_client_unref(c); + return r; +} + +int qmp_client_wait(QmpClient *c, uint64_t timeout_usec) { + assert(c); + + if (c->state == QMP_CLIENT_DISCONNECTED) + return -ENOTCONN; + + return json_stream_wait(&c->stream, timeout_usec); +} + +bool qmp_client_is_idle(QmpClient *c) { + assert(c); + return set_isempty(c->slots); +} + +bool qmp_client_is_disconnected(QmpClient *c) { + assert(c); + return c->state == QMP_CLIENT_DISCONNECTED; +} + +/* Map our state to the transport phase used for POLLIN / salvage / timeout decisions. */ +static JsonStreamPhase qmp_client_phase(void *userdata) { + QmpClient *c = ASSERT_PTR(userdata); + + /* A parsed-but-undispatched message is mid-processing, not waiting on the wire. */ + if (c->current) + return JSON_STREAM_PHASE_OTHER; + + /* During handshake we're waiting for the greeting or qmp_capabilities response. */ + if (QMP_CLIENT_STATE_IS_HANDSHAKE(c->state)) + return JSON_STREAM_PHASE_AWAITING_REPLY; + + /* Running with pending async commands — waiting for their responses. */ + if (c->state == QMP_CLIENT_RUNNING && !set_isempty(c->slots)) + return JSON_STREAM_PHASE_AWAITING_REPLY; + + /* Running with no pending commands — waiting for unsolicited events. */ + if (c->state == QMP_CLIENT_RUNNING) + return JSON_STREAM_PHASE_READING; + + return JSON_STREAM_PHASE_OTHER; +} + +static int qmp_client_dispatch_cb(void *userdata) { + QmpClient *c = ASSERT_PTR(userdata); + return qmp_client_process(c); +} + +static int qmp_client_defer_callback(sd_event_source *source, void *userdata) { + QmpClient *c = ASSERT_PTR(userdata); + + assert(source); + + (void) qmp_client_process(c); + + return 1; +} + +/* Drive handshake to completion. Matches sd-bus's bus_ensure_running(). */ +static int qmp_client_ensure_running(QmpClient *c) { + int r; + + assert(c); + + if (c->state == QMP_CLIENT_RUNNING) + return 1; + + for (;;) { + if (c->state < 0 || c->state == QMP_CLIENT_DISCONNECTED) + return -ENOTCONN; + + r = qmp_client_process(c); + if (r < 0) + return r; + if (c->state == QMP_CLIENT_RUNNING) + return 1; + if (r > 0) + continue; + + r = qmp_client_wait(c, USEC_INFINITY); + if (r < 0) + return r; + } +} + +static void qmp_client_detach_event(QmpClient *c) { + if (!c) + return; + + c->defer_event_source = sd_event_source_disable_unref(c->defer_event_source); + c->quit_event_source = sd_event_source_disable_unref(c->quit_event_source); + json_stream_detach_event(&c->stream); +} + +static void qmp_client_clear(QmpClient *c) { + assert(c); + + qmp_client_handle_disconnect(c); + qmp_client_detach_event(c); + qmp_client_clear_current(c); + json_stream_done(&c->stream); + c->slots = set_free(c->slots); +} + +/* Blocks until output buffer is empty. Matches sd_varlink_flush(). */ +static int qmp_client_flush(QmpClient *c) { + if (!c) + return 0; + + if (c->state == QMP_CLIENT_DISCONNECTED) + return -ENOTCONN; + + return json_stream_flush(&c->stream); +} + +/* Notify callbacks, fire disconnect, detach sources, close fd. Matches sd_varlink_close(). */ +static int qmp_client_close(QmpClient *c) { + if (!c) + return 0; + + /* Take a temporary ref to prevent destruction mid-callback, + * matching sd_varlink_close()'s pattern. */ + qmp_client_ref(c); + qmp_client_clear(c); + qmp_client_unref(c); + + return 1; +} + +static int qmp_client_quit_callback(sd_event_source *source, void *userdata) { + QmpClient *c = ASSERT_PTR(userdata); + + assert(source); + + qmp_client_flush(c); + qmp_client_close(c); + + return 1; +} + +int qmp_client_connect_fd(QmpClient **ret, int fd) { + _cleanup_(qmp_client_unrefp) QmpClient *c = NULL; + int r; + + assert(ret); + assert(fd >= 0); + + c = new(QmpClient, 1); + if (!c) + return -ENOMEM; + + *c = (QmpClient) { + .n_ref = 1, + .state = QMP_CLIENT_HANDSHAKE_INITIAL, + .next_id = 1, + }; + + const JsonStreamParams params = { + .delimiter = "\r\n", + .phase = qmp_client_phase, + .dispatch = qmp_client_dispatch_cb, + .userdata = c, + }; + + r = json_stream_init(&c->stream, ¶ms); + if (r < 0) + return r; + + r = json_stream_connect_fd_pair(&c->stream, fd, fd); + if (r < 0) + return r; + + *ret = TAKE_PTR(c); + return 0; +} + +int qmp_client_attach_event(QmpClient *c, sd_event *event, int64_t priority) { + int r; + + assert(c); + assert(event); + assert(!json_stream_get_event(&c->stream)); + + r = json_stream_attach_event(&c->stream, event, priority); + if (r < 0) + return r; + + sd_event *ev = json_stream_get_event(&c->stream); + + r = sd_event_add_exit(ev, &c->quit_event_source, qmp_client_quit_callback, c); + if (r < 0) + goto fail; + + r = sd_event_source_set_priority(c->quit_event_source, priority); + if (r < 0) + goto fail; + + (void) sd_event_source_set_description(c->quit_event_source, "qmp-client-quit"); + + r = sd_event_add_defer(ev, &c->defer_event_source, qmp_client_defer_callback, c); + if (r < 0) + goto fail; + + r = sd_event_source_set_priority(c->defer_event_source, priority); + if (r < 0) + goto fail; + + r = sd_event_source_set_enabled(c->defer_event_source, SD_EVENT_OFF); + if (r < 0) + goto fail; + + (void) sd_event_source_set_description(c->defer_event_source, "qmp-client-defer"); + + return 0; + +fail: + qmp_client_detach_event(c); + return r; +} + +/* Cleanup hook: closes any fds in *args not yet transferred to the stream. */ +static QmpClientArgs* qmp_client_args_close_fds(QmpClientArgs *p) { + assert(p); + close_many_unset(p->fds_consume, p->n_fds); + return NULL; +} + +DEFINE_TRIVIAL_CLEANUP_FUNC(QmpClientArgs*, qmp_client_args_close_fds); + +/* Transfer fds to the stream. On partial failure narrow args to the unstaged tail so + * the caller's cleanup closes only the untransferred fds. */ +static int qmp_client_stage_fds(QmpClient *c, QmpClientArgs *args) { + int r; + + assert(c); + + if (!args || args->n_fds == 0) + return 0; + + assert(args->fds_consume); + + for (size_t i = 0; i < args->n_fds; i++) { + r = json_stream_push_fd(&c->stream, args->fds_consume[i]); + if (r < 0) { + /* Already-staged are owned by the stream; narrow args to the rest. */ + json_stream_reset_pushed_fds(&c->stream); + args->fds_consume = &args->fds_consume[i]; + args->n_fds -= i; + return r; + } + } + + args->n_fds = 0; + return 0; +} + +int qmp_client_invoke( + QmpClient *c, + const char *command, + QmpClientArgs *args, + qmp_command_callback_t callback, + void *userdata) { + + _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL; + _cleanup_free_ QmpSlot *pending = NULL; + /* Closes any fds in args not yet handed to the stream on every early-return path; + * TAKE_PTR()'d on the success path below once stage_fds has consumed them. */ + _cleanup_(qmp_client_args_close_fdsp) QmpClientArgs *fds_owner = args; + uint64_t id; + int r; + + assert(c); + assert(command); + assert(callback); + + r = qmp_client_ensure_running(c); + if (r < 0) + return r; + + r = qmp_client_build_command(c, command, args ? args->arguments : NULL, &cmd, &id); + if (r < 0) + return r; + + pending = new(QmpSlot, 1); + if (!pending) + return -ENOMEM; + + *pending = (QmpSlot) { + .id = id, + .callback = callback, + .userdata = userdata, + }; + + r = set_ensure_put(&c->slots, &qmp_slot_hash_ops, pending); + if (r < 0) + return r; + assert(r > 0); + + /* Stage AFTER ensure_running() drained internal enqueues so the next enqueue is ours. */ + r = qmp_client_stage_fds(c, args); + if (r < 0) { + set_remove(c->slots, pending); + return r; + } + + r = json_stream_enqueue(&c->stream, cmd); + if (r < 0) { + json_stream_reset_pushed_fds(&c->stream); + set_remove(c->slots, pending); + return r; + } + + /* Arm defer so process() drains the output on the next iteration. */ + if (c->defer_event_source) + (void) sd_event_source_set_enabled(c->defer_event_source, SD_EVENT_ON); + + TAKE_PTR(pending); + TAKE_PTR(fds_owner); + return 0; +} + +void qmp_client_bind_event(QmpClient *c, qmp_event_callback_t callback, void *userdata) { + assert(c); + c->event_callback = callback; + c->event_userdata = userdata; +} + +void qmp_client_bind_disconnect(QmpClient *c, qmp_disconnect_callback_t callback, void *userdata) { + assert(c); + c->disconnect_callback = callback; + c->disconnect_userdata = userdata; +} + +int qmp_client_set_description(QmpClient *c, const char *description) { + assert(c); + return json_stream_set_description(&c->stream, description); +} + +sd_event* qmp_client_get_event(QmpClient *c) { + assert(c); + return json_stream_get_event(&c->stream); +} + +unsigned qmp_client_next_fdset_id(QmpClient *c) { + assert(c); + return c->next_fdset_id++; +} + +bool qmp_schema_has_member(sd_json_variant *schema, const char *member_name) { + sd_json_variant *entry; + + assert(member_name); + + if (!sd_json_variant_is_array(schema)) + return false; + + JSON_VARIANT_ARRAY_FOREACH(entry, schema) { + if (!sd_json_variant_is_object(entry)) + continue; + + sd_json_variant *meta = sd_json_variant_by_key(entry, "meta-type"); + if (!meta || !streq_ptr(sd_json_variant_string(meta), "object")) + continue; + + sd_json_variant *members = sd_json_variant_by_key(entry, "members"); + if (!sd_json_variant_is_array(members)) + continue; + + sd_json_variant *m; + JSON_VARIANT_ARRAY_FOREACH(m, members) { + if (!sd_json_variant_is_object(m)) + continue; + sd_json_variant *mn = sd_json_variant_by_key(m, "name"); + if (mn && streq_ptr(sd_json_variant_string(mn), member_name)) + return true; + } + } + + return false; +} diff --git a/src/shared/qmp-client.h b/src/shared/qmp-client.h new file mode 100644 index 00000000000..dbe65162e97 --- /dev/null +++ b/src/shared/qmp-client.h @@ -0,0 +1,78 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ +#pragma once + +#include "shared-forward.h" + +typedef int (*qmp_event_callback_t)( + QmpClient *client, + const char *event, + sd_json_variant *data, + void *userdata); + +typedef void (*qmp_disconnect_callback_t)( + QmpClient *client, + void *userdata); + +/* Success: (result, NULL, 0). QMP error: (NULL, desc, -EIO). Transport: (NULL, NULL, -errno). */ +typedef int (*qmp_command_callback_t)( + QmpClient *client, + sd_json_variant *result, + const char *error_desc, + int error, + void *userdata); + +/* Bundles arguments + fds for one command. Construct fresh per invoke via the macros below. */ +typedef struct QmpClientArgs { + sd_json_variant *arguments; + int *fds_consume; + size_t n_fds; +} QmpClientArgs; + +#define QMP_CLIENT_ARGS(args_) \ + (&(QmpClientArgs){ .arguments = (args_) }) +#define QMP_CLIENT_ARGS_FD(args_, fd_) \ + (&(QmpClientArgs){ .arguments = (args_), .fds_consume = (int[]){ (fd_) }, .n_fds = 1 }) + +/* Takes ownership of fd; handshake runs lazily on first invoke or from the event loop. */ +int qmp_client_connect_fd(QmpClient **ret, int fd); + +int qmp_client_attach_event(QmpClient *c, sd_event *event, int64_t priority); + +/* Single non-blocking pump step: write → dispatch → parse → read → disconnect. Returns >0 if + * progress was made, 0 if idle, <0 on error (-ENOTCONN on disconnect). Same contract as + * sd_varlink_process(). */ +int qmp_client_process(QmpClient *c); + +/* Block on the transport fd until readable, or until timeout (USEC_INFINITY for no timeout). + * Same contract as sd_varlink_wait(). */ +int qmp_client_wait(QmpClient *c, uint64_t timeout_usec); + +/* True iff there are no outstanding command replies (slots set is empty). Useful as the pump-loop + * exit condition for callers driving the client synchronously via process() + wait(). */ +bool qmp_client_is_idle(QmpClient *c); + +/* True iff the connection is dead. Stable terminal state — once set, it stays set. */ +bool qmp_client_is_disconnected(QmpClient *c); + +/* Async send. Returns 0 on send (callback will fire later), negative errno on failure. */ +int qmp_client_invoke( + QmpClient *client, + const char *command, + QmpClientArgs *args, + qmp_command_callback_t callback, + void *userdata); + +void qmp_client_bind_event(QmpClient *c, qmp_event_callback_t callback, void *userdata); +void qmp_client_bind_disconnect(QmpClient *c, qmp_disconnect_callback_t callback, void *userdata); +int qmp_client_set_description(QmpClient *c, const char *description); +sd_event* qmp_client_get_event(QmpClient *c); +unsigned qmp_client_next_fdset_id(QmpClient *client); + +QmpClient* qmp_client_unref(QmpClient *p); + +DEFINE_TRIVIAL_CLEANUP_FUNC(QmpClient *, qmp_client_unref); + +/* Returns true iff any object entry in schema (result of query-qmp-schema) has a member with this + * name. QEMU's introspection replaces type names with opaque numeric ids, so lookup-by-type-name is + * impossible — but member names are real. Use only when the member name is unique in the schema. */ +bool qmp_schema_has_member(sd_json_variant *schema, const char *member_name); diff --git a/src/shared/shared-forward.h b/src/shared/shared-forward.h index e07b25c056a..38349d9dbbc 100644 --- a/src/shared/shared-forward.h +++ b/src/shared/shared-forward.h @@ -78,6 +78,7 @@ typedef struct MountOptions MountOptions; typedef struct MStack MStack; typedef struct OpenFile OpenFile; typedef struct Pkcs11EncryptedKey Pkcs11EncryptedKey; +typedef struct QmpClient QmpClient; typedef struct Table Table; typedef struct Tpm2Context Tpm2Context; typedef struct Tpm2Handle Tpm2Handle;