#include "errno-util.h"
#include "fd-util.h"
-#include "io-util.h"
+#include "json-stream.h"
#include "pidref.h"
#include "process-util.h"
#include "qmp-client.h"
-#include "socket-util.h"
#include "string-util.h"
#include "tests.h"
-/* Mock QMP server: runs in the child process of a fork, communicates via one end of a socketpair. */
+/* Mock QMP server: runs in the child process of a fork, communicates via one end of a socketpair.
+ * Uses JsonStream as the transport so framing (CRLF delimiter, message queuing, SCM_RIGHTS) is
+ * handled the same way as on the client side — individual recv() syscalls may coalesce multiple
+ * messages, and the parser must re-emit each one on its own. */
-static void mock_qmp_write_json(int fd, sd_json_variant *v) {
- _cleanup_free_ char *s = NULL;
+/* We drive the stream manually via read/parse/wait; always report READING so json_stream_wait()
+ * asks for POLLIN. */
+static JsonStreamPhase mock_qmp_phase(void *userdata) {
+ return JSON_STREAM_PHASE_READING;
+}
- ASSERT_OK(sd_json_variant_format(v, 0, &s));
- ASSERT_NOT_NULL(strextend(&s, "\r\n"));
- ASSERT_OK(loop_write(fd, s, SIZE_MAX));
+/* Never reached — we don't wire the mock stream up to sd-event — but required at init. */
+static int mock_qmp_dispatch(void *userdata) {
+ return 0;
}
-static void mock_qmp_write_literal(int fd, const char *msg) {
- ASSERT_OK(loop_write(fd, msg, SIZE_MAX));
- ASSERT_OK(loop_write(fd, "\r\n", 2));
+static void mock_qmp_init(JsonStream *s, int fd) {
+ static const JsonStreamParams params = {
+ .delimiter = "\r\n",
+ .phase = mock_qmp_phase,
+ .dispatch = mock_qmp_dispatch,
+ };
+
+ ASSERT_OK(json_stream_init(s, ¶ms));
+ ASSERT_OK(json_stream_connect_fd_pair(s, fd, fd));
}
-/* Read a command from the QMP client, verify it contains the expected command name, extract the id,
- * and send a reply with that id. If reply_data is NULL, an empty return object is sent. */
-static void mock_qmp_expect_and_reply(int fd, const char *expected_command, sd_json_variant *reply_data) {
- _cleanup_free_ char *buf = NULL;
- _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL, *reply_obj = NULL, *response = NULL;
+/* Read one complete JSON message, blocking until available. Handles the case where multiple
+ * client messages arrived coalesced into a single recv(): the parser walks the input buffer
+ * one CRLF-delimited message at a time. */
+static void mock_qmp_recv(JsonStream *s, sd_json_variant **ret) {
+ int r;
- buf = ASSERT_NOT_NULL(new(char, 4096));
+ for (;;) {
+ r = ASSERT_OK(json_stream_parse(s, ret));
+ if (r > 0)
+ return;
- ssize_t n = read(fd, buf, 4095);
- assert_se(n > 0);
- buf[n] = '\0';
+ r = ASSERT_OK(json_stream_read(s));
+ if (r > 0)
+ continue;
- ASSERT_OK(sd_json_parse(buf, 0, &cmd, NULL, NULL));
+ ASSERT_OK(json_stream_wait(s, USEC_INFINITY));
+ }
+}
+
+/* Enqueue one JSON variant and block until it has been fully written. */
+static void mock_qmp_send(JsonStream *s, sd_json_variant *v) {
+ ASSERT_OK(json_stream_enqueue(s, v));
+ ASSERT_OK(json_stream_flush(s));
+}
+
+/* Parse a literal JSON string and send it. Used for fixed greetings and unsolicited events. */
+static void mock_qmp_send_literal(JsonStream *s, const char *msg) {
+ _cleanup_(sd_json_variant_unrefp) sd_json_variant *v = NULL;
+
+ ASSERT_OK(sd_json_parse(msg, 0, &v, NULL, NULL));
+ mock_qmp_send(s, v);
+}
+
+/* Read a command from the client, verify it contains the expected command name, and send a
+ * reply carrying the same id. If reply_data is NULL, an empty return object is sent. */
+static void mock_qmp_expect_and_reply(JsonStream *s, const char *expected_command, sd_json_variant *reply_data) {
+ _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL, *reply_obj = NULL, *response = NULL;
+
+ mock_qmp_recv(s, &cmd);
sd_json_variant *execute = ASSERT_NOT_NULL(sd_json_variant_by_key(cmd, "execute"));
ASSERT_STREQ(sd_json_variant_string(execute), expected_command);
SD_JSON_BUILD_PAIR("return", SD_JSON_BUILD_VARIANT(reply_data ?: reply_obj)),
SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(id))));
- mock_qmp_write_json(fd, response);
+ mock_qmp_send(s, response);
+}
+
+/* Same shape as mock_qmp_expect_and_reply() but replies with a QMP error object. */
+static void mock_qmp_expect_and_reply_error(JsonStream *s, const char *expected_command, const char *error_desc) {
+ _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL, *error_obj = NULL, *response = NULL;
+
+ mock_qmp_recv(s, &cmd);
+
+ sd_json_variant *execute = ASSERT_NOT_NULL(sd_json_variant_by_key(cmd, "execute"));
+ ASSERT_STREQ(sd_json_variant_string(execute), expected_command);
+
+ sd_json_variant *id = ASSERT_NOT_NULL(sd_json_variant_by_key(cmd, "id"));
+
+ ASSERT_OK(sd_json_buildo(
+ &error_obj,
+ SD_JSON_BUILD_PAIR_STRING("class", "GenericError"),
+ SD_JSON_BUILD_PAIR_STRING("desc", error_desc)));
+
+ ASSERT_OK(sd_json_buildo(
+ &response,
+ SD_JSON_BUILD_PAIR("error", SD_JSON_BUILD_VARIANT(error_obj)),
+ SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(id))));
+
+ mock_qmp_send(s, response);
}
static _noreturn_ void mock_qmp_server(int fd) {
+ _cleanup_(json_stream_done) JsonStream s = {};
_cleanup_(sd_json_variant_unrefp) sd_json_variant *status_return = NULL;
+ mock_qmp_init(&s, fd);
+
/* Send QMP greeting */
- mock_qmp_write_literal(fd,
+ mock_qmp_send_literal(&s,
"{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 2, \"major\": 9}}, \"capabilities\": [\"oob\"]}}");
/* Accept qmp_capabilities */
- mock_qmp_expect_and_reply(fd, "qmp_capabilities", NULL);
+ mock_qmp_expect_and_reply(&s, "qmp_capabilities", NULL);
/* Accept query-status, reply with running state */
ASSERT_OK(sd_json_buildo(
&status_return,
SD_JSON_BUILD_PAIR_BOOLEAN("running", true),
SD_JSON_BUILD_PAIR_STRING("status", "running")));
- mock_qmp_expect_and_reply(fd, "query-status", status_return);
+ mock_qmp_expect_and_reply(&s, "query-status", status_return);
/* Accept stop */
- mock_qmp_expect_and_reply(fd, "stop", NULL);
+ mock_qmp_expect_and_reply(&s, "stop", NULL);
/* Send a STOP event */
- mock_qmp_write_literal(fd,
+ mock_qmp_send_literal(&s,
"{\"event\": \"STOP\", \"timestamp\": {\"seconds\": 1234, \"microseconds\": 5678}}");
/* Accept cont */
- mock_qmp_expect_and_reply(fd, "cont", NULL);
+ mock_qmp_expect_and_reply(&s, "cont", NULL);
- /* Close to trigger EOF */
- safe_close(fd);
+ /* json_stream_done() on cleanup closes our fd and signals EOF. */
_exit(EXIT_SUCCESS);
}
ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
- r = pidref_safe_fork("(mock-qmp)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid);
- ASSERT_OK(r);
+ r = ASSERT_OK(pidref_safe_fork("(mock-qmp)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
if (r == 0) {
safe_close(qmp_fds[0]);
ASSERT_OK(sd_event_new(&event));
ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
- r = pidref_safe_fork("(mock-qmp-eof)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid);
- ASSERT_OK(r);
+ r = ASSERT_OK(pidref_safe_fork("(mock-qmp-eof)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
if (r == 0) {
+ _cleanup_(json_stream_done) JsonStream s = {};
+
safe_close(qmp_fds[0]);
+ mock_qmp_init(&s, qmp_fds[1]);
/* Send greeting and accept capabilities, then die */
- mock_qmp_write_literal(qmp_fds[1],
+ mock_qmp_send_literal(&s,
"{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}");
- mock_qmp_expect_and_reply(qmp_fds[1], "qmp_capabilities", NULL);
+ mock_qmp_expect_and_reply(&s, "qmp_capabilities", NULL);
- /* Close immediately to trigger EOF */
- safe_close(qmp_fds[1]);
+ /* _exit() closes our fd via kernel teardown, signalling EOF to the peer. */
_exit(EXIT_SUCCESS);
}
ASSERT_EQ(si.si_status, EXIT_SUCCESS);
}
-/* Read one QMP command from fd (one recvmsg, expecting it fits in the buffer for typical
- * test commands). Returns the number of SCM_RIGHTS fds that arrived attached to the read,
- * stores the first received fd in *ret_received_fd (or -EBADF if none) and closes any extras,
- * and parses the JSON into *ret_cmd. */
-static size_t mock_qmp_recv_command(int fd, sd_json_variant **ret_cmd, int *ret_received_fd) {
- char buf[4096];
- char ctrl[CMSG_SPACE(sizeof(int) * 4)];
- struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) - 1 };
- struct msghdr mh = {
- .msg_iov = &iov, .msg_iovlen = 1,
- .msg_control = ctrl, .msg_controllen = sizeof(ctrl),
- };
- size_t n_fds = 0;
- int received_fd = -EBADF;
-
- ssize_t n = recvmsg(fd, &mh, MSG_CMSG_CLOEXEC);
- assert_se(n > 0);
- buf[n] = '\0';
-
- struct cmsghdr *cmsg;
- CMSG_FOREACH(cmsg, &mh) {
- if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS)
- continue;
- size_t k = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int);
- int *fds = (int*) CMSG_DATA(cmsg);
- for (size_t i = 0; i < k; i++) {
- if (received_fd < 0)
- received_fd = fds[i];
- else
- safe_close(fds[i]);
- }
- n_fds += k;
- }
-
- ASSERT_OK(sd_json_parse(buf, 0, ret_cmd, NULL, NULL));
-
- if (ret_received_fd)
- *ret_received_fd = received_fd;
- else if (received_fd >= 0)
- safe_close(received_fd);
-
- return n_fds;
-}
-
/* Mock QMP server for the fd-on-first-invoke regression. Drives the wire dance:
* greeting → (recv qmp_capabilities, expect 0 fds) → reply →
* (recv add-fd, expect exactly 1 fd) → reply
- * Asserts the cmsg fd counts directly so a regression flips the child to
+ * Asserts the attached fd counts directly so a regression flips the child to
* exit_failure and the parent test fails on the wait-for-terminate. */
static _noreturn_ void mock_qmp_server_fd_first(int fd) {
+ _cleanup_(json_stream_done) JsonStream s = {};
_cleanup_(sd_json_variant_unrefp) sd_json_variant *cap_cmd = NULL,
*addfd_cmd = NULL,
*cap_reply = NULL,
*addfd_return = NULL,
*addfd_reply = NULL;
- size_t n_fds;
- int received_fd = -EBADF;
+
+ mock_qmp_init(&s, fd);
+ /* Accept SCM_RIGHTS on incoming messages so we can count how many fds the client
+ * attaches to each sendmsg. */
+ ASSERT_OK(json_stream_set_allow_fd_passing_input(&s, true, /* with_sockopt= */ true));
/* Greeting */
- mock_qmp_write_literal(fd,
+ mock_qmp_send_literal(&s,
"{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}");
/* Receive qmp_capabilities — must arrive with NO fds attached. */
- n_fds = mock_qmp_recv_command(fd, &cap_cmd, /* ret_received_fd= */ NULL);
- ASSERT_EQ(n_fds, (size_t) 0);
+ mock_qmp_recv(&s, &cap_cmd);
+ ASSERT_EQ(json_stream_get_n_input_fds(&s), (size_t) 0);
ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(cap_cmd, "execute")), "qmp_capabilities");
sd_json_variant *cap_id = ASSERT_NOT_NULL(sd_json_variant_by_key(cap_cmd, "id"));
&cap_reply,
SD_JSON_BUILD_PAIR("return", SD_JSON_BUILD_EMPTY_OBJECT),
SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(cap_id))));
- mock_qmp_write_json(fd, cap_reply);
+ mock_qmp_send(&s, cap_reply);
/* Receive add-fd — must arrive with EXACTLY ONE fd attached. */
- n_fds = mock_qmp_recv_command(fd, &addfd_cmd, &received_fd);
- ASSERT_EQ(n_fds, (size_t) 1);
- ASSERT_TRUE(received_fd >= 0);
+ mock_qmp_recv(&s, &addfd_cmd);
+ ASSERT_EQ(json_stream_get_n_input_fds(&s), (size_t) 1);
ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(addfd_cmd, "execute")), "add-fd");
- safe_close(received_fd);
+ json_stream_close_input_fds(&s);
sd_json_variant *addfd_id = ASSERT_NOT_NULL(sd_json_variant_by_key(addfd_cmd, "id"));
ASSERT_OK(sd_json_buildo(
&addfd_reply,
SD_JSON_BUILD_PAIR("return", SD_JSON_BUILD_VARIANT(addfd_return)),
SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(addfd_id))));
- mock_qmp_write_json(fd, addfd_reply);
+ mock_qmp_send(&s, addfd_reply);
- safe_close(fd);
_exit(EXIT_SUCCESS);
}
ASSERT_OK(sd_event_new(&event));
ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, qmp_fds));
- r = pidref_safe_fork("(mock-qmp-fd-first)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid);
- ASSERT_OK(r);
+ r = ASSERT_OK(pidref_safe_fork("(mock-qmp-fd-first)", FORK_DEATHSIG_SIGKILL|FORK_LOG, &pid));
if (r == 0) {
safe_close(qmp_fds[0]);
ASSERT_EQ(errno, EBADF);
}
-/* Reads one command, asserts its execute name, and replies with a QMP error object carrying
- * the given description. Mirrors mock_qmp_expect_and_reply() but on the error branch. */
-static void mock_qmp_expect_and_reply_error(int fd, const char *expected_command, const char *error_desc) {
- _cleanup_free_ char *buf = NULL;
- _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL, *error_obj = NULL, *response = NULL;
-
- buf = ASSERT_NOT_NULL(new(char, 4096));
-
- ssize_t n = read(fd, buf, 4095);
- assert_se(n > 0);
- buf[n] = '\0';
-
- ASSERT_OK(sd_json_parse(buf, 0, &cmd, NULL, NULL));
-
- sd_json_variant *execute = ASSERT_NOT_NULL(sd_json_variant_by_key(cmd, "execute"));
- ASSERT_STREQ(sd_json_variant_string(execute), expected_command);
-
- sd_json_variant *id = ASSERT_NOT_NULL(sd_json_variant_by_key(cmd, "id"));
-
- ASSERT_OK(sd_json_buildo(
- &error_obj,
- SD_JSON_BUILD_PAIR_STRING("class", "GenericError"),
- SD_JSON_BUILD_PAIR_STRING("desc", error_desc)));
-
- ASSERT_OK(sd_json_buildo(
- &response,
- SD_JSON_BUILD_PAIR("error", SD_JSON_BUILD_VARIANT(error_obj)),
- SD_JSON_BUILD_PAIR("id", SD_JSON_BUILD_VARIANT(id))));
-
- mock_qmp_write_json(fd, response);
-}
-
/* Drives a small wire dance for the sync call test: greeting, capabilities, one successful
* command reply, and two error replies (one for the ret_error_desc path, one for the -EIO
* path). */
static _noreturn_ void mock_qmp_server_call(int fd) {
+ _cleanup_(json_stream_done) JsonStream s = {};
_cleanup_(sd_json_variant_unrefp) sd_json_variant *status_return = NULL;
- mock_qmp_write_literal(fd,
+ mock_qmp_init(&s, fd);
+
+ mock_qmp_send_literal(&s,
"{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}");
- mock_qmp_expect_and_reply(fd, "qmp_capabilities", NULL);
+ mock_qmp_expect_and_reply(&s, "qmp_capabilities", NULL);
ASSERT_OK(sd_json_buildo(
&status_return,
SD_JSON_BUILD_PAIR_BOOLEAN("running", true),
SD_JSON_BUILD_PAIR_STRING("status", "running")));
- mock_qmp_expect_and_reply(fd, "query-status", status_return);
+ mock_qmp_expect_and_reply(&s, "query-status", status_return);
- mock_qmp_expect_and_reply_error(fd, "stop", "not running");
- mock_qmp_expect_and_reply_error(fd, "stop", "still not running");
+ mock_qmp_expect_and_reply_error(&s, "stop", "not running");
+ mock_qmp_expect_and_reply_error(&s, "stop", "still not running");
- safe_close(fd);
_exit(EXIT_SUCCESS);
}
/* Server variant for the sync-call disconnect test: greets, accepts capabilities, reads one
* command without replying, then closes the socket so the client sees EOF mid-wait. */
static _noreturn_ void mock_qmp_server_call_disconnect(int fd) {
- _cleanup_free_ char *buf = NULL;
+ _cleanup_(json_stream_done) JsonStream s = {};
+ _cleanup_(sd_json_variant_unrefp) sd_json_variant *stop_cmd = NULL;
+
+ mock_qmp_init(&s, fd);
- mock_qmp_write_literal(fd,
+ mock_qmp_send_literal(&s,
"{\"QMP\": {\"version\": {\"qemu\": {\"micro\": 0, \"minor\": 0, \"major\": 9}}, \"capabilities\": []}}");
- mock_qmp_expect_and_reply(fd, "qmp_capabilities", NULL);
+ mock_qmp_expect_and_reply(&s, "qmp_capabilities", NULL);
- /* Consume the stop command but don't reply — just close to trigger EOF while the
- * client is blocked in qmp_client_call()'s process+wait pump. */
- buf = ASSERT_NOT_NULL(new(char, 4096));
- ssize_t n = read(fd, buf, 4095);
- assert_se(n > 0);
+ /* Consume the stop command but don't reply — json_stream_done() on cleanup closes
+ * our fd, triggering EOF while the client is blocked in qmp_client_call()'s
+ * process+wait pump. */
+ mock_qmp_recv(&s, &stop_cmd);
- safe_close(fd);
_exit(EXIT_SUCCESS);
}