#include "process-util.h"
#include "recurse-dir.h"
#include "runtime-scope.h"
+#include "socket-forward.h"
#include "string-util.h"
#include "strv.h"
#include "terminal-util.h"
static char **arg_graceful = NULL;
static usec_t arg_timeout = 0;
static bool arg_exec = false;
+static bool arg_upgrade = false;
static PushFds arg_push_fds = {};
static bool arg_ask_password = true;
static bool arg_legend = true;
" --graceful=ERROR Treat specified Varlink error as success\n"
" --timeout=SECS Maximum time to wait for method call completion\n"
" -E Short for --more --timeout=infinity\n"
+ " --upgrade Request protocol upgrade (connection becomes raw\n"
+ " bidirectional pipe on stdin/stdout after reply)\n"
" --push-fd=FD Pass the specified fd along with method call\n"
"\nSee the %2$s for details.\n",
program_invocation_short_name,
ARG_GRACEFUL,
ARG_TIMEOUT,
ARG_EXEC,
+ ARG_UPGRADE,
ARG_PUSH_FD,
ARG_NO_ASK_PASSWORD,
ARG_USER,
{ "graceful", required_argument, NULL, ARG_GRACEFUL },
{ "timeout", required_argument, NULL, ARG_TIMEOUT },
{ "exec", no_argument, NULL, ARG_EXEC },
+ { "upgrade", no_argument, NULL, ARG_UPGRADE },
{ "push-fd", required_argument, NULL, ARG_PUSH_FD },
{ "no-ask-password", no_argument, NULL, ARG_NO_ASK_PASSWORD },
{ "user", no_argument, NULL, ARG_USER },
arg_exec = true;
break;
+ case ARG_UPGRADE:
+ arg_upgrade = true;
+ break;
+
case ARG_PUSH_FD: {
if (!GREEDY_REALLOC(arg_push_fds.fds, arg_push_fds.n_fds + 1))
return log_oom();
return r;
}
+static int upgrade_forward_done(SocketForward *sf, int error, void *userdata) {
+ sd_event *event = ASSERT_PTR(userdata);
+
+ return sd_event_exit(event, error < 0 ? error : 0);
+}
+
+static int varlink_call_and_upgrade(const char *url, const char *method, sd_json_variant *parameters) {
+ _cleanup_(sd_varlink_unrefp) sd_varlink *vl = NULL;
+ _cleanup_(sd_event_unrefp) sd_event *event = NULL;
+ _cleanup_(socket_forward_freep) SocketForward *sf = NULL;
+ _cleanup_close_ int input_fd = -EBADF, output_fd = -EBADF;
+ const char *error_id = NULL;
+ int r;
+
+ r = varlink_connect_auto(&vl, url);
+ if (r < 0)
+ return r;
+
+ r = sd_varlink_call_and_upgrade(
+ vl,
+ method,
+ parameters,
+ /* ret_parameters= */ NULL,
+ &error_id,
+ &input_fd,
+ &output_fd);
+ if (r < 0)
+ return log_error_errno(r, "Failed to upgrade connection via %s(): %m", method);
+ if (!isempty(error_id))
+ return log_error_errno(SYNTHETIC_ERRNO(EBADE), "Upgrade via %s() failed with error: %s", method, error_id);
+
+ /* For bidirectional sockets input_fd == output_fd. Dup immediately so that _cleanup_close_
+ * on both variables can never double-close the same fd. Note that on fcntl() failure
+ * output_fd is overwritten with -1, so only input_fd holds the real fd at cleanup time. */
+ if (input_fd == output_fd) {
+ output_fd = fcntl(input_fd, F_DUPFD_CLOEXEC, 3);
+ if (output_fd < 0)
+ return log_error_errno(errno, "Failed to dup upgraded connection fd: %m");
+ }
+
+ /* Use a clean event loop just for the forwarding to avoid any interference. */
+ r = sd_event_new(&event);
+ if (r < 0)
+ return log_error_errno(r, "Failed to allocate event loop: %m");
+
+ _cleanup_close_ int stdin_fd = fcntl(STDIN_FILENO, F_DUPFD_CLOEXEC, 3);
+ if (stdin_fd < 0)
+ return log_error_errno(errno, "Failed to dup stdin: %m");
+
+ _cleanup_close_ int stdout_fd = fcntl(STDOUT_FILENO, F_DUPFD_CLOEXEC, 3);
+ if (stdout_fd < 0)
+ return log_error_errno(errno, "Failed to dup stdout: %m");
+
+ r = socket_forward_new_pair(
+ event,
+ TAKE_FD(stdin_fd), TAKE_FD(stdout_fd),
+ TAKE_FD(input_fd), TAKE_FD(output_fd),
+ upgrade_forward_done, event,
+ &sf);
+ if (r < 0)
+ return log_error_errno(r, "Failed to set up socket forwarding for varlink: %m");
+
+ r = sd_event_loop(event);
+ if (r < 0)
+ return log_error_errno(r, "Failed to run socket forward for varlink: %m");
+
+ return 0;
+}
+
static int verb_call(int argc, char *argv[], uintptr_t _data, void *userdata) {
_cleanup_(sd_json_variant_unrefp) sd_json_variant *jp = NULL;
_cleanup_(sd_varlink_unrefp) sd_varlink *vl = NULL;
if (arg_exec && (arg_collect || (arg_method_flags & (SD_VARLINK_METHOD_ONEWAY|SD_VARLINK_METHOD_MORE))) != 0)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "--exec and --collect/--more/--oneway may not be combined.");
+ if (arg_upgrade) {
+ /* TODO: support --exec with --upgrade, passing the upgraded socket fd to a child process */
+ if (arg_exec || arg_collect || arg_method_flags != 0 || arg_push_fds.n_fds > 0 || !strv_isempty(arg_graceful))
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "--upgrade may not be combined with --exec/--collect/--more/--oneway/--push-fd=/--graceful.");
+ }
+
(void) polkit_agent_open_if_enabled(BUS_TRANSPORT_LOCAL, arg_ask_password);
url = argv[1];
parameter = argc > 3 && !streq(argv[3], "-") ? argv[3] : NULL;
cmdline = strv_skip(argv, 4);
+ if (!varlink_idl_qualified_symbol_name_is_valid(method))
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Not a valid qualified method name: '%s' (Expected valid Varlink interface name, followed by a dot, followed by a valid Varlink symbol name.)", method);
+
+ if (arg_upgrade) {
+ /* For --upgrade, parse parameters from argv only (stdin is used for the upgraded connection) */
+ if (parameter) {
+ r = sd_json_parse(parameter, SD_JSON_PARSE_MUST_BE_OBJECT, &jp, /* reterr_line= */ NULL, /* reterr_column= */ NULL);
+ if (r < 0)
+ return log_error_errno(r, "Failed to parse parameters: %m");
+ }
+
+ return varlink_call_and_upgrade(url, method, jp);
+ }
+
/* No JSON mode explicitly configured? Then default to the same as -j (except if --exec is used, in
* which case generate shortest possible JSON since we are going to pass it to a program rather than
* a user anyway) */
* leave incomplete lines hanging around. */
arg_json_format_flags |= SD_JSON_FORMAT_NEWLINE;
- if (!varlink_idl_qualified_symbol_name_is_valid(method))
- return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Not a valid qualified method name: '%s' (Expected valid Varlink interface name, followed by a dot, followed by a valid Varlink symbol name.)", method);
-
unsigned line = 0, column = 0;
if (parameter) {
source = "<argv[4]>";
# test io.systemd.Unit in user manager
systemd-run --wait --pipe --user --machine testuser@ \
varlinkctl --more call "/run/user/$testuser_uid/systemd/io.systemd.Manager" io.systemd.Unit.List '{}'
+
+# test --upgrade (protocol upgrade)
+UPGRADE_SOCKET="$(mktemp -d)/upgrade.sock"
+UPGRADE_SERVER="$(mktemp)"
+cat >"$UPGRADE_SERVER" <<'PYEOF'
+#!/usr/bin/env python3
+"""Varlink upgrade test server. With a socket path argument, listens on a unix socket.
+Without arguments, speaks over stdin/stdout (for ssh-exec: transport testing)."""
+import json, os, socket, sys
+
+if len(sys.argv) > 1:
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.bind(sys.argv[1])
+ sock.listen(1)
+ print("READY", flush=True)
+ conn, _ = sock.accept()
+ inp = conn.makefile("rb")
+ out = conn.makefile("wb")
+else:
+ inp = sys.stdin.buffer
+ out = sys.stdout.buffer
+ conn = sock = None
+
+# Read the varlink request (NUL-terminated JSON)
+data = b""
+while True:
+ chunk = inp.read(1)
+ assert chunk, "Connection closed before receiving full varlink request"
+ data += chunk
+ if b"\0" in data:
+ break
+
+msg = json.loads(data.split(b"\0")[0])
+received_parameters = msg.get("parameters", {})
+out.write(b'{"parameters": {}}\0')
+out.flush()
+
+# Upgraded protocol: send a non-JSON banner first to prove we're truly in raw mode,
+# then echo the received parameters, then reverse lines from the client
+out.write(b"<<< UPGRADED >>>\n")
+out.write((json.dumps(received_parameters) + "\n").encode())
+out.flush()
+data = inp.read().decode().rstrip("\n")
+out.write((data[::-1] + "\n").encode())
+out.flush()
+
+if conn:
+ conn.close()
+if sock:
+ sock.close()
+PYEOF
+chmod +x "$UPGRADE_SERVER"
+
+# Start the server in the background
+python3 "$UPGRADE_SERVER" "$UPGRADE_SOCKET" &
+SERVER_PID=$!
+
+# Wait for server readiness
+timeout 5 bash -c "while [ ! -S '$UPGRADE_SOCKET' ]; do sleep 0.1; done"
+
+# Test proxy mode: pipe data through --upgrade, passing parameters and validate
+result="$(echo "hello world" | varlinkctl call --upgrade "unix:$UPGRADE_SOCKET" io.systemd.test.Reverse '{"foo":"bar"}')"
+echo "$result" | grep "<<< UPGRADED >>>" >/dev/null
+echo "$result" | grep '"foo": "bar"' >/dev/null
+echo "$result" | grep "dlrow olleh" >/dev/null
+
+wait "$SERVER_PID" || :
+
+# Test --upgrade with stdin redirected from a regular file (epoll can't poll regular files,
+# so this exercises the fork+pipe fallback path)
+UPGRADE_SOCKET2="$(mktemp -d)/upgrade.sock"
+python3 "$UPGRADE_SERVER" "$UPGRADE_SOCKET2" &
+SERVER_PID=$!
+timeout 5 bash -c "while [ ! -S '$UPGRADE_SOCKET2' ]; do sleep 0.1; done"
+
+echo "file input test" > /tmp/test-upgrade-input
+result="$(varlinkctl call --upgrade "unix:$UPGRADE_SOCKET2" io.systemd.test.Reverse '{"foo":"file"}' < /tmp/test-upgrade-input)"
+echo "$result" | grep "<<< UPGRADED >>>" >/dev/null
+echo "$result" | grep '"foo": "file"' >/dev/null
+echo "$result" | grep "tset tupni elif" >/dev/null
+
+wait "$SERVER_PID" || :
+
+# Test --upgrade over ssh-exec: transport (pipe pair, not a bidirectional socket).
+# This exercises the input_fd != output_fd path in sd_varlink_call_and_upgrade().
+# Reuse the same server script without a socket argument - it speaks over stdin/stdout.
+cat > "$SSHBINDIR"/ssh <<EOF
+#!/usr/bin/env bash
+exec python3 "$UPGRADE_SERVER"
+EOF
+chmod +x "$SSHBINDIR"/ssh
+
+result="$(echo "ssh pipe test" | SYSTEMD_SSH="$SSHBINDIR/ssh" varlinkctl call --upgrade ssh-exec:foobar:test-upgrade io.systemd.test.Reverse '{"foo":"ssh"}')"
+echo "$result" | grep "<<< UPGRADED >>>" >/dev/null
+echo "$result" | grep '"foo": "ssh"' >/dev/null
+echo "$result" | grep "tset epip hss" >/dev/null
+
+rm -f "$UPGRADE_SOCKET" "$UPGRADE_SOCKET2" "$UPGRADE_SERVER" /tmp/test-upgrade-input
+rm -rf "$(dirname "$UPGRADE_SOCKET")" "$(dirname "$UPGRADE_SOCKET2")"