]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
libsystemd: add sd_varlink_reply_and_upgrade protocol upgrade
authorMichael Vogt <michael@amutable.com>
Wed, 1 Apr 2026 14:55:55 +0000 (16:55 +0200)
committerDaan De Meyer <daan@amutable.com>
Thu, 9 Apr 2026 11:01:50 +0000 (13:01 +0200)
This commit adds protocol upgrade support in the libsystemd server
side API code.

src/libsystemd/libsystemd.sym
src/libsystemd/sd-varlink/sd-varlink-idl.c
src/libsystemd/sd-varlink/sd-varlink.c
src/systemd/sd-varlink-idl.h
src/systemd/sd-varlink.h
src/test/test-varlink.c

index 6af86aa2b4a2b1bb19fcccaa51d5db8ef6eaff9f..619bcf820c8755c71ce1df274b1f270a1bb1164d 100644 (file)
@@ -1094,5 +1094,6 @@ global:
 LIBSYSTEMD_261 {
 global:
         sd_varlink_call_and_upgrade;
+        sd_varlink_reply_and_upgrade;
         sd_varlink_set_sentinel;
 } LIBSYSTEMD_260;
index 0b0ea244d6c9f4eccf966b1ac2b2b94514910a8d..dc09080cdabf36984cff2e9569f80e9adbf743d7 100644 (file)
@@ -401,6 +401,16 @@ static int varlink_idl_format_symbol(
                         fputs("\n", f);
                 }
 
+                if ((symbol->symbol_flags & (SD_VARLINK_REQUIRES_UPGRADE|SD_VARLINK_SUPPORTS_UPGRADE)) != 0) {
+                        fputs(colors[COLOR_COMMENT], f);
+                        if (FLAGS_SET(symbol->symbol_flags, SD_VARLINK_REQUIRES_UPGRADE))
+                                fputs("# [Requires 'upgrade' flag]", f);
+                        else
+                                fputs("# [Supports 'upgrade' flag]", f);
+                        fputs(colors[COLOR_RESET], f);
+                        fputs("\n", f);
+                }
+
                 fputs(colors[COLOR_SYMBOL_TYPE], f);
                 fputs("method ", f);
                 fputs(colors[COLOR_IDENTIFIER], f);
@@ -1945,6 +1955,10 @@ int varlink_idl_validate_method_call(const sd_varlink_symbol *method, sd_json_va
         if (FLAGS_SET(method->symbol_flags, SD_VARLINK_REQUIRES_MORE) && !FLAGS_SET(flags, SD_VARLINK_METHOD_MORE))
                 return -EBADE;
 
+        /* Same for upgrade */
+        if (FLAGS_SET(method->symbol_flags, SD_VARLINK_REQUIRES_UPGRADE) && !FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE))
+                return -EBADE;
+
         return varlink_idl_validate_symbol(method, v, SD_VARLINK_INPUT, reterr_bad_field);
 }
 
index 25e67b4b7484958af5959659e971d27cb4600e05..1c03cfc17367ecb1e45e298084a269b27ed3ed89 100644 (file)
@@ -1544,6 +1544,8 @@ static int varlink_dispatch_method(sd_varlink *v) {
                              (flags & SD_VARLINK_METHOD_ONEWAY) ? VARLINK_PROCESSING_METHOD_ONEWAY :
                                                                   VARLINK_PROCESSING_METHOD);
 
+        v->protocol_upgrade = FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE);
+
         assert(v->server);
 
         /* First consult user supplied method implementations */
@@ -1566,11 +1568,15 @@ static int varlink_dispatch_method(sd_varlink *v) {
 
                         r = varlink_idl_validate_method_call(v->current_method, parameters, flags, &bad_field);
                         if (r == -EBADE) {
-                                varlink_log_errno(v, r, "Method %s() called without 'more' flag, but flag needs to be set.",
-                                                  method);
+                                bool missing_upgrade = FLAGS_SET(v->current_method->symbol_flags, SD_VARLINK_REQUIRES_UPGRADE) &&
+                                                       !FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE);
+
+                                varlink_log_errno(v, r, "Method %s() called without '%s' flag, but flag needs to be set.",
+                                                  method, missing_upgrade ? "upgrade" : "more");
 
                                 if (v->state == VARLINK_PROCESSING_METHOD) {
-                                        r = sd_varlink_error(v, SD_VARLINK_ERROR_EXPECTED_MORE, NULL);
+                                        r = sd_varlink_error(v, missing_upgrade ? SD_VARLINK_ERROR_EXPECTED_UPGRADE
+                                                                                : SD_VARLINK_ERROR_EXPECTED_MORE, NULL);
                                         /* If we didn't manage to enqueue an error response, then fail the
                                          * connection completely. Otherwise ignore the error from
                                          * sd_varlink_error() here, as it is synthesized from the function's
@@ -2821,6 +2827,97 @@ _public_ int sd_varlink_replyb(sd_varlink *v, ...) {
         return sd_varlink_reply(v, parameters);
 }
 
+_public_ int sd_varlink_reply_and_upgrade(sd_varlink *v, sd_json_variant *parameters, int *ret_input_fd, int *ret_output_fd) {
+        int r;
+
+        assert_return(v, -EINVAL);
+        assert_return(ret_input_fd || ret_output_fd, -EINVAL);
+
+        if (v->state == VARLINK_DISCONNECTED)
+                return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
+
+        if (!IN_SET(v->state,
+                    VARLINK_PROCESSING_METHOD,
+                    VARLINK_PENDING_METHOD))
+                return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
+
+        /* Verify the client actually requested a protocol upgrade */
+        if (!v->protocol_upgrade)
+                return varlink_log_errno(v, SYNTHETIC_ERRNO(EPROTO),
+                                         "Method call did not request a protocol upgrade.");
+
+        /* Ensure we did not buffer any data beyond the upgrade request. Check this before sending the
+         * reply so that we can return a normal error (the framework will send an error reply to the
+         * client). In normal operation this cannot happen because the client waits for our reply before
+         * sending raw data, and we set protocol_upgrade=true in dispatch to limit subsequent reads to
+         * single bytes. But a misbehaving client could pipeline data early. */
+        if (v->input_buffer_size > 0)
+                return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADMSG),
+                                         "Unexpected buffered data from client during protocol upgrade.");
+
+        /* Validate parameters BEFORE sanitization (same validation as sd_varlink_reply(), but upgrade
+         * replies never carry the 'continues' flag so we always pass flags=0) */
+        if (v->current_method) {
+                const char *bad_field = NULL;
+
+                r = varlink_idl_validate_method_reply(v->current_method, parameters, /* flags= */ 0, &bad_field);
+                if (r < 0)
+                        /* Please adjust test/units/end.sh when updating the log message. */
+                        varlink_log_errno(v, r, "Return parameters for method reply %s() didn't pass validation on field '%s', ignoring: %m",
+                                          v->current_method->name, strna(bad_field));
+        }
+
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *m = NULL;
+        r = sd_json_buildo(&m, JSON_BUILD_PAIR_VARIANT_NON_EMPTY("parameters", parameters));
+        if (r < 0)
+                return varlink_log_errno(v, r, "Failed to build json message: %m");
+
+        r = varlink_enqueue_json(v, m);
+        if (r < 0)
+                return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
+
+        /* Flush the reply to the socket before stealing the fds. The reply must be fully written
+         * before the caller starts speaking the upgraded protocol. */
+        for (;;) {
+                r = varlink_write(v);
+                if (r < 0) {
+                        varlink_log_errno(v, r, "Failed to flush reply: %m");
+                        goto disconnect;
+                }
+                if (v->output_buffer_size == 0 && !v->output_queue)
+                        break;
+                if (v->write_disconnected) {
+                        r = varlink_log_errno(v, SYNTHETIC_ERRNO(ECONNRESET),
+                                              "Write disconnected during upgrade reply flush.");
+                        goto disconnect;
+                }
+
+                r = fd_wait_for_event(v->output_fd, POLLOUT, USEC_INFINITY);
+                if (ERRNO_IS_NEG_TRANSIENT(r))
+                        continue;
+                if (r < 0) {
+                        varlink_log_errno(v, r, "Failed to wait for writable fd: %m");
+                        goto disconnect;
+                }
+                assert(r > 0);
+
+                handle_revents(v, r);
+        }
+
+        /* Detach from the event loop before stealing the fds */
+        varlink_detach_event_sources(v);
+
+        /* Now hand the original FDs over to the caller, from this point on we have nothing to do with the
+         * connection anymore, it's up to the caller and we close the connection below */
+        r = varlink_handle_upgrade_fds(v, ret_input_fd, ret_output_fd);
+
+disconnect:
+        /* This also sets the connection state to VARLINK_DISCONNECTED */
+        sd_varlink_close(v);
+
+        return r < 0 ? r : 1;
+}
+
 _public_ int sd_varlink_reset_fds(sd_varlink *v) {
         assert_return(v, -EINVAL);
 
@@ -4572,6 +4669,7 @@ _public_ int sd_varlink_error_to_errno(const char *error, sd_json_variant *param
                 { SD_VARLINK_ERROR_INVALID_PARAMETER,      -EINVAL        },
                 { SD_VARLINK_ERROR_PERMISSION_DENIED,      -EACCES        },
                 { SD_VARLINK_ERROR_EXPECTED_MORE,          -EBADE         },
+                { SD_VARLINK_ERROR_EXPECTED_UPGRADE,       -EPROTOTYPE    },
         };
 
         int r;
index ab85a95cc751211b291414fbfadcc8d1125e0223..1122e31324206150846f321e9a34c76ea3decea0 100644 (file)
@@ -52,7 +52,9 @@ __extension__ typedef enum _SD_ENUM_TYPE_S64(sd_varlink_symbol_type_t) {
 __extension__ typedef enum _SD_ENUM_TYPE_S64(sd_varlink_symbol_flags_t) {
         SD_VARLINK_SUPPORTS_MORE         = 1 << 0, /* Call supports "more" flag */
         SD_VARLINK_REQUIRES_MORE         = 1 << 1, /* Call requires "more" flag */
-        _SD_VARLINK_SYMBOL_FLAGS_MAX     = (1 << 2) - 1,
+        SD_VARLINK_SUPPORTS_UPGRADE      = 1 << 2, /* Call supports "upgrade" flag */
+        SD_VARLINK_REQUIRES_UPGRADE      = 1 << 3, /* Call requires "upgrade" flag */
+        _SD_VARLINK_SYMBOL_FLAGS_MAX     = (1 << 4) - 1,
         _SD_VARLINK_SYMBOL_FLAGS_INVALID = -EINVAL,
         _SD_ENUM_FORCE_S64(SD_VARLINK_SYMBOL_FLAGS)
 } sd_varlink_symbol_flags_t;
index 9d6e939d64f0d9e843bedbfe49883163118671df..0b999b9154d2fb0e1fa57ad51d02f4741ee9c175 100644 (file)
@@ -137,8 +137,9 @@ int sd_varlink_callb(sd_varlink *v, const char *method, sd_json_variant **ret_pa
         sd_varlink_callb((v), (method), (ret_parameters), (ret_error_id), SD_JSON_BUILD_OBJECT(__VA_ARGS__))
 
 /* Send method call with upgrade, wait for reply, then steal the connection fds for raw I/O.
- * For bidirectional sockets ret_input_fd and ret_output_fd will be the same fd.
- * ret_parameters and ret_error_id are borrowed references valid only until v is closed or unreffed.
+ * For bidirectional sockets ret_input_fd and ret_output_fd will be the same fd. Callers
+ * that need independent fds should dup() one of them. ret_parameters and ret_error_id are
+ * borrowed references valid only until v is closed or unreffed.
  * Returns > 0 if the connection was upgraded, 0 if a Varlink error occurred (and ret_error_id was set),
  * or < 0 on local failure. */
 int sd_varlink_call_and_upgrade(sd_varlink *v, const char *method, sd_json_variant *parameters, sd_json_variant **ret_parameters, const char **ret_error_id, int *ret_input_fd, int *ret_output_fd);
@@ -168,6 +169,18 @@ int sd_varlink_replyb(sd_varlink *v, ...);
 #define sd_varlink_replybo(v, ...)                         \
         sd_varlink_replyb((v), SD_JSON_BUILD_OBJECT(__VA_ARGS__))
 
+/* Send a final reply to an upgrade request, then steal the connection fds for raw I/O.
+ * The fds are returned in blocking mode. The varlink connection is disconnected afterwards.
+ * For bidirectional sockets ret_input_fd and ret_output_fd will be the same fd. Callers
+ * that need independent fds should dup() one of them. For pipe pairs (e.g. ssh-exec
+ * transport) they will differ. Either ret pointer may be NULL.
+ *
+ * Note: this call synchronously blocks until the reply is flushed to the socket. This is
+ * usually fine as flush is fast but a misbehaving/adversary client that stops reading
+ * could stall the caller. So do not use in servers that multiplex many varlink
+ * connections. */
+int sd_varlink_reply_and_upgrade(sd_varlink *v, sd_json_variant *parameters, int *ret_input_fd, int *ret_output_fd);
+
 /* Enqueue a (final) error */
 int sd_varlink_error(sd_varlink *v, const char *error_id, sd_json_variant *parameters);
 int sd_varlink_errorb(sd_varlink *v, const char *error_id, ...);
@@ -322,6 +335,7 @@ _SD_DEFINE_POINTER_CLEANUP_FUNC(sd_varlink_server, sd_varlink_server_unref);
 #define SD_VARLINK_ERROR_INVALID_PARAMETER "org.varlink.service.InvalidParameter"
 #define SD_VARLINK_ERROR_PERMISSION_DENIED "org.varlink.service.PermissionDenied"
 #define SD_VARLINK_ERROR_EXPECTED_MORE "org.varlink.service.ExpectedMore"
+#define SD_VARLINK_ERROR_EXPECTED_UPGRADE "org.varlink.service.ExpectedUpgrade"
 
 _SD_END_DECLARATIONS;
 
index 3324421f687876113a1b5da8a84f9eb165c074a4..36a46393760c67d33506971743594e495d2af513 100644 (file)
@@ -11,6 +11,7 @@
 #include "sd-varlink.h"
 
 #include "fd-util.h"
+#include "io-util.h"
 #include "json-util.h"
 #include "memfd-util.h"
 #include "rm-rf.h"
@@ -725,7 +726,7 @@ static int reply_notify_then_error(sd_varlink *link, sd_json_variant *parameters
 
 TEST(notify_then_error) {
         _cleanup_(sd_event_unrefp) sd_event *e = NULL;
-        ASSERT_OK(sd_event_default(&e));
+        ASSERT_OK(sd_event_new(&e));
 
         _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
         ASSERT_OK(sd_varlink_server_new(&s, 0));
@@ -752,4 +753,112 @@ TEST(notify_then_error) {
         ASSERT_OK(sd_event_loop(e));
 }
 
+static int method_upgrade(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        _cleanup_close_ int input_fd = -EBADF, output_fd = -EBADF;
+        int r;
+
+        ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE));
+
+        r = sd_varlink_reply_and_upgrade(link, /* parameters= */ NULL, &input_fd, &output_fd);
+        if (r < 0)
+                return r;
+
+        /* For a socketpair connection, both fds point to the same socket — avoid double-close */
+        if (input_fd == output_fd)
+                output_fd = -EBADF;
+
+        /* After upgrade, do raw I/O: read until EOF, reverse, write back.
+         * The client shuts down its write side after sending, so we get a clean EOF. */
+        char buf[64] = {};
+        ssize_t n = ASSERT_OK(loop_read(input_fd, buf, sizeof(buf) - 1, /* do_poll= */ true));
+        ASSERT_GT(n, 0);
+
+        /* Reverse the received bytes */
+        for (ssize_t i = 0; i < n / 2; i++)
+                SWAP_TWO(buf[i], buf[n - 1 - i]);
+
+        int write_fd = output_fd >= 0 ? output_fd : input_fd;
+        ASSERT_OK(loop_write(write_fd, buf, n));
+
+        return 0;
+}
+
+static int method_upgrade_without_flag(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        int input_fd = -EBADF, output_fd = -EBADF;
+
+        /* Calling reply_and_upgrade without the client requesting it should fail with -EPROTO */
+        ASSERT_ERROR(sd_varlink_reply_and_upgrade(link, /* parameters= */ NULL, &input_fd, &output_fd), EPROTO);
+
+        sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS);
+
+        return sd_varlink_reply(link, /* parameters= */ NULL);
+}
+
+static void *upgrade_thread(void *arg) {
+        _cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *c = NULL;
+        _cleanup_close_ int input_fd = -EBADF, output_fd = -EBADF;
+        sd_json_variant *o = NULL;
+        const char *error_id = NULL;
+
+        ASSERT_OK(sd_varlink_connect_address(&c, arg));
+        ASSERT_OK(sd_varlink_set_description(c, "upgrade-client"));
+
+        ASSERT_OK(sd_varlink_call_and_upgrade(c, "io.test.Upgrade", /* parameters= */ NULL, &o, &error_id, &input_fd, &output_fd));
+        ASSERT_NULL(error_id);
+        ASSERT_GE(input_fd, 0);
+        ASSERT_GE(output_fd, 0);
+
+        /* For a socketpair connection, both fds point to the same socket — avoid double-close */
+        if (input_fd == output_fd)
+                output_fd = -EBADF;
+
+        /* Send a test string, expect reversed reply */
+        static const char msg[] = "Hello!";
+        int write_fd = output_fd >= 0 ? output_fd : input_fd;
+        ASSERT_OK(loop_write(write_fd, msg, strlen(msg)));
+        ASSERT_OK_ERRNO(shutdown(write_fd, SHUT_WR));
+
+        char buf[64] = {};
+        ssize_t n = ASSERT_OK(loop_read(input_fd, buf, strlen(msg), /* do_poll= */ true));
+        ASSERT_EQ((size_t) n, strlen(msg));
+        ASSERT_STREQ(buf, "!olleH");
+
+        /* Also test that a regular call (without upgrade flag) correctly rejects reply_and_upgrade on
+         * the server side, and still works as a normal call */
+        _cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *c2 = NULL;
+        ASSERT_OK(sd_varlink_connect_address(&c2, arg));
+        ASSERT_OK(sd_varlink_set_description(c2, "no-upgrade-client"));
+        ASSERT_OK(sd_varlink_call(c2, "io.test.UpgradeWithoutFlag", /* parameters= */ NULL, &o, &error_id));
+        ASSERT_NULL(error_id);
+
+        return NULL;
+}
+
+TEST(upgrade) {
+        _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+        _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        pthread_t t;
+        const char *sp;
+
+        ASSERT_OK(mkdtemp_malloc("/tmp/varlink-test-XXXXXX", &tmpdir));
+        sp = strjoina(tmpdir, "/socket");
+
+        ASSERT_OK(sd_event_new(&e));
+
+        ASSERT_OK(sd_varlink_server_new(&s, 0));
+        ASSERT_OK(sd_varlink_server_set_description(s, "upgrade-server"));
+        ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Upgrade", method_upgrade));
+        ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.UpgradeWithoutFlag", method_upgrade_without_flag));
+        ASSERT_OK(sd_varlink_server_listen_address(s, sp, 0600));
+        ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+        ASSERT_OK(-pthread_create(&t, NULL, upgrade_thread, (void*) sp));
+
+        /* Run the event loop until no more connections (the thread will disconnect when done) */
+        ASSERT_OK(sd_event_loop(e));
+
+        ASSERT_OK(-pthread_join(t, NULL));
+}
+
 DEFINE_TEST_MAIN(LOG_DEBUG);