/* SPDX-License-Identifier: LGPL-2.1-or-later */
#include "sd-event.h"
+#include "sd-future.h"
#include "sd-json.h"
#include "alloc-util.h"
return 1;
}
-/* Returns 0 on success (ret_result = "return" value), -EIO on QMP error (reterr_desc set). */
-static int qmp_parse_response(sd_json_variant *v, sd_json_variant **ret_result, const char **reterr_desc) {
+/* Returns 0 on success (ret_result = freshly reffed "return" value), -EIO on QMP error
+ * (ret_error_desc set to a freshly allocated string). Caller owns both outputs. */
+static int qmp_parse_response(sd_json_variant *v, sd_json_variant **ret_result, char **reterr_error_desc) {
const char *desc;
desc = qmp_extract_error_description(v);
if (desc) {
- if (reterr_desc)
- *reterr_desc = desc;
+ if (reterr_error_desc) {
+ *reterr_error_desc = strdup(desc);
+ if (!*reterr_error_desc)
+ return -ENOMEM;
+ }
return -EIO;
}
if (ret_result)
- *ret_result = sd_json_variant_by_key(v, "return");
+ *ret_result = sd_json_variant_ref(sd_json_variant_by_key(v, "return"));
return 0;
}
/* Route c->current to event callback or matching async slot. Returns 1 on dispatch. */
static int qmp_client_dispatch(QmpClient *c) {
- sd_json_variant *result = NULL;
- const char *desc = NULL;
+ _cleanup_(sd_json_variant_unrefp) sd_json_variant *result = NULL;
+ _cleanup_free_ char *desc = NULL;
uint64_t id;
int error, r;
}
/* Synchronous slot (no callback): leave c->current pinned so qmp_client_call() can
- * pick up the reply and hand out borrowed pointers into it. The sync caller owns a
- * ref on the slot and detects completion by observing slot->client turning NULL. */
+ * pick the reply up after its pump loop. The sync caller owns a ref on the slot and
+ * detects completion by observing slot->client turning NULL. */
if (!slot->callback) {
qmp_slot_disconnect(slot, /* unref= */ true);
return 1;
qmp_client_detach_event(c);
qmp_client_clear_current(c);
json_stream_done(&c->stream);
+ /* qmp_client_handle_disconnect() above drained every entry via qmp_client_fail_pending();
+ * the set is borrow-only for non-floating slots, so set_free() can't safely run a
+ * destructor over leftovers — enforce the drain invariant instead. */
+ assert(set_isempty(c->slots));
c->slots = set_free(c->slots);
}
/* Shared send path for qmp_client_invoke() and qmp_client_call(). A NULL callback registers
* a "synchronous" slot: dispatch_reply leaves c->current pinned on match instead of invoking
- * a callback, so qmp_client_call() can hand out borrowed pointers into the reply. If ret_slot
+ * a callback, so qmp_client_call() can pick the reply up after its pump loop. If ret_slot
* is NULL the slot is allocated as floating (owned by c->slots); otherwise a reference is
* handed back to the caller. */
static int qmp_client_send(
return qmp_client_send(c, command, args, callback, userdata, ret_slot);
}
+typedef struct QmpFuture {
+ QmpSlot *slot; /* owned, non-floating; NULL once disconnected */
+ sd_json_variant *result;
+ char *error_desc;
+} QmpFuture;
+
+static void* qmp_future_alloc(void) {
+ return new0(QmpFuture, 1);
+}
+
+static void qmp_future_free(sd_future *f) {
+ QmpFuture *qf = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f)));
+ qmp_slot_unref(qf->slot);
+ sd_json_variant_unref(qf->result);
+ free(qf->error_desc);
+ free(qf);
+}
+
+static int qmp_future_cancel(sd_future *f) {
+ QmpFuture *qf = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f)));
+
+ /* Drop the pending slot so dispatch_reply won't try to fire our callback (and touch
+ * freed memory) when the reply eventually arrives. */
+ qf->slot = qmp_slot_unref(qf->slot);
+ return sd_future_resolve(f, -ECANCELED);
+}
+
+static const sd_future_ops qmp_call_future_ops = {
+ .size = sizeof(sd_future_ops),
+ .alloc = qmp_future_alloc,
+ .free = qmp_future_free,
+ .cancel = qmp_future_cancel,
+};
+
+static int qmp_future_callback(
+ QmpClient *c,
+ sd_json_variant *result,
+ const char *desc,
+ int error,
+ void *userdata) {
+
+ sd_future *f = ASSERT_PTR(userdata);
+ QmpFuture *qf = ASSERT_PTR(sd_future_get_private(f));
+ int r;
+
+ assert(result || desc || error);
+
+ if (result)
+ qf->result = sd_json_variant_ref(result);
+ if (desc) {
+ qf->error_desc = strdup(desc);
+ if (!qf->error_desc)
+ /* No usable reply payload to surface — propagate as transport-style
+ * failure so suspend() / sd_future_result() see the OOM. */
+ return sd_future_resolve(f, -ENOMEM);
+ }
+
+ /* Resolve with 0 on a success reply and -EIO on a QMP-level error (matching the synchronous
+ * path's errno for the desc-without-ret_error_desc case), so a caller awaiting the future
+ * learns about call failures from the resolution value alone. The reply payload itself
+ * (result or error_desc) is always stashed on the QmpFuture so future_get_qmp_reply() can
+ * hand the description string back on top of the bare -EIO. With no reply at all (transport
+ * failure, disconnect), resolve with the propagated transport errno; cancellation surfaces
+ * as -ECANCELED via qmp_future_cancel(). */
+ if (result)
+ r = 0;
+ else if (desc)
+ r = -EIO;
+ else
+ r = error;
+
+ return sd_future_resolve(f, r);
+}
+
+int qmp_client_call_future(
+ QmpClient *c,
+ const char *command,
+ QmpClientArgs *args,
+ sd_future **ret) {
+
+ int r;
+
+ assert(c);
+ assert(command);
+ assert(ret);
+
+ _cleanup_(sd_future_unrefp) sd_future *f = NULL;
+ r = sd_future_new(&qmp_call_future_ops, &f);
+ if (r < 0)
+ return r;
+
+ QmpFuture *qf = sd_future_get_private(f);
+
+ r = qmp_client_send(c, command, args, qmp_future_callback, f, &qf->slot);
+ if (r < 0)
+ return r;
+
+ *ret = TAKE_PTR(f);
+ return 0;
+}
+
+/* Extract the reply from a resolved qmp_client_call_future(). Returns 1 on success (with
+ * *ret_result a fresh reference the caller unrefs), -EIO on a QMP-level error (with the detail
+ * description copied into *ret_error_desc when the caller passed one to receive it), and the
+ * future's negative resume errno when no reply landed at all (transport failure / cancellation).
+ */
+int future_get_qmp_reply(sd_future *f, sd_json_variant **ret_result, char **reterr_error_desc) {
+ assert(f);
+ assert(sd_future_get_ops(f) == &qmp_call_future_ops);
+ assert(sd_future_state(f) == SD_FUTURE_RESOLVED);
+
+ QmpFuture *qf = ASSERT_PTR(sd_future_get_private(f));
+
+ /* No reply at all: transport failure or cancellation — surface the future result. */
+ if (!qf->result && !qf->error_desc)
+ return sd_future_result(f);
+
+ if (qf->error_desc) {
+ if (reterr_error_desc) {
+ char *desc = strdup(qf->error_desc);
+ if (!desc)
+ return -ENOMEM;
+ *reterr_error_desc = desc;
+ }
+ return -EIO;
+ }
+
+ if (reterr_error_desc)
+ *reterr_error_desc = NULL;
+ if (ret_result)
+ *ret_result = sd_json_variant_ref(qf->result);
+
+ return 1;
+}
+
+static int qmp_client_call_suspend(
+ QmpClient *c,
+ const char *command,
+ QmpClientArgs *args,
+ sd_json_variant **ret_result,
+ char **ret_error_desc) {
+
+ int r;
+
+ assert(c);
+ assert(command);
+ assert(sd_fiber_is_running());
+
+ _cleanup_(sd_future_cancel_wait_unrefp) sd_future *call = NULL;
+ r = qmp_client_call_future(c, command, args, &call);
+ if (r < 0)
+ return r;
+
+ r = sd_fiber_suspend();
+
+ /* If the future isn't resolved, the suspend was interrupted before a reply arrived (fiber
+ * cancelled, fiber-wide SD_FIBER_TIMEOUT scope expired, …). There's no reply to extract,
+ * so surface the resume error directly. When the future is resolved, future_get_qmp_reply()
+ * already encodes success (1), QMP-level error (-EIO with the desc captured if asked for),
+ * and no-reply (negative future result) — pass it through. */
+ if (sd_future_state(call) != SD_FUTURE_RESOLVED)
+ return r;
+
+ return future_get_qmp_reply(call, ret_result, ret_error_desc);
+}
+
int qmp_client_call(
QmpClient *c,
const char *command,
QmpClientArgs *args,
sd_json_variant **ret_result,
- const char **ret_error_desc) {
+ char **reterr_error_desc) {
- _cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL;
+ _cleanup_(sd_json_variant_unrefp) sd_json_variant *result = NULL;
+ _cleanup_free_ char *desc = NULL;
int r;
assert_return(c, -EINVAL);
assert_return(command, -EINVAL);
- /* Drop any reply pinned by a previous qmp_client_call() before we pin a new one. */
- qmp_client_clear_current(c);
+ /* If we're on a fiber sharing the QMP client's event loop, use the async + suspend path so
+ * multiple concurrent qmp_client_call() invocations across fibers don't deadlock each other
+ * on the process+wait pump. */
+ if (sd_fiber_is_running() && qmp_client_get_event(c) == sd_fiber_get_event())
+ return qmp_client_call_suspend(c, command, args, ret_result, reterr_error_desc);
+
+ _cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL;
/* NULL callback marks this as a synchronous slot: dispatch_reply matches on id like
* any other slot (so stray unknown-id replies still get logged and dropped), but
return r;
}
- sd_json_variant *result = NULL;
- const char *desc = NULL;
- int error = qmp_parse_response(c->current, &result, &desc);
+ _cleanup_(sd_json_variant_unrefp) sd_json_variant *current = TAKE_PTR(c->current);
+ r = qmp_parse_response(current, &result, &desc);
+ if (r < 0 && r != -EIO)
+ return r;
- /* If caller doesn't ask for the error string, surface the error as the return code. */
- if (!ret_error_desc && error < 0)
- return error;
+ /* QMP-level error: copy the description into *ret_error_desc when the caller asked for it,
+ * and surface the failure via the return value (matching qmp_client_call_suspend() /
+ * sd_bus_call()'s "negative on error" convention). */
+ if (desc) {
+ if (reterr_error_desc)
+ *reterr_error_desc = TAKE_PTR(desc);
+ return -EIO;
+ }
if (ret_result)
- *ret_result = result;
- if (ret_error_desc)
- *ret_error_desc = desc;
+ *ret_result = TAKE_PTR(result);
+ if (reterr_error_desc)
+ *reterr_error_desc = NULL;
return 1;
}