From: Michael Vogt Date: Tue, 24 Mar 2026 08:48:33 +0000 (+0100) Subject: varlinkctl: add protocol upgrade support X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=0fc3f85d35f155b1242423df120bdadecc0696e7;p=thirdparty%2Fsystemd.git varlinkctl: add protocol upgrade support The varlink spec supports protocol upgrades and they are very useful to e.g. transfer binary data directly via varlink. So far varlinkctl/sd-varlink was not supporting this. This commit adds support for it in varlinkctl by using the new code in sd-varlink and the generalized socket-forward code. --- diff --git a/man/varlinkctl.xml b/man/varlinkctl.xml index b96e35372fa..2a771c1049e 100644 --- a/man/varlinkctl.xml +++ b/man/varlinkctl.xml @@ -269,6 +269,23 @@ + + + + When used with call: request a protocol upgrade. The method call + is sent with the upgrade flag set. The service is expected to send a single + reply confirming the upgrade. After the reply, the Varlink protocol is no longer in effect on + the connection. Instead, varlinkctl acts as a bidirectional proxy: data read + from standard input is forwarded to the upgraded connection, and data received from the connection + is written to standard output. + + This option may not be combined with , , + , , , or + . + + + + diff --git a/shell-completion/bash/varlinkctl b/shell-completion/bash/varlinkctl index e6557ffb020..53c62dbae51 100644 --- a/shell-completion/bash/varlinkctl +++ b/shell-completion/bash/varlinkctl @@ -43,7 +43,7 @@ _varlinkctl() { local cur=${COMP_WORDS[COMP_CWORD]} prev=${COMP_WORDS[COMP_CWORD-1]} local -A OPTS=( [STANDALONE]='-h --help --version --no-pager -q --quiet --no-ask-password - --oneway --collect --more --exec -j -E' + --oneway --collect --more --exec --upgrade -j -E' [ARG]='--graceful --timeout --push-fd --json' ) diff --git a/shell-completion/zsh/_varlinkctl b/shell-completion/zsh/_varlinkctl index f33eea24b60..a071f6ec2ef 100644 --- a/shell-completion/zsh/_varlinkctl +++ b/shell-completion/zsh/_varlinkctl @@ -65,6 +65,7 @@ local -a opts=( '--no-pager[Do not pipe output to a pager]' '--more[Request multiple responses]' '--collect[Collect multiple responses in a JSON array]' + '--upgrade[Upgrade connection to raw protocol after method call]' {-j+,--json=}'[Output as json]:json-mode:(pretty short)' ) _arguments -S $opts '*:: := _varlinkctl_command' diff --git a/src/varlinkctl/varlinkctl.c b/src/varlinkctl/varlinkctl.c index 775a481ae8e..e73c72ab97d 100644 --- a/src/varlinkctl/varlinkctl.c +++ b/src/varlinkctl/varlinkctl.c @@ -30,6 +30,7 @@ #include "process-util.h" #include "recurse-dir.h" #include "runtime-scope.h" +#include "socket-forward.h" #include "string-util.h" #include "strv.h" #include "terminal-util.h" @@ -52,6 +53,7 @@ static bool arg_quiet = false; static char **arg_graceful = NULL; static usec_t arg_timeout = 0; static bool arg_exec = false; +static bool arg_upgrade = false; static PushFds arg_push_fds = {}; static bool arg_ask_password = true; static bool arg_legend = true; @@ -111,6 +113,8 @@ static int help(void) { " --graceful=ERROR Treat specified Varlink error as success\n" " --timeout=SECS Maximum time to wait for method call completion\n" " -E Short for --more --timeout=infinity\n" + " --upgrade Request protocol upgrade (connection becomes raw\n" + " bidirectional pipe on stdin/stdout after reply)\n" " --push-fd=FD Pass the specified fd along with method call\n" "\nSee the %2$s for details.\n", program_invocation_short_name, @@ -139,6 +143,7 @@ static int parse_argv(int argc, char *argv[]) { ARG_GRACEFUL, ARG_TIMEOUT, ARG_EXEC, + ARG_UPGRADE, ARG_PUSH_FD, ARG_NO_ASK_PASSWORD, ARG_USER, @@ -157,6 +162,7 @@ static int parse_argv(int argc, char *argv[]) { { "graceful", required_argument, NULL, ARG_GRACEFUL }, { "timeout", required_argument, NULL, ARG_TIMEOUT }, { "exec", no_argument, NULL, ARG_EXEC }, + { "upgrade", no_argument, NULL, ARG_UPGRADE }, { "push-fd", required_argument, NULL, ARG_PUSH_FD }, { "no-ask-password", no_argument, NULL, ARG_NO_ASK_PASSWORD }, { "user", no_argument, NULL, ARG_USER }, @@ -245,6 +251,10 @@ static int parse_argv(int argc, char *argv[]) { arg_exec = true; break; + case ARG_UPGRADE: + arg_upgrade = true; + break; + case ARG_PUSH_FD: { if (!GREEDY_REALLOC(arg_push_fds.fds, arg_push_fds.n_fds + 1)) return log_oom(); @@ -634,6 +644,75 @@ static int reply_callback( return r; } +static int upgrade_forward_done(SocketForward *sf, int error, void *userdata) { + sd_event *event = ASSERT_PTR(userdata); + + return sd_event_exit(event, error < 0 ? error : 0); +} + +static int varlink_call_and_upgrade(const char *url, const char *method, sd_json_variant *parameters) { + _cleanup_(sd_varlink_unrefp) sd_varlink *vl = NULL; + _cleanup_(sd_event_unrefp) sd_event *event = NULL; + _cleanup_(socket_forward_freep) SocketForward *sf = NULL; + _cleanup_close_ int input_fd = -EBADF, output_fd = -EBADF; + const char *error_id = NULL; + int r; + + r = varlink_connect_auto(&vl, url); + if (r < 0) + return r; + + r = sd_varlink_call_and_upgrade( + vl, + method, + parameters, + /* ret_parameters= */ NULL, + &error_id, + &input_fd, + &output_fd); + if (r < 0) + return log_error_errno(r, "Failed to upgrade connection via %s(): %m", method); + if (!isempty(error_id)) + return log_error_errno(SYNTHETIC_ERRNO(EBADE), "Upgrade via %s() failed with error: %s", method, error_id); + + /* For bidirectional sockets input_fd == output_fd. Dup immediately so that _cleanup_close_ + * on both variables can never double-close the same fd. Note that on fcntl() failure + * output_fd is overwritten with -1, so only input_fd holds the real fd at cleanup time. */ + if (input_fd == output_fd) { + output_fd = fcntl(input_fd, F_DUPFD_CLOEXEC, 3); + if (output_fd < 0) + return log_error_errno(errno, "Failed to dup upgraded connection fd: %m"); + } + + /* Use a clean event loop just for the forwarding to avoid any interference. */ + r = sd_event_new(&event); + if (r < 0) + return log_error_errno(r, "Failed to allocate event loop: %m"); + + _cleanup_close_ int stdin_fd = fcntl(STDIN_FILENO, F_DUPFD_CLOEXEC, 3); + if (stdin_fd < 0) + return log_error_errno(errno, "Failed to dup stdin: %m"); + + _cleanup_close_ int stdout_fd = fcntl(STDOUT_FILENO, F_DUPFD_CLOEXEC, 3); + if (stdout_fd < 0) + return log_error_errno(errno, "Failed to dup stdout: %m"); + + r = socket_forward_new_pair( + event, + TAKE_FD(stdin_fd), TAKE_FD(stdout_fd), + TAKE_FD(input_fd), TAKE_FD(output_fd), + upgrade_forward_done, event, + &sf); + if (r < 0) + return log_error_errno(r, "Failed to set up socket forwarding for varlink: %m"); + + r = sd_event_loop(event); + if (r < 0) + return log_error_errno(r, "Failed to run socket forward for varlink: %m"); + + return 0; +} + static int verb_call(int argc, char *argv[], uintptr_t _data, void *userdata) { _cleanup_(sd_json_variant_unrefp) sd_json_variant *jp = NULL; _cleanup_(sd_varlink_unrefp) sd_varlink *vl = NULL; @@ -651,6 +730,12 @@ static int verb_call(int argc, char *argv[], uintptr_t _data, void *userdata) { if (arg_exec && (arg_collect || (arg_method_flags & (SD_VARLINK_METHOD_ONEWAY|SD_VARLINK_METHOD_MORE))) != 0) return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "--exec and --collect/--more/--oneway may not be combined."); + if (arg_upgrade) { + /* TODO: support --exec with --upgrade, passing the upgraded socket fd to a child process */ + if (arg_exec || arg_collect || arg_method_flags != 0 || arg_push_fds.n_fds > 0 || !strv_isempty(arg_graceful)) + return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "--upgrade may not be combined with --exec/--collect/--more/--oneway/--push-fd=/--graceful."); + } + (void) polkit_agent_open_if_enabled(BUS_TRANSPORT_LOCAL, arg_ask_password); url = argv[1]; @@ -658,6 +743,20 @@ static int verb_call(int argc, char *argv[], uintptr_t _data, void *userdata) { parameter = argc > 3 && !streq(argv[3], "-") ? argv[3] : NULL; cmdline = strv_skip(argv, 4); + if (!varlink_idl_qualified_symbol_name_is_valid(method)) + return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Not a valid qualified method name: '%s' (Expected valid Varlink interface name, followed by a dot, followed by a valid Varlink symbol name.)", method); + + if (arg_upgrade) { + /* For --upgrade, parse parameters from argv only (stdin is used for the upgraded connection) */ + if (parameter) { + r = sd_json_parse(parameter, SD_JSON_PARSE_MUST_BE_OBJECT, &jp, /* reterr_line= */ NULL, /* reterr_column= */ NULL); + if (r < 0) + return log_error_errno(r, "Failed to parse parameters: %m"); + } + + return varlink_call_and_upgrade(url, method, jp); + } + /* No JSON mode explicitly configured? Then default to the same as -j (except if --exec is used, in * which case generate shortest possible JSON since we are going to pass it to a program rather than * a user anyway) */ @@ -674,9 +773,6 @@ static int verb_call(int argc, char *argv[], uintptr_t _data, void *userdata) { * leave incomplete lines hanging around. */ arg_json_format_flags |= SD_JSON_FORMAT_NEWLINE; - if (!varlink_idl_qualified_symbol_name_is_valid(method)) - return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Not a valid qualified method name: '%s' (Expected valid Varlink interface name, followed by a dot, followed by a valid Varlink symbol name.)", method); - unsigned line = 0, column = 0; if (parameter) { source = ""; diff --git a/test/units/TEST-74-AUX-UTILS.varlinkctl.sh b/test/units/TEST-74-AUX-UTILS.varlinkctl.sh index dc5f952e592..24b29e55c3a 100755 --- a/test/units/TEST-74-AUX-UTILS.varlinkctl.sh +++ b/test/units/TEST-74-AUX-UTILS.varlinkctl.sh @@ -255,3 +255,102 @@ systemd-run --wait --pipe --user --machine testuser@ \ # test io.systemd.Unit in user manager systemd-run --wait --pipe --user --machine testuser@ \ varlinkctl --more call "/run/user/$testuser_uid/systemd/io.systemd.Manager" io.systemd.Unit.List '{}' + +# test --upgrade (protocol upgrade) +UPGRADE_SOCKET="$(mktemp -d)/upgrade.sock" +UPGRADE_SERVER="$(mktemp)" +cat >"$UPGRADE_SERVER" <<'PYEOF' +#!/usr/bin/env python3 +"""Varlink upgrade test server. With a socket path argument, listens on a unix socket. +Without arguments, speaks over stdin/stdout (for ssh-exec: transport testing).""" +import json, os, socket, sys + +if len(sys.argv) > 1: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(sys.argv[1]) + sock.listen(1) + print("READY", flush=True) + conn, _ = sock.accept() + inp = conn.makefile("rb") + out = conn.makefile("wb") +else: + inp = sys.stdin.buffer + out = sys.stdout.buffer + conn = sock = None + +# Read the varlink request (NUL-terminated JSON) +data = b"" +while True: + chunk = inp.read(1) + assert chunk, "Connection closed before receiving full varlink request" + data += chunk + if b"\0" in data: + break + +msg = json.loads(data.split(b"\0")[0]) +received_parameters = msg.get("parameters", {}) +out.write(b'{"parameters": {}}\0') +out.flush() + +# Upgraded protocol: send a non-JSON banner first to prove we're truly in raw mode, +# then echo the received parameters, then reverse lines from the client +out.write(b"<<< UPGRADED >>>\n") +out.write((json.dumps(received_parameters) + "\n").encode()) +out.flush() +data = inp.read().decode().rstrip("\n") +out.write((data[::-1] + "\n").encode()) +out.flush() + +if conn: + conn.close() +if sock: + sock.close() +PYEOF +chmod +x "$UPGRADE_SERVER" + +# Start the server in the background +python3 "$UPGRADE_SERVER" "$UPGRADE_SOCKET" & +SERVER_PID=$! + +# Wait for server readiness +timeout 5 bash -c "while [ ! -S '$UPGRADE_SOCKET' ]; do sleep 0.1; done" + +# Test proxy mode: pipe data through --upgrade, passing parameters and validate +result="$(echo "hello world" | varlinkctl call --upgrade "unix:$UPGRADE_SOCKET" io.systemd.test.Reverse '{"foo":"bar"}')" +echo "$result" | grep "<<< UPGRADED >>>" >/dev/null +echo "$result" | grep '"foo": "bar"' >/dev/null +echo "$result" | grep "dlrow olleh" >/dev/null + +wait "$SERVER_PID" || : + +# Test --upgrade with stdin redirected from a regular file (epoll can't poll regular files, +# so this exercises the fork+pipe fallback path) +UPGRADE_SOCKET2="$(mktemp -d)/upgrade.sock" +python3 "$UPGRADE_SERVER" "$UPGRADE_SOCKET2" & +SERVER_PID=$! +timeout 5 bash -c "while [ ! -S '$UPGRADE_SOCKET2' ]; do sleep 0.1; done" + +echo "file input test" > /tmp/test-upgrade-input +result="$(varlinkctl call --upgrade "unix:$UPGRADE_SOCKET2" io.systemd.test.Reverse '{"foo":"file"}' < /tmp/test-upgrade-input)" +echo "$result" | grep "<<< UPGRADED >>>" >/dev/null +echo "$result" | grep '"foo": "file"' >/dev/null +echo "$result" | grep "tset tupni elif" >/dev/null + +wait "$SERVER_PID" || : + +# Test --upgrade over ssh-exec: transport (pipe pair, not a bidirectional socket). +# This exercises the input_fd != output_fd path in sd_varlink_call_and_upgrade(). +# Reuse the same server script without a socket argument - it speaks over stdin/stdout. +cat > "$SSHBINDIR"/ssh <>>" >/dev/null +echo "$result" | grep '"foo": "ssh"' >/dev/null +echo "$result" | grep "tset epip hss" >/dev/null + +rm -f "$UPGRADE_SOCKET" "$UPGRADE_SOCKET2" "$UPGRADE_SERVER" /tmp/test-upgrade-input +rm -rf "$(dirname "$UPGRADE_SOCKET")" "$(dirname "$UPGRADE_SOCKET2")"