]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
varlink: use single byte reads on SD_VARLINK_SERVER_UPGRADABLE
authorMichael Vogt <michael@amutable.com>
Tue, 7 Apr 2026 15:47:50 +0000 (17:47 +0200)
committerDaan De Meyer <daan@amutable.com>
Thu, 9 Apr 2026 11:02:11 +0000 (13:02 +0200)
When the server side of a varlink connection supports connection
upgrades we need to go into single byte-read mode to avoid the
risk of a client that sends the json to protocol upgrade and then
immediately the custom protocol payload. This commit implements
this.

The next step is using MSG_PEEK to avoid the single-byte overhead.

src/libsystemd/sd-varlink/sd-varlink.c
src/systemd/sd-varlink.h
src/test/test-varlink.c
src/varlinkctl/varlinkctl.c

index fcf1704792d086421dc43cd1021e8ad9763a8e6b..bc7e93f40797bf98d8a68551e903b97e66efeefc 100644 (file)
@@ -3710,7 +3710,8 @@ _public_ int sd_varlink_server_new(sd_varlink_server **ret, sd_varlink_server_fl
                                  SD_VARLINK_SERVER_ALLOW_FD_PASSING_OUTPUT|
                                  SD_VARLINK_SERVER_FD_PASSING_INPUT_STRICT|
                                  SD_VARLINK_SERVER_HANDLE_SIGINT|
-                                 SD_VARLINK_SERVER_HANDLE_SIGTERM)) == 0, -EINVAL);
+                                 SD_VARLINK_SERVER_HANDLE_SIGTERM|
+                                 SD_VARLINK_SERVER_UPGRADABLE)) == 0, -EINVAL);
 
         s = new(sd_varlink_server, 1);
         if (!s)
index 527415ab801651108368e02aa48cf4820bf9c8c8..3be82a7ddbc34886acc4cbb834bb654cd658ab63 100644 (file)
@@ -72,6 +72,7 @@ __extension__ typedef enum _SD_ENUM_TYPE_S64(sd_varlink_server_flags_t) {
         SD_VARLINK_SERVER_FD_PASSING_INPUT_STRICT = 1 << 7, /* Reject input messages with fds if fd passing is disabled (needs kernel v6.16+) */
         SD_VARLINK_SERVER_HANDLE_SIGINT           = 1 << 8, /* Exit cleanly on SIGINT */
         SD_VARLINK_SERVER_HANDLE_SIGTERM          = 1 << 9, /* Exit cleanly on SIGTERM */
+        SD_VARLINK_SERVER_UPGRADABLE              = 1 << 10, /* Server has upgrade methods; avoid consuming post-upgrade data during reads */
         _SD_ENUM_FORCE_S64(SD_VARLINK_SERVER)
 } sd_varlink_server_flags_t;
 
index 219966c5d74df54e4937bf6ed310d3a75d25a8da..1bbc87c32c0f942dd9350d57836eafc106af4eac 100644 (file)
@@ -15,6 +15,7 @@
 #include "json-util.h"
 #include "memfd-util.h"
 #include "rm-rf.h"
+#include "socket-util.h"
 #include "tests.h"
 #include "tmpfile-util.h"
 #include "varlink-util.h"
@@ -837,7 +838,7 @@ TEST(upgrade) {
 
         ASSERT_OK(sd_event_new(&e));
 
-        ASSERT_OK(sd_varlink_server_new(&s, 0));
+        ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_UPGRADABLE));
         ASSERT_OK(sd_varlink_server_set_description(s, "upgrade-server"));
         ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Upgrade", method_upgrade));
         ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.UpgradeWithoutFlag", method_upgrade_without_flag));
@@ -852,4 +853,87 @@ TEST(upgrade) {
         ASSERT_OK(-pthread_join(t, NULL));
 }
 
+static int method_upgrade_and_exit(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        sd_event *event = ASSERT_PTR(userdata);
+
+        int r = method_upgrade(link, parameters, flags, /* userdata= */ NULL);
+
+        /* Exit the event loop after the upgrade is handled. We can't use sd_varlink_get_event()
+         * here because the connection is already disconnected after reply_and_upgrade. */
+        (void) sd_event_exit(event, r < 0 ? r : EXIT_SUCCESS);
+        return r;
+}
+
+static void *upgrade_pipelining_thread(void *arg) {
+        union sockaddr_union sa = {};
+        _cleanup_close_ int fd = -EBADF;
+
+        /* Connect a raw socket and pipeline: upgrade JSON + \0 + raw data in a single write.
+         * This tests that the server's byte-by-byte reading (SD_VARLINK_SERVER_UPGRADABLE)
+         * doesn't consume the raw data into the varlink input buffer. */
+        fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
+        ASSERT_FD(fd);
+        int addrlen = sockaddr_un_set_path(&sa.un, arg);
+        ASSERT_OK(addrlen);
+        ASSERT_OK_ERRNO(connect(fd, &sa.sa, addrlen));
+
+        /* Build pipelined message: upgrade JSON + \0 + raw payload, all in one write */
+        static const char upgrade_msg[] = "{\"method\":\"io.test.Upgrade\",\"upgrade\":true}";
+        static const char raw_payload[] = "Pipelined!";
+        char send_buf[sizeof(upgrade_msg) + sizeof(raw_payload)]; /* includes \0 from upgrade_msg as delimiter */
+
+        memcpy(send_buf, upgrade_msg, sizeof(upgrade_msg)); /* copies trailing \0 = varlink delimiter */
+        memcpy(send_buf + sizeof(upgrade_msg), raw_payload, sizeof(raw_payload) - 1);
+
+        size_t total = sizeof(upgrade_msg) + strlen(raw_payload);
+        ASSERT_OK(loop_write(fd, send_buf, total));
+
+        /* Shut down write side so server's method_upgrade sees EOF after raw payload */
+        ASSERT_OK_ERRNO(shutdown(fd, SHUT_WR));
+
+        /* Read everything: upgrade reply (JSON + \0) + reversed raw payload. The server closes
+         * the connection after writing, so loop_read() reads until EOF and gets it all. */
+        char buf[256] = {};
+        ssize_t n = ASSERT_OK(loop_read(fd, buf, sizeof(buf) - 1, /* do_poll= */ true));
+        ASSERT_GT(n, 0);
+
+        /* Split at the \0 delimiter between JSON reply and raw payload */
+        char *delim = memchr(buf, 0, n);
+        ASSERT_NOT_NULL(delim);
+
+        char *raw = delim + 1;
+        size_t raw_size = (size_t) n - (size_t)(raw - buf);
+
+        ASSERT_EQ(raw_size, strlen(raw_payload));
+        ASSERT_STREQ(strndupa_safe(raw, raw_size), "!denilepiP");
+
+        return NULL;
+}
+
+TEST(upgrade_pipelining) {
+        _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+        _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        pthread_t t;
+        const char *sp;
+
+        ASSERT_OK(mkdtemp_malloc("/tmp/varlink-test-XXXXXX", &tmpdir));
+        sp = strjoina(tmpdir, "/socket");
+
+        ASSERT_OK(sd_event_new(&e));
+
+        ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_UPGRADABLE|SD_VARLINK_SERVER_INHERIT_USERDATA));
+        ASSERT_OK(sd_varlink_server_set_description(s, "upgrade-pipelining-server"));
+        ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Upgrade", method_upgrade_and_exit));
+        ASSERT_OK(sd_varlink_server_listen_address(s, sp, 0600));
+        ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+        sd_varlink_server_set_userdata(s, e);
+
+        ASSERT_OK(-pthread_create(&t, NULL, upgrade_pipelining_thread, (void*) sp));
+
+        ASSERT_OK(sd_event_loop(e));
+
+        ASSERT_OK(-pthread_join(t, NULL));
+}
+
 DEFINE_TEST_MAIN(LOG_DEBUG);
index 9a8e89b26c0bf74bc12c6e42335289236107a8db..18a639962c11f19b6d932d69ee87f35fed143be5 100644 (file)
@@ -1269,7 +1269,7 @@ static int verb_serve(int argc, char *argv[], uintptr_t _data, void *userdata) {
         if (r < 0)
                 return log_error_errno(r, "Failed to get event loop: %m");
 
-        r = sd_varlink_server_new(&s, SD_VARLINK_SERVER_INHERIT_USERDATA);
+        r = sd_varlink_server_new(&s, SD_VARLINK_SERVER_INHERIT_USERDATA|SD_VARLINK_SERVER_UPGRADABLE);
         if (r < 0)
                 return log_error_errno(r, "Failed to allocate varlink server: %m");