#include "json-util.h"
#include "memfd-util.h"
#include "rm-rf.h"
+#include "socket-util.h"
#include "tests.h"
#include "tmpfile-util.h"
#include "varlink-util.h"
ASSERT_OK(sd_event_new(&e));
- ASSERT_OK(sd_varlink_server_new(&s, 0));
+ ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_UPGRADABLE));
ASSERT_OK(sd_varlink_server_set_description(s, "upgrade-server"));
ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Upgrade", method_upgrade));
ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.UpgradeWithoutFlag", method_upgrade_without_flag));
ASSERT_OK(-pthread_join(t, NULL));
}
+static int method_upgrade_and_exit(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ sd_event *event = ASSERT_PTR(userdata);
+
+ int r = method_upgrade(link, parameters, flags, /* userdata= */ NULL);
+
+ /* Exit the event loop after the upgrade is handled. We can't use sd_varlink_get_event()
+ * here because the connection is already disconnected after reply_and_upgrade. */
+ (void) sd_event_exit(event, r < 0 ? r : EXIT_SUCCESS);
+ return r;
+}
+
+static void *upgrade_pipelining_thread(void *arg) {
+ union sockaddr_union sa = {};
+ _cleanup_close_ int fd = -EBADF;
+
+ /* Connect a raw socket and pipeline: upgrade JSON + \0 + raw data in a single write.
+ * This tests that the server's byte-by-byte reading (SD_VARLINK_SERVER_UPGRADABLE)
+ * doesn't consume the raw data into the varlink input buffer. */
+ fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
+ ASSERT_FD(fd);
+ int addrlen = sockaddr_un_set_path(&sa.un, arg);
+ ASSERT_OK(addrlen);
+ ASSERT_OK_ERRNO(connect(fd, &sa.sa, addrlen));
+
+ /* Build pipelined message: upgrade JSON + \0 + raw payload, all in one write */
+ static const char upgrade_msg[] = "{\"method\":\"io.test.Upgrade\",\"upgrade\":true}";
+ static const char raw_payload[] = "Pipelined!";
+ char send_buf[sizeof(upgrade_msg) + sizeof(raw_payload)]; /* includes \0 from upgrade_msg as delimiter */
+
+ memcpy(send_buf, upgrade_msg, sizeof(upgrade_msg)); /* copies trailing \0 = varlink delimiter */
+ memcpy(send_buf + sizeof(upgrade_msg), raw_payload, sizeof(raw_payload) - 1);
+
+ size_t total = sizeof(upgrade_msg) + strlen(raw_payload);
+ ASSERT_OK(loop_write(fd, send_buf, total));
+
+ /* Shut down write side so server's method_upgrade sees EOF after raw payload */
+ ASSERT_OK_ERRNO(shutdown(fd, SHUT_WR));
+
+ /* Read everything: upgrade reply (JSON + \0) + reversed raw payload. The server closes
+ * the connection after writing, so loop_read() reads until EOF and gets it all. */
+ char buf[256] = {};
+ ssize_t n = ASSERT_OK(loop_read(fd, buf, sizeof(buf) - 1, /* do_poll= */ true));
+ ASSERT_GT(n, 0);
+
+ /* Split at the \0 delimiter between JSON reply and raw payload */
+ char *delim = memchr(buf, 0, n);
+ ASSERT_NOT_NULL(delim);
+
+ char *raw = delim + 1;
+ size_t raw_size = (size_t) n - (size_t)(raw - buf);
+
+ ASSERT_EQ(raw_size, strlen(raw_payload));
+ ASSERT_STREQ(strndupa_safe(raw, raw_size), "!denilepiP");
+
+ return NULL;
+}
+
+TEST(upgrade_pipelining) {
+ _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+ _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ pthread_t t;
+ const char *sp;
+
+ ASSERT_OK(mkdtemp_malloc("/tmp/varlink-test-XXXXXX", &tmpdir));
+ sp = strjoina(tmpdir, "/socket");
+
+ ASSERT_OK(sd_event_new(&e));
+
+ ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_UPGRADABLE|SD_VARLINK_SERVER_INHERIT_USERDATA));
+ ASSERT_OK(sd_varlink_server_set_description(s, "upgrade-pipelining-server"));
+ ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Upgrade", method_upgrade_and_exit));
+ ASSERT_OK(sd_varlink_server_listen_address(s, sp, 0600));
+ ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+ sd_varlink_server_set_userdata(s, e);
+
+ ASSERT_OK(-pthread_create(&t, NULL, upgrade_pipelining_thread, (void*) sp));
+
+ ASSERT_OK(sd_event_loop(e));
+
+ ASSERT_OK(-pthread_join(t, NULL));
+}
+
DEFINE_TEST_MAIN(LOG_DEBUG);