]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
sd-varlink: add sd_varlink_call_and_upgrade() for protocol upgrades
authorMichael Vogt <michael@amutable.com>
Tue, 24 Mar 2026 08:48:20 +0000 (09:48 +0100)
committerMichael Vogt <michael@amutable.com>
Tue, 31 Mar 2026 16:25:09 +0000 (18:25 +0200)
The varlink spec supports protocol upgrades and they are very
useful to e.g. transfer binary data directly via varlink. So
far sd-varlink was not supporting this.

This commit adds a new public sd_varlink_call_and_upgrade()
that sends a method call, waits for the reply, then steals
the connection fds for raw I/O. It returns separate input_fd
and output_fd to support both bidirectional sockets and pipe
pairs.

A helper is extracted and shared between sd_varlink_call_full()
and sd_varlink_call_and_upgrade(). A new `protocol_upgrade`
bool in `struct sd_varlink` ensures that on a protocol upgrade
request we only exactly read the varlink protocol bytes and
leave anything beyond that to the caller that speaks the upgraded
protocol.

Note that this is the client side of the library implementation
only for now. The server side needs work but this is already
useful as it allows to talk to varlink servers that speak protocol
upgrades (like the rust implemenations of varlink).

src/libsystemd/libsystemd.sym
src/libsystemd/sd-varlink/sd-varlink.c
src/libsystemd/sd-varlink/varlink-internal.h
src/systemd/sd-varlink.h

index e90a9460e28e06752c5ecdb2e4f52f12c873e1c5..6af86aa2b4a2b1bb19fcccaa51d5db8ef6eaff9f 100644 (file)
@@ -1093,5 +1093,6 @@ global:
 
 LIBSYSTEMD_261 {
 global:
+        sd_varlink_call_and_upgrade;
         sd_varlink_set_sentinel;
 } LIBSYSTEMD_260;
index 9938ca7063dc48d245ca3bd017e8ba12f8675304..64606adcea998d537524e254fb691d63009fbb2c 100644 (file)
@@ -894,7 +894,12 @@ static int varlink_read(sd_varlink *v) {
         }
 
         p = v->input_buffer + v->input_buffer_index + v->input_buffer_size;
-        rs = MALLOC_SIZEOF_SAFE(v->input_buffer) - (v->input_buffer_index + v->input_buffer_size);
+
+        /* When a protocol upgrade is requested we can't consume any post-upgrade data from the socket buffer */
+        if (v->protocol_upgrade)
+                rs = 1;
+        else
+                rs = MALLOC_SIZEOF_SAFE(v->input_buffer) - (v->input_buffer_index + v->input_buffer_size);
 
         if (v->allow_fd_passing_input > 0) {
                 iov = IOVEC_MAKE(p, rs);
@@ -1493,7 +1498,7 @@ static int varlink_dispatch_method(sd_varlink *v) {
 
                 } else if (streq(k, "oneway")) {
 
-                        if ((flags & (SD_VARLINK_METHOD_ONEWAY|SD_VARLINK_METHOD_MORE)) != 0)
+                        if ((flags & (SD_VARLINK_METHOD_ONEWAY|SD_VARLINK_METHOD_MORE|SD_VARLINK_METHOD_UPGRADE)) != 0)
                                 goto invalid;
 
                         if (!sd_json_variant_is_boolean(e))
@@ -1504,7 +1509,7 @@ static int varlink_dispatch_method(sd_varlink *v) {
 
                 } else if (streq(k, "more")) {
 
-                        if ((flags & (SD_VARLINK_METHOD_ONEWAY|SD_VARLINK_METHOD_MORE)) != 0)
+                        if ((flags & (SD_VARLINK_METHOD_ONEWAY|SD_VARLINK_METHOD_MORE|SD_VARLINK_METHOD_UPGRADE)) != 0)
                                 goto invalid;
 
                         if (!sd_json_variant_is_boolean(e))
@@ -1513,6 +1518,17 @@ static int varlink_dispatch_method(sd_varlink *v) {
                         if (sd_json_variant_boolean(e))
                                 flags |= SD_VARLINK_METHOD_MORE;
 
+                } else if (streq(k, "upgrade")) {
+
+                        if ((flags & (SD_VARLINK_METHOD_ONEWAY|SD_VARLINK_METHOD_MORE|SD_VARLINK_METHOD_UPGRADE)) != 0)
+                                goto invalid;
+
+                        if (!sd_json_variant_is_boolean(e))
+                                goto invalid;
+
+                        if (sd_json_variant_boolean(e))
+                                flags |= SD_VARLINK_METHOD_UPGRADE;
+
                 } else
                         goto invalid;
         }
@@ -2256,19 +2272,13 @@ _public_ int sd_varlink_observeb(sd_varlink *v, const char *method, ...) {
         return sd_varlink_observe(v, method, parameters);
 }
 
-_public_ int sd_varlink_call_full(
-                sd_varlink *v,
-                const char *method,
-                sd_json_variant *parameters,
-                sd_json_variant **ret_parameters,
-                const char **ret_error_id,
-                sd_varlink_reply_flags_t *ret_flags) {
-
-        _cleanup_(sd_json_variant_unrefp) sd_json_variant *m = NULL;
+/* On success v->state will equal VARLINK_CALLED, the caller is responsible to adjust the state further if
+ * needed */
+static int varlink_call_internal(sd_varlink *v, sd_json_variant *request) {
         int r;
 
-        assert_return(v, -EINVAL);
-        assert_return(method, -EINVAL);
+        assert(v);
+        assert(request);
 
         if (v->state == VARLINK_DISCONNECTED)
                 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
@@ -2281,14 +2291,7 @@ _public_ int sd_varlink_call_full(
          * that we can assign a new reply shortly. */
         varlink_clear_current(v);
 
-        r = sd_json_buildo(
-                        &m,
-                        SD_JSON_BUILD_PAIR_STRING("method", method),
-                        JSON_BUILD_PAIR_VARIANT_NON_EMPTY("parameters", parameters));
-        if (r < 0)
-                return varlink_log_errno(v, r, "Failed to build json message: %m");
-
-        r = varlink_enqueue_json(v, m);
+        r = varlink_enqueue_json(v, request);
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
 
@@ -2310,29 +2313,9 @@ _public_ int sd_varlink_call_full(
 
         switch (v->state) {
 
-        case VARLINK_CALLED: {
+        case VARLINK_CALLED:
                 assert(v->current);
-
-                varlink_set_state(v, VARLINK_IDLE_CLIENT);
-                assert(v->n_pending == 1);
-                v->n_pending--;
-
-                sd_json_variant *e = sd_json_variant_by_key(v->current, "error"),
-                        *p = sd_json_variant_by_key(v->current, "parameters");
-
-                /* If caller doesn't ask for the error string, then let's return an error code in case of failure */
-                if (!ret_error_id && e)
-                        return sd_varlink_error_to_errno(sd_json_variant_string(e), p);
-
-                if (ret_parameters)
-                        *ret_parameters = p;
-                if (ret_error_id)
-                        *ret_error_id = e ? sd_json_variant_string(e) : NULL;
-                if (ret_flags)
-                        *ret_flags = v->current_reply_flags;
-
-                return 1;
-        }
+                return 0;
 
         case VARLINK_PENDING_DISCONNECT:
         case VARLINK_DISCONNECTED:
@@ -2346,6 +2329,52 @@ _public_ int sd_varlink_call_full(
         }
 }
 
+_public_ int sd_varlink_call_full(
+                sd_varlink *v,
+                const char *method,
+                sd_json_variant *parameters,
+                sd_json_variant **ret_parameters,
+                const char **ret_error_id,
+                sd_varlink_reply_flags_t *ret_flags) {
+
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *m = NULL;
+        int r;
+
+        assert_return(v, -EINVAL);
+        assert_return(method, -EINVAL);
+
+        r = sd_json_buildo(
+                        &m,
+                        SD_JSON_BUILD_PAIR_STRING("method", method),
+                        JSON_BUILD_PAIR_VARIANT_NON_EMPTY("parameters", parameters));
+        if (r < 0)
+                return varlink_log_errno(v, r, "Failed to build json message: %m");
+
+        r = varlink_call_internal(v, m);
+        if (r < 0)
+                return r;
+
+        varlink_set_state(v, VARLINK_IDLE_CLIENT);
+        assert(v->n_pending == 1);
+        v->n_pending--;
+
+        sd_json_variant *e = sd_json_variant_by_key(v->current, "error"),
+                *p = sd_json_variant_by_key(v->current, "parameters");
+
+        /* If caller doesn't ask for the error string, then let's return an error code in case of failure */
+        if (!ret_error_id && e)
+                return sd_varlink_error_to_errno(sd_json_variant_string(e), p);
+
+        if (ret_parameters)
+                *ret_parameters = p;
+        if (ret_error_id)
+                *ret_error_id = e ? sd_json_variant_string(e) : NULL;
+        if (ret_flags)
+                *ret_flags = v->current_reply_flags;
+
+        return 1;
+}
+
 _public_ int sd_varlink_call(
                 sd_varlink *v,
                 const char *method,
@@ -2356,6 +2385,116 @@ _public_ int sd_varlink_call(
         return sd_varlink_call_full(v, method, parameters, ret_parameters, ret_error_id, NULL);
 }
 
+_public_ int sd_varlink_call_and_upgrade(
+                sd_varlink *v,
+                const char *method,
+                sd_json_variant *parameters,
+                sd_json_variant **ret_parameters,
+                const char **ret_error_id,
+                int *ret_input_fd,
+                int *ret_output_fd) {
+
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *m = NULL;
+        int r;
+
+        assert_return(v, -EINVAL);
+        assert_return(method, -EINVAL);
+        assert_return(ret_input_fd || ret_output_fd, -EINVAL);
+
+        r = sd_json_buildo(
+                        &m,
+                        SD_JSON_BUILD_PAIR_STRING("method", method),
+                        JSON_BUILD_PAIR_VARIANT_NON_EMPTY("parameters", parameters),
+                        SD_JSON_BUILD_PAIR_BOOLEAN("upgrade", true));
+        if (r < 0)
+                return varlink_log_errno(v, r, "Failed to build json message: %m");
+
+        v->protocol_upgrade = true;
+        r = varlink_call_internal(v, m);
+        if (r < 0) {
+                v->protocol_upgrade = false;
+                return r;
+        }
+
+        /* ensure we did not consume any data from the upgraded protocol */
+        assert(v->input_buffer_size == 0);
+
+        sd_json_variant *e = sd_json_variant_by_key(v->current, "error"),
+                *p = sd_json_variant_by_key(v->current, "parameters");
+
+        /* don't steal the fd on server error */
+        if (e) {
+                if (ret_error_id) {
+                        *ret_error_id = sd_json_variant_string(e);
+                        if (ret_parameters)
+                                *ret_parameters = p;
+                        r = 0;
+                } else
+                        r = sd_varlink_error_to_errno(sd_json_variant_string(e), p);
+
+                varlink_set_state(v, VARLINK_IDLE_CLIENT);
+                goto finish;
+        }
+
+        /* Pass the connection fds to the caller, it owns them now. Reset to blocking mode
+         * since callers of the upgraded protocol will generally expect normal blocking
+         * semantics. */
+        r = fd_nonblock(v->input_fd, false);
+        if (r < 0) {
+                varlink_log_errno(v, r, "Failed to set input fd to blocking mode: %m");
+                goto disconnect;
+        }
+        if (v->input_fd != v->output_fd) {
+                r = fd_nonblock(v->output_fd, false);
+                if (r < 0) {
+                        varlink_log_errno(v, r, "Failed to set output fd to blocking mode: %m");
+                        goto disconnect;
+                }
+        }
+
+        /* Handle the case where the caller is not interested in one of the fds. We need
+         * to consider the case when (input_fd == output_fd) just clear the alias
+         * rather than closing it, since the other branch may hand it out. */
+        bool same_fd = v->input_fd == v->output_fd;
+
+        if (ret_input_fd)
+                *ret_input_fd = TAKE_FD(v->input_fd);
+        else if (!same_fd) {
+                (void) shutdown(v->input_fd, SHUT_RD);
+                v->input_fd = safe_close(v->input_fd);
+        } else
+                v->input_fd = -EBADF;
+
+        if (ret_output_fd)
+                *ret_output_fd = TAKE_FD(v->output_fd);
+        else if (!same_fd) {
+                (void) shutdown(v->output_fd, SHUT_WR);
+                v->output_fd = safe_close(v->output_fd);
+        } else
+                v->output_fd = -EBADF;
+
+        varlink_set_state(v, VARLINK_DISCONNECTED);
+        assert(v->n_pending == 1);
+        v->n_pending--;
+
+        if (ret_parameters)
+                *ret_parameters = p;
+        if (ret_error_id)
+                *ret_error_id = NULL;
+
+        return 1;
+
+disconnect:
+        /* If we fail after the server already accepted the upgrade, nothing can be done but disconnect.
+         * The other side is speaking raw protocol while we expect JSON. */
+        varlink_set_state(v, VARLINK_DISCONNECTED);
+finish:
+        v->protocol_upgrade = false;
+        assert(v->n_pending == 1);
+        v->n_pending--;
+        return r;
+}
+
 _public_ int sd_varlink_callb_ap(
                 sd_varlink *v,
                 const char *method,
index 39b15e12a1fee24727659cb4dc12af20e654a76a..3dfe86d4487e6df64a61e870fc3413e33b152fda 100644 (file)
@@ -170,6 +170,8 @@ typedef struct sd_varlink {
         bool prefer_write:1;
         bool got_pollhup:1;
 
+        bool protocol_upgrade:1; /* Whether a protocol_upgrade was requested */
+
         bool output_buffer_sensitive:1; /* whether to erase the output buffer after writing it to the socket */
         bool input_sensitive:1; /* Whether incoming messages might be sensitive */
 
index 7cda9e7e56ef7d1f63e053aacd1a234a5a1a987e..9d6e939d64f0d9e843bedbfe49883163118671df 100644 (file)
@@ -55,8 +55,9 @@ __extension__ typedef enum _SD_ENUM_TYPE_S64(sd_varlink_reply_flags_t) {
 } sd_varlink_reply_flags_t;
 
 __extension__ typedef enum _SD_ENUM_TYPE_S64(sd_varlink_method_flags_t) {
-        SD_VARLINK_METHOD_ONEWAY = 1 << 0,
-        SD_VARLINK_METHOD_MORE   = 1 << 1,
+        SD_VARLINK_METHOD_ONEWAY  = 1 << 0,
+        SD_VARLINK_METHOD_MORE    = 1 << 1,
+        SD_VARLINK_METHOD_UPGRADE = 1 << 2,
         _SD_ENUM_FORCE_S64(SD_VARLINK_METHOD)
 } sd_varlink_method_flags_t;
 
@@ -135,6 +136,13 @@ int sd_varlink_callb(sd_varlink *v, const char *method, sd_json_variant **ret_pa
 #define sd_varlink_callbo(v, method, ret_parameters, ret_error_id, ...)    \
         sd_varlink_callb((v), (method), (ret_parameters), (ret_error_id), SD_JSON_BUILD_OBJECT(__VA_ARGS__))
 
+/* Send method call with upgrade, wait for reply, then steal the connection fds for raw I/O.
+ * For bidirectional sockets ret_input_fd and ret_output_fd will be the same fd.
+ * ret_parameters and ret_error_id are borrowed references valid only until v is closed or unreffed.
+ * Returns > 0 if the connection was upgraded, 0 if a Varlink error occurred (and ret_error_id was set),
+ * or < 0 on local failure. */
+int sd_varlink_call_and_upgrade(sd_varlink *v, const char *method, sd_json_variant *parameters, sd_json_variant **ret_parameters, const char **ret_error_id, int *ret_input_fd, int *ret_output_fd);
+
 /* Send method call and begin collecting all 'more' replies into an array, finishing when a final reply is sent */
 int sd_varlink_collect_full(sd_varlink *v, const char *method, sd_json_variant *parameters, sd_json_variant **ret_parameters, const char **ret_error_id, sd_varlink_reply_flags_t *ret_flags);
 int sd_varlink_collect(sd_varlink *v, const char *method, sd_json_variant *parameters, sd_json_variant **ret_parameters, const char **ret_error_id);