LIBSYSTEMD_261 {
global:
sd_varlink_call_and_upgrade;
+ sd_varlink_reply_and_upgrade;
sd_varlink_set_sentinel;
} LIBSYSTEMD_260;
fputs("\n", f);
}
+ if ((symbol->symbol_flags & (SD_VARLINK_REQUIRES_UPGRADE|SD_VARLINK_SUPPORTS_UPGRADE)) != 0) {
+ fputs(colors[COLOR_COMMENT], f);
+ if (FLAGS_SET(symbol->symbol_flags, SD_VARLINK_REQUIRES_UPGRADE))
+ fputs("# [Requires 'upgrade' flag]", f);
+ else
+ fputs("# [Supports 'upgrade' flag]", f);
+ fputs(colors[COLOR_RESET], f);
+ fputs("\n", f);
+ }
+
fputs(colors[COLOR_SYMBOL_TYPE], f);
fputs("method ", f);
fputs(colors[COLOR_IDENTIFIER], f);
if (FLAGS_SET(method->symbol_flags, SD_VARLINK_REQUIRES_MORE) && !FLAGS_SET(flags, SD_VARLINK_METHOD_MORE))
return -EBADE;
+ /* Same for upgrade */
+ if (FLAGS_SET(method->symbol_flags, SD_VARLINK_REQUIRES_UPGRADE) && !FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE))
+ return -EBADE;
+
return varlink_idl_validate_symbol(method, v, SD_VARLINK_INPUT, reterr_bad_field);
}
(flags & SD_VARLINK_METHOD_ONEWAY) ? VARLINK_PROCESSING_METHOD_ONEWAY :
VARLINK_PROCESSING_METHOD);
+ v->protocol_upgrade = FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE);
+
assert(v->server);
/* First consult user supplied method implementations */
r = varlink_idl_validate_method_call(v->current_method, parameters, flags, &bad_field);
if (r == -EBADE) {
- varlink_log_errno(v, r, "Method %s() called without 'more' flag, but flag needs to be set.",
- method);
+ bool missing_upgrade = FLAGS_SET(v->current_method->symbol_flags, SD_VARLINK_REQUIRES_UPGRADE) &&
+ !FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE);
+
+ varlink_log_errno(v, r, "Method %s() called without '%s' flag, but flag needs to be set.",
+ method, missing_upgrade ? "upgrade" : "more");
if (v->state == VARLINK_PROCESSING_METHOD) {
- r = sd_varlink_error(v, SD_VARLINK_ERROR_EXPECTED_MORE, NULL);
+ r = sd_varlink_error(v, missing_upgrade ? SD_VARLINK_ERROR_EXPECTED_UPGRADE
+ : SD_VARLINK_ERROR_EXPECTED_MORE, NULL);
/* If we didn't manage to enqueue an error response, then fail the
* connection completely. Otherwise ignore the error from
* sd_varlink_error() here, as it is synthesized from the function's
return sd_varlink_reply(v, parameters);
}
+_public_ int sd_varlink_reply_and_upgrade(sd_varlink *v, sd_json_variant *parameters, int *ret_input_fd, int *ret_output_fd) {
+ int r;
+
+ assert_return(v, -EINVAL);
+ assert_return(ret_input_fd || ret_output_fd, -EINVAL);
+
+ if (v->state == VARLINK_DISCONNECTED)
+ return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
+
+ if (!IN_SET(v->state,
+ VARLINK_PROCESSING_METHOD,
+ VARLINK_PENDING_METHOD))
+ return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
+
+ /* Verify the client actually requested a protocol upgrade */
+ if (!v->protocol_upgrade)
+ return varlink_log_errno(v, SYNTHETIC_ERRNO(EPROTO),
+ "Method call did not request a protocol upgrade.");
+
+ /* Ensure we did not buffer any data beyond the upgrade request. Check this before sending the
+ * reply so that we can return a normal error (the framework will send an error reply to the
+ * client). In normal operation this cannot happen because the client waits for our reply before
+ * sending raw data, and we set protocol_upgrade=true in dispatch to limit subsequent reads to
+ * single bytes. But a misbehaving client could pipeline data early. */
+ if (v->input_buffer_size > 0)
+ return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADMSG),
+ "Unexpected buffered data from client during protocol upgrade.");
+
+ /* Validate parameters BEFORE sanitization (same validation as sd_varlink_reply(), but upgrade
+ * replies never carry the 'continues' flag so we always pass flags=0) */
+ if (v->current_method) {
+ const char *bad_field = NULL;
+
+ r = varlink_idl_validate_method_reply(v->current_method, parameters, /* flags= */ 0, &bad_field);
+ if (r < 0)
+ /* Please adjust test/units/end.sh when updating the log message. */
+ varlink_log_errno(v, r, "Return parameters for method reply %s() didn't pass validation on field '%s', ignoring: %m",
+ v->current_method->name, strna(bad_field));
+ }
+
+ _cleanup_(sd_json_variant_unrefp) sd_json_variant *m = NULL;
+ r = sd_json_buildo(&m, JSON_BUILD_PAIR_VARIANT_NON_EMPTY("parameters", parameters));
+ if (r < 0)
+ return varlink_log_errno(v, r, "Failed to build json message: %m");
+
+ r = varlink_enqueue_json(v, m);
+ if (r < 0)
+ return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
+
+ /* Flush the reply to the socket before stealing the fds. The reply must be fully written
+ * before the caller starts speaking the upgraded protocol. */
+ for (;;) {
+ r = varlink_write(v);
+ if (r < 0) {
+ varlink_log_errno(v, r, "Failed to flush reply: %m");
+ goto disconnect;
+ }
+ if (v->output_buffer_size == 0 && !v->output_queue)
+ break;
+ if (v->write_disconnected) {
+ r = varlink_log_errno(v, SYNTHETIC_ERRNO(ECONNRESET),
+ "Write disconnected during upgrade reply flush.");
+ goto disconnect;
+ }
+
+ r = fd_wait_for_event(v->output_fd, POLLOUT, USEC_INFINITY);
+ if (ERRNO_IS_NEG_TRANSIENT(r))
+ continue;
+ if (r < 0) {
+ varlink_log_errno(v, r, "Failed to wait for writable fd: %m");
+ goto disconnect;
+ }
+ assert(r > 0);
+
+ handle_revents(v, r);
+ }
+
+ /* Detach from the event loop before stealing the fds */
+ varlink_detach_event_sources(v);
+
+ /* Now hand the original FDs over to the caller, from this point on we have nothing to do with the
+ * connection anymore, it's up to the caller and we close the connection below */
+ r = varlink_handle_upgrade_fds(v, ret_input_fd, ret_output_fd);
+
+disconnect:
+ /* This also sets the connection state to VARLINK_DISCONNECTED */
+ sd_varlink_close(v);
+
+ return r < 0 ? r : 1;
+}
+
_public_ int sd_varlink_reset_fds(sd_varlink *v) {
assert_return(v, -EINVAL);
{ SD_VARLINK_ERROR_INVALID_PARAMETER, -EINVAL },
{ SD_VARLINK_ERROR_PERMISSION_DENIED, -EACCES },
{ SD_VARLINK_ERROR_EXPECTED_MORE, -EBADE },
+ { SD_VARLINK_ERROR_EXPECTED_UPGRADE, -EPROTOTYPE },
};
int r;
__extension__ typedef enum _SD_ENUM_TYPE_S64(sd_varlink_symbol_flags_t) {
SD_VARLINK_SUPPORTS_MORE = 1 << 0, /* Call supports "more" flag */
SD_VARLINK_REQUIRES_MORE = 1 << 1, /* Call requires "more" flag */
- _SD_VARLINK_SYMBOL_FLAGS_MAX = (1 << 2) - 1,
+ SD_VARLINK_SUPPORTS_UPGRADE = 1 << 2, /* Call supports "upgrade" flag */
+ SD_VARLINK_REQUIRES_UPGRADE = 1 << 3, /* Call requires "upgrade" flag */
+ _SD_VARLINK_SYMBOL_FLAGS_MAX = (1 << 4) - 1,
_SD_VARLINK_SYMBOL_FLAGS_INVALID = -EINVAL,
_SD_ENUM_FORCE_S64(SD_VARLINK_SYMBOL_FLAGS)
} sd_varlink_symbol_flags_t;
sd_varlink_callb((v), (method), (ret_parameters), (ret_error_id), SD_JSON_BUILD_OBJECT(__VA_ARGS__))
/* Send method call with upgrade, wait for reply, then steal the connection fds for raw I/O.
- * For bidirectional sockets ret_input_fd and ret_output_fd will be the same fd.
- * ret_parameters and ret_error_id are borrowed references valid only until v is closed or unreffed.
+ * For bidirectional sockets ret_input_fd and ret_output_fd will be the same fd. Callers
+ * that need independent fds should dup() one of them. ret_parameters and ret_error_id are
+ * borrowed references valid only until v is closed or unreffed.
* Returns > 0 if the connection was upgraded, 0 if a Varlink error occurred (and ret_error_id was set),
* or < 0 on local failure. */
int sd_varlink_call_and_upgrade(sd_varlink *v, const char *method, sd_json_variant *parameters, sd_json_variant **ret_parameters, const char **ret_error_id, int *ret_input_fd, int *ret_output_fd);
#define sd_varlink_replybo(v, ...) \
sd_varlink_replyb((v), SD_JSON_BUILD_OBJECT(__VA_ARGS__))
+/* Send a final reply to an upgrade request, then steal the connection fds for raw I/O.
+ * The fds are returned in blocking mode. The varlink connection is disconnected afterwards.
+ * For bidirectional sockets ret_input_fd and ret_output_fd will be the same fd. Callers
+ * that need independent fds should dup() one of them. For pipe pairs (e.g. ssh-exec
+ * transport) they will differ. Either ret pointer may be NULL.
+ *
+ * Note: this call synchronously blocks until the reply is flushed to the socket. This is
+ * usually fine as flush is fast but a misbehaving/adversary client that stops reading
+ * could stall the caller. So do not use in servers that multiplex many varlink
+ * connections. */
+int sd_varlink_reply_and_upgrade(sd_varlink *v, sd_json_variant *parameters, int *ret_input_fd, int *ret_output_fd);
+
/* Enqueue a (final) error */
int sd_varlink_error(sd_varlink *v, const char *error_id, sd_json_variant *parameters);
int sd_varlink_errorb(sd_varlink *v, const char *error_id, ...);
#define SD_VARLINK_ERROR_INVALID_PARAMETER "org.varlink.service.InvalidParameter"
#define SD_VARLINK_ERROR_PERMISSION_DENIED "org.varlink.service.PermissionDenied"
#define SD_VARLINK_ERROR_EXPECTED_MORE "org.varlink.service.ExpectedMore"
+#define SD_VARLINK_ERROR_EXPECTED_UPGRADE "org.varlink.service.ExpectedUpgrade"
_SD_END_DECLARATIONS;
#include "sd-varlink.h"
#include "fd-util.h"
+#include "io-util.h"
#include "json-util.h"
#include "memfd-util.h"
#include "rm-rf.h"
TEST(notify_then_error) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
- ASSERT_OK(sd_event_default(&e));
+ ASSERT_OK(sd_event_new(&e));
_cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
ASSERT_OK(sd_varlink_server_new(&s, 0));
ASSERT_OK(sd_event_loop(e));
}
+static int method_upgrade(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ _cleanup_close_ int input_fd = -EBADF, output_fd = -EBADF;
+ int r;
+
+ ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE));
+
+ r = sd_varlink_reply_and_upgrade(link, /* parameters= */ NULL, &input_fd, &output_fd);
+ if (r < 0)
+ return r;
+
+ /* For a socketpair connection, both fds point to the same socket — avoid double-close */
+ if (input_fd == output_fd)
+ output_fd = -EBADF;
+
+ /* After upgrade, do raw I/O: read until EOF, reverse, write back.
+ * The client shuts down its write side after sending, so we get a clean EOF. */
+ char buf[64] = {};
+ ssize_t n = ASSERT_OK(loop_read(input_fd, buf, sizeof(buf) - 1, /* do_poll= */ true));
+ ASSERT_GT(n, 0);
+
+ /* Reverse the received bytes */
+ for (ssize_t i = 0; i < n / 2; i++)
+ SWAP_TWO(buf[i], buf[n - 1 - i]);
+
+ int write_fd = output_fd >= 0 ? output_fd : input_fd;
+ ASSERT_OK(loop_write(write_fd, buf, n));
+
+ return 0;
+}
+
+static int method_upgrade_without_flag(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ int input_fd = -EBADF, output_fd = -EBADF;
+
+ /* Calling reply_and_upgrade without the client requesting it should fail with -EPROTO */
+ ASSERT_ERROR(sd_varlink_reply_and_upgrade(link, /* parameters= */ NULL, &input_fd, &output_fd), EPROTO);
+
+ sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS);
+
+ return sd_varlink_reply(link, /* parameters= */ NULL);
+}
+
+static void *upgrade_thread(void *arg) {
+ _cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *c = NULL;
+ _cleanup_close_ int input_fd = -EBADF, output_fd = -EBADF;
+ sd_json_variant *o = NULL;
+ const char *error_id = NULL;
+
+ ASSERT_OK(sd_varlink_connect_address(&c, arg));
+ ASSERT_OK(sd_varlink_set_description(c, "upgrade-client"));
+
+ ASSERT_OK(sd_varlink_call_and_upgrade(c, "io.test.Upgrade", /* parameters= */ NULL, &o, &error_id, &input_fd, &output_fd));
+ ASSERT_NULL(error_id);
+ ASSERT_GE(input_fd, 0);
+ ASSERT_GE(output_fd, 0);
+
+ /* For a socketpair connection, both fds point to the same socket — avoid double-close */
+ if (input_fd == output_fd)
+ output_fd = -EBADF;
+
+ /* Send a test string, expect reversed reply */
+ static const char msg[] = "Hello!";
+ int write_fd = output_fd >= 0 ? output_fd : input_fd;
+ ASSERT_OK(loop_write(write_fd, msg, strlen(msg)));
+ ASSERT_OK_ERRNO(shutdown(write_fd, SHUT_WR));
+
+ char buf[64] = {};
+ ssize_t n = ASSERT_OK(loop_read(input_fd, buf, strlen(msg), /* do_poll= */ true));
+ ASSERT_EQ((size_t) n, strlen(msg));
+ ASSERT_STREQ(buf, "!olleH");
+
+ /* Also test that a regular call (without upgrade flag) correctly rejects reply_and_upgrade on
+ * the server side, and still works as a normal call */
+ _cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *c2 = NULL;
+ ASSERT_OK(sd_varlink_connect_address(&c2, arg));
+ ASSERT_OK(sd_varlink_set_description(c2, "no-upgrade-client"));
+ ASSERT_OK(sd_varlink_call(c2, "io.test.UpgradeWithoutFlag", /* parameters= */ NULL, &o, &error_id));
+ ASSERT_NULL(error_id);
+
+ return NULL;
+}
+
+TEST(upgrade) {
+ _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+ _cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ pthread_t t;
+ const char *sp;
+
+ ASSERT_OK(mkdtemp_malloc("/tmp/varlink-test-XXXXXX", &tmpdir));
+ sp = strjoina(tmpdir, "/socket");
+
+ ASSERT_OK(sd_event_new(&e));
+
+ ASSERT_OK(sd_varlink_server_new(&s, 0));
+ ASSERT_OK(sd_varlink_server_set_description(s, "upgrade-server"));
+ ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Upgrade", method_upgrade));
+ ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.UpgradeWithoutFlag", method_upgrade_without_flag));
+ ASSERT_OK(sd_varlink_server_listen_address(s, sp, 0600));
+ ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+ ASSERT_OK(-pthread_create(&t, NULL, upgrade_thread, (void*) sp));
+
+ /* Run the event loop until no more connections (the thread will disconnect when done) */
+ ASSERT_OK(sd_event_loop(e));
+
+ ASSERT_OK(-pthread_join(t, NULL));
+}
+
DEFINE_TEST_MAIN(LOG_DEBUG);