]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
varlinkctl: add protocol upgrade support
authorMichael Vogt <michael@amutable.com>
Tue, 24 Mar 2026 08:48:33 +0000 (09:48 +0100)
committerMichael Vogt <michael@amutable.com>
Tue, 31 Mar 2026 16:25:09 +0000 (18:25 +0200)
The varlink spec supports protocol upgrades and they are very
useful to e.g. transfer binary data directly via varlink. So
far varlinkctl/sd-varlink was not supporting this. This commit
adds support for it in varlinkctl by using the new code in
sd-varlink and the generalized socket-forward code.

man/varlinkctl.xml
shell-completion/bash/varlinkctl
shell-completion/zsh/_varlinkctl
src/varlinkctl/varlinkctl.c
test/units/TEST-74-AUX-UTILS.varlinkctl.sh

index b96e35372fa9b90c34b772c640d740f20131ab7d..2a771c1049e8bd8cd19b669d09a9923e2f8c0893 100644 (file)
         <xi:include href="version-info.xml" xpointer="v255"/></listitem>
       </varlistentry>
 
+      <varlistentry>
+        <term><option>--upgrade</option></term>
+
+        <listitem><para>When used with <command>call</command>: request a protocol upgrade. The method call
+        is sent with the <constant>upgrade</constant> flag set. The service is expected to send a single
+        reply confirming the upgrade. After the reply, the Varlink protocol is no longer in effect on
+        the connection. Instead, <command>varlinkctl</command> acts as a bidirectional proxy: data read
+        from standard input is forwarded to the upgraded connection, and data received from the connection
+        is written to standard output.</para>
+
+        <para>This option may not be combined with <option>--more</option>, <option>--oneway</option>,
+        <option>--collect</option>, <option>--exec</option>, <option>--graceful=</option>, or
+        <option>--push-fd=</option>.</para>
+
+        <xi:include href="version-info.xml" xpointer="v261"/></listitem>
+      </varlistentry>
+
       <varlistentry>
         <term><option>--json=<replaceable>MODE</replaceable></option></term>
 
index e6557ffb0203655ac03a7b0af4c70410ec3d16b9..53c62dbae51354558da61bb9eff04948dc8617e2 100644 (file)
@@ -43,7 +43,7 @@ _varlinkctl() {
     local cur=${COMP_WORDS[COMP_CWORD]} prev=${COMP_WORDS[COMP_CWORD-1]}
     local -A OPTS=(
         [STANDALONE]='-h --help --version --no-pager -q --quiet --no-ask-password
-                      --oneway --collect --more --exec -j -E'
+                      --oneway --collect --more --exec --upgrade -j -E'
         [ARG]='--graceful --timeout --push-fd --json'
     )
 
index f33eea24b60fcf36afd3858608259dc8c99eeb19..a071f6ec2efb85ff517e998d099c126b312f15d5 100644 (file)
@@ -65,6 +65,7 @@ local -a opts=(
     '--no-pager[Do not pipe output to a pager]'
     '--more[Request multiple responses]'
     '--collect[Collect multiple responses in a JSON array]'
+    '--upgrade[Upgrade connection to raw protocol after method call]'
     {-j+,--json=}'[Output as json]:json-mode:(pretty short)'
 )
 _arguments -S $opts '*:: := _varlinkctl_command'
index 775a481ae8e6db29bc906c9357ff27887f47be0c..e73c72ab97d6e87aac4d031398660b0cab513aa6 100644 (file)
@@ -30,6 +30,7 @@
 #include "process-util.h"
 #include "recurse-dir.h"
 #include "runtime-scope.h"
+#include "socket-forward.h"
 #include "string-util.h"
 #include "strv.h"
 #include "terminal-util.h"
@@ -52,6 +53,7 @@ static bool arg_quiet = false;
 static char **arg_graceful = NULL;
 static usec_t arg_timeout = 0;
 static bool arg_exec = false;
+static bool arg_upgrade = false;
 static PushFds arg_push_fds = {};
 static bool arg_ask_password = true;
 static bool arg_legend = true;
@@ -111,6 +113,8 @@ static int help(void) {
                "     --graceful=ERROR    Treat specified Varlink error as success\n"
                "     --timeout=SECS      Maximum time to wait for method call completion\n"
                "  -E                     Short for --more --timeout=infinity\n"
+               "     --upgrade           Request protocol upgrade (connection becomes raw\n"
+               "                         bidirectional pipe on stdin/stdout after reply)\n"
                "     --push-fd=FD        Pass the specified fd along with method call\n"
                "\nSee the %2$s for details.\n",
                program_invocation_short_name,
@@ -139,6 +143,7 @@ static int parse_argv(int argc, char *argv[]) {
                 ARG_GRACEFUL,
                 ARG_TIMEOUT,
                 ARG_EXEC,
+                ARG_UPGRADE,
                 ARG_PUSH_FD,
                 ARG_NO_ASK_PASSWORD,
                 ARG_USER,
@@ -157,6 +162,7 @@ static int parse_argv(int argc, char *argv[]) {
                 { "graceful",        required_argument, NULL, ARG_GRACEFUL        },
                 { "timeout",         required_argument, NULL, ARG_TIMEOUT         },
                 { "exec",            no_argument,       NULL, ARG_EXEC            },
+                { "upgrade",         no_argument,       NULL, ARG_UPGRADE         },
                 { "push-fd",         required_argument, NULL, ARG_PUSH_FD         },
                 { "no-ask-password", no_argument,       NULL, ARG_NO_ASK_PASSWORD },
                 { "user",            no_argument,       NULL, ARG_USER            },
@@ -245,6 +251,10 @@ static int parse_argv(int argc, char *argv[]) {
                         arg_exec = true;
                         break;
 
+                case ARG_UPGRADE:
+                        arg_upgrade = true;
+                        break;
+
                 case ARG_PUSH_FD: {
                         if (!GREEDY_REALLOC(arg_push_fds.fds, arg_push_fds.n_fds + 1))
                                 return log_oom();
@@ -634,6 +644,75 @@ static int reply_callback(
         return r;
 }
 
+static int upgrade_forward_done(SocketForward *sf, int error, void *userdata) {
+        sd_event *event = ASSERT_PTR(userdata);
+
+        return sd_event_exit(event, error < 0 ? error : 0);
+}
+
+static int varlink_call_and_upgrade(const char *url, const char *method, sd_json_variant *parameters) {
+        _cleanup_(sd_varlink_unrefp) sd_varlink *vl = NULL;
+        _cleanup_(sd_event_unrefp) sd_event *event = NULL;
+        _cleanup_(socket_forward_freep) SocketForward *sf = NULL;
+        _cleanup_close_ int input_fd = -EBADF, output_fd = -EBADF;
+        const char *error_id = NULL;
+        int r;
+
+        r = varlink_connect_auto(&vl, url);
+        if (r < 0)
+                return r;
+
+        r = sd_varlink_call_and_upgrade(
+                        vl,
+                        method,
+                        parameters,
+                        /* ret_parameters= */ NULL,
+                        &error_id,
+                        &input_fd,
+                        &output_fd);
+        if (r < 0)
+                return log_error_errno(r, "Failed to upgrade connection via %s(): %m", method);
+        if (!isempty(error_id))
+                return log_error_errno(SYNTHETIC_ERRNO(EBADE), "Upgrade via %s() failed with error: %s", method, error_id);
+
+        /* For bidirectional sockets input_fd == output_fd. Dup immediately so that _cleanup_close_
+         * on both variables can never double-close the same fd. Note that on fcntl() failure
+         * output_fd is overwritten with -1, so only input_fd holds the real fd at cleanup time. */
+        if (input_fd == output_fd) {
+                output_fd = fcntl(input_fd, F_DUPFD_CLOEXEC, 3);
+                if (output_fd < 0)
+                        return log_error_errno(errno, "Failed to dup upgraded connection fd: %m");
+        }
+
+        /* Use a clean event loop just for the forwarding to avoid any interference. */
+        r = sd_event_new(&event);
+        if (r < 0)
+                return log_error_errno(r, "Failed to allocate event loop: %m");
+
+        _cleanup_close_ int stdin_fd = fcntl(STDIN_FILENO, F_DUPFD_CLOEXEC, 3);
+        if (stdin_fd < 0)
+                return log_error_errno(errno, "Failed to dup stdin: %m");
+
+        _cleanup_close_ int stdout_fd = fcntl(STDOUT_FILENO, F_DUPFD_CLOEXEC, 3);
+        if (stdout_fd < 0)
+                return log_error_errno(errno, "Failed to dup stdout: %m");
+
+        r = socket_forward_new_pair(
+                        event,
+                        TAKE_FD(stdin_fd), TAKE_FD(stdout_fd),
+                        TAKE_FD(input_fd), TAKE_FD(output_fd),
+                        upgrade_forward_done, event,
+                        &sf);
+        if (r < 0)
+                return log_error_errno(r, "Failed to set up socket forwarding for varlink: %m");
+
+        r = sd_event_loop(event);
+        if (r < 0)
+                return log_error_errno(r, "Failed to run socket forward for varlink: %m");
+
+        return 0;
+}
+
 static int verb_call(int argc, char *argv[], uintptr_t _data, void *userdata) {
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *jp = NULL;
         _cleanup_(sd_varlink_unrefp) sd_varlink *vl = NULL;
@@ -651,6 +730,12 @@ static int verb_call(int argc, char *argv[], uintptr_t _data, void *userdata) {
         if (arg_exec && (arg_collect || (arg_method_flags & (SD_VARLINK_METHOD_ONEWAY|SD_VARLINK_METHOD_MORE))) != 0)
                 return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "--exec and --collect/--more/--oneway may not be combined.");
 
+        if (arg_upgrade) {
+                /* TODO: support --exec with --upgrade, passing the upgraded socket fd to a child process */
+                if (arg_exec || arg_collect || arg_method_flags != 0 || arg_push_fds.n_fds > 0 || !strv_isempty(arg_graceful))
+                        return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "--upgrade may not be combined with --exec/--collect/--more/--oneway/--push-fd=/--graceful.");
+        }
+
         (void) polkit_agent_open_if_enabled(BUS_TRANSPORT_LOCAL, arg_ask_password);
 
         url = argv[1];
@@ -658,6 +743,20 @@ static int verb_call(int argc, char *argv[], uintptr_t _data, void *userdata) {
         parameter = argc > 3 && !streq(argv[3], "-") ? argv[3] : NULL;
         cmdline = strv_skip(argv, 4);
 
+        if (!varlink_idl_qualified_symbol_name_is_valid(method))
+                return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Not a valid qualified method name: '%s' (Expected valid Varlink interface name, followed by a dot, followed by a valid Varlink symbol name.)", method);
+
+        if (arg_upgrade) {
+                /* For --upgrade, parse parameters from argv only (stdin is used for the upgraded connection) */
+                if (parameter) {
+                        r = sd_json_parse(parameter, SD_JSON_PARSE_MUST_BE_OBJECT, &jp, /* reterr_line= */ NULL, /* reterr_column= */ NULL);
+                        if (r < 0)
+                                return log_error_errno(r, "Failed to parse parameters: %m");
+                }
+
+                return varlink_call_and_upgrade(url, method, jp);
+        }
+
         /* No JSON mode explicitly configured? Then default to the same as -j (except if --exec is used, in
          * which case generate shortest possible JSON since we are going to pass it to a program rather than
          * a user anyway) */
@@ -674,9 +773,6 @@ static int verb_call(int argc, char *argv[], uintptr_t _data, void *userdata) {
          * leave incomplete lines hanging around. */
         arg_json_format_flags |= SD_JSON_FORMAT_NEWLINE;
 
-        if (!varlink_idl_qualified_symbol_name_is_valid(method))
-                return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Not a valid qualified method name: '%s' (Expected valid Varlink interface name, followed by a dot, followed by a valid Varlink symbol name.)", method);
-
         unsigned line = 0, column = 0;
         if (parameter) {
                 source = "<argv[4]>";
index dc5f952e592772b5a49e88a905d15c43a9f6ba30..24b29e55c3a0105ae7c3dfc1279ad8666b0f6510 100755 (executable)
@@ -255,3 +255,102 @@ systemd-run --wait --pipe --user --machine testuser@ \
 # test io.systemd.Unit in user manager
 systemd-run --wait --pipe --user --machine testuser@ \
         varlinkctl --more call "/run/user/$testuser_uid/systemd/io.systemd.Manager" io.systemd.Unit.List '{}'
+
+# test --upgrade (protocol upgrade)
+UPGRADE_SOCKET="$(mktemp -d)/upgrade.sock"
+UPGRADE_SERVER="$(mktemp)"
+cat >"$UPGRADE_SERVER" <<'PYEOF'
+#!/usr/bin/env python3
+"""Varlink upgrade test server. With a socket path argument, listens on a unix socket.
+Without arguments, speaks over stdin/stdout (for ssh-exec: transport testing)."""
+import json, os, socket, sys
+
+if len(sys.argv) > 1:
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    sock.bind(sys.argv[1])
+    sock.listen(1)
+    print("READY", flush=True)
+    conn, _ = sock.accept()
+    inp = conn.makefile("rb")
+    out = conn.makefile("wb")
+else:
+    inp = sys.stdin.buffer
+    out = sys.stdout.buffer
+    conn = sock = None
+
+# Read the varlink request (NUL-terminated JSON)
+data = b""
+while True:
+    chunk = inp.read(1)
+    assert chunk, "Connection closed before receiving full varlink request"
+    data += chunk
+    if b"\0" in data:
+        break
+
+msg = json.loads(data.split(b"\0")[0])
+received_parameters = msg.get("parameters", {})
+out.write(b'{"parameters": {}}\0')
+out.flush()
+
+# Upgraded protocol: send a non-JSON banner first to prove we're truly in raw mode,
+# then echo the received parameters, then reverse lines from the client
+out.write(b"<<< UPGRADED >>>\n")
+out.write((json.dumps(received_parameters) + "\n").encode())
+out.flush()
+data = inp.read().decode().rstrip("\n")
+out.write((data[::-1] + "\n").encode())
+out.flush()
+
+if conn:
+    conn.close()
+if sock:
+    sock.close()
+PYEOF
+chmod +x "$UPGRADE_SERVER"
+
+# Start the server in the background
+python3 "$UPGRADE_SERVER" "$UPGRADE_SOCKET" &
+SERVER_PID=$!
+
+# Wait for server readiness
+timeout 5 bash -c "while [ ! -S '$UPGRADE_SOCKET' ]; do sleep 0.1; done"
+
+# Test proxy mode: pipe data through --upgrade, passing parameters and validate
+result="$(echo "hello world" | varlinkctl call --upgrade "unix:$UPGRADE_SOCKET" io.systemd.test.Reverse '{"foo":"bar"}')"
+echo "$result" | grep "<<< UPGRADED >>>" >/dev/null
+echo "$result" | grep '"foo": "bar"' >/dev/null
+echo "$result" | grep "dlrow olleh" >/dev/null
+
+wait "$SERVER_PID" || :
+
+# Test --upgrade with stdin redirected from a regular file (epoll can't poll regular files,
+# so this exercises the fork+pipe fallback path)
+UPGRADE_SOCKET2="$(mktemp -d)/upgrade.sock"
+python3 "$UPGRADE_SERVER" "$UPGRADE_SOCKET2" &
+SERVER_PID=$!
+timeout 5 bash -c "while [ ! -S '$UPGRADE_SOCKET2' ]; do sleep 0.1; done"
+
+echo "file input test" > /tmp/test-upgrade-input
+result="$(varlinkctl call --upgrade "unix:$UPGRADE_SOCKET2" io.systemd.test.Reverse '{"foo":"file"}' < /tmp/test-upgrade-input)"
+echo "$result" | grep "<<< UPGRADED >>>" >/dev/null
+echo "$result" | grep '"foo": "file"' >/dev/null
+echo "$result" | grep "tset tupni elif" >/dev/null
+
+wait "$SERVER_PID" || :
+
+# Test --upgrade over ssh-exec: transport (pipe pair, not a bidirectional socket).
+# This exercises the input_fd != output_fd path in sd_varlink_call_and_upgrade().
+# Reuse the same server script without a socket argument - it speaks over stdin/stdout.
+cat > "$SSHBINDIR"/ssh <<EOF
+#!/usr/bin/env bash
+exec python3 "$UPGRADE_SERVER"
+EOF
+chmod +x "$SSHBINDIR"/ssh
+
+result="$(echo "ssh pipe test" | SYSTEMD_SSH="$SSHBINDIR/ssh" varlinkctl call --upgrade ssh-exec:foobar:test-upgrade io.systemd.test.Reverse '{"foo":"ssh"}')"
+echo "$result" | grep "<<< UPGRADED >>>" >/dev/null
+echo "$result" | grep '"foo": "ssh"' >/dev/null
+echo "$result" | grep "tset epip hss" >/dev/null
+
+rm -f "$UPGRADE_SOCKET" "$UPGRADE_SOCKET2" "$UPGRADE_SERVER" /tmp/test-upgrade-input
+rm -rf "$(dirname "$UPGRADE_SOCKET")" "$(dirname "$UPGRADE_SOCKET2")"