]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
sd-varlink: Introduce varlink_set_sentinel()
authorDaan De Meyer <daan.j.demeyer@gmail.com>
Mon, 2 Feb 2026 13:23:40 +0000 (14:23 +0100)
committerDaan De Meyer <daan@amutable.com>
Wed, 11 Feb 2026 13:24:03 +0000 (14:24 +0100)
Streaming methods which are not used as a continuous subscription but
instead only send a series of objects all end up with the same workaround
to be able to figure out when to send sd_varlink_reply() or sd_varlink_notify().
Let's generalize this in sd-varlink itself.

Let's introduce the concept of a sentinel, which is a reply that will be sent
by sd-varlink if no other reply was queued by a method callback. The sentinel
is configured with varlink_set_sentinel(). If a sentinel is configured,
sd_varlink_reply() can be used more than once in streaming methods to queue
multiple values to stream to the client. The last queued reply is not sent
until the callback finishes. When the callback finishes, the last reply is
sent without "continues: more". If no reply was queued, the sentinel is sent.

This always using only sd_varlink_reply() in such streaming methods and
leaves sd_varlink_notify() available solely for continuous subscription
streaming methods, where we never use sd_varlink_reply() and instead disconnect
when the server exits.

src/libsystemd/sd-varlink/sd-varlink.c
src/libsystemd/sd-varlink/varlink-internal.h
src/libsystemd/sd-varlink/varlink-util.c
src/libsystemd/sd-varlink/varlink-util.h
src/test/test-varlink.c

index a3f7b75495431b9768f439853d0e10434c9a00a3..de1d172762aad792cf372ba458b6b73613eb7337 100644 (file)
@@ -616,6 +616,12 @@ static void varlink_clear_current(sd_varlink *v) {
         close_many(v->input_fds, v->n_input_fds);
         v->input_fds = mfree(v->input_fds);
         v->n_input_fds = 0;
+
+        v->previous = varlink_json_queue_item_free(v->previous);
+        if (v->sentinel != POINTER_MAX)
+                v->sentinel = mfree(v->sentinel);
+        else
+                v->sentinel = NULL;
 }
 
 static void varlink_clear(sd_varlink *v) {
@@ -1383,6 +1389,19 @@ static int varlink_format_queue(sd_varlink *v) {
         return 0;
 }
 
+static int varlink_enqueue_item(sd_varlink *v, VarlinkJsonQueueItem *q) {
+        assert(v);
+        assert(q);
+
+        if (v->n_output_queue >= VARLINK_QUEUE_MAX)
+                return -ENOBUFS;
+
+        LIST_INSERT_AFTER(queue, v->output_queue, v->output_queue_tail, q);
+        v->output_queue_tail = q;
+        v->n_output_queue++;
+        return 0;
+}
+
 static int varlink_enqueue_json(sd_varlink *v, sd_json_variant *m) {
         VarlinkJsonQueueItem *q;
 
@@ -1404,9 +1423,9 @@ static int varlink_enqueue_json(sd_varlink *v, sd_json_variant *m) {
 
         v->n_pushed_fds = 0; /* fds now belong to the queue entry */
 
-        LIST_INSERT_AFTER(queue, v->output_queue, v->output_queue_tail, q);
-        v->output_queue_tail = q;
-        v->n_output_queue++;
+        /* We already checked the precondition ourselves so this call cannot fail. */
+        assert_se(varlink_enqueue_item(v, q) >= 0);
+
         return 0;
 }
 
@@ -1536,16 +1555,46 @@ static int varlink_dispatch_method(sd_varlink *v) {
 
                 if (!invalid) {
                         r = callback(v, parameters, flags, v->userdata);
-                        if (r < 0 && VARLINK_STATE_WANTS_REPLY(v->state)) {
-                                varlink_log_errno(v, r, "Callback for %s returned error: %m", method);
+                        if (VARLINK_STATE_WANTS_REPLY(v->state)) {
+                                if (r < 0) {
+                                        varlink_log_errno(v, r, "Callback for %s returned error: %m", method);
+
+                                        /* We got an error back from the callback. Propagate it to the client
+                                         * if the method call remains unanswered. */
+                                        r = sd_varlink_error_errno(v, r);
+                                } else if (v->sentinel) {
+                                        if (v->previous) {
+                                                r = varlink_enqueue_item(v, v->previous);
+                                                if (r >= 0) {
+                                                        TAKE_PTR(v->previous);
+                                                        varlink_set_state(v, VARLINK_PROCESSED_METHOD);
+                                                }
+                                        } else {
+                                                char *sentinel = TAKE_PTR(v->sentinel);
+
+                                                /* Propagate the sentinel to the client if one was configured
+                                                 * and no replies were enqueued by the callback. */
+                                                if (sentinel == POINTER_MAX)
+                                                        r = sd_varlink_reply(v, NULL);
+                                                else
+                                                        r = sd_varlink_error(v, sentinel, NULL);
+
+                                                if (sentinel != POINTER_MAX)
+                                                        free(sentinel);
+                                        }
+                                        if (r < 0)
+                                                varlink_log_errno(v, r, "Failed to process sentinel for method '%s': %m", method);
+                                } else {
+                                        assert(!v->previous);
+                                        r = 0;
+                                }
 
-                                /* We got an error back from the callback. Propagate it to the client if the
-                                 * method call remains unanswered. */
-                                r = sd_varlink_error_errno(v, r);
-                                /* If we didn't manage to enqueue an error response, then fail the connection completely. */
+                                /* If we didn't manage to enqueue a response, then fail the connection completely. */
                                 if (r < 0 && VARLINK_STATE_WANTS_REPLY(v->state))
                                         goto fail;
-                        }
+
+                        } else
+                                assert(!v->previous);
                 }
         } else if (VARLINK_STATE_WANTS_REPLY(v->state)) {
                 r = sd_varlink_errorbo(v, SD_VARLINK_ERROR_METHOD_NOT_FOUND, SD_JSON_BUILD_PAIR_STRING("method", method));
@@ -2501,33 +2550,58 @@ _public_ int sd_varlink_collectb(
 }
 
 _public_ int sd_varlink_reply(sd_varlink *v, sd_json_variant *parameters) {
-        _cleanup_(sd_json_variant_unrefp) sd_json_variant *m = NULL;
         int r;
 
         assert_return(v, -EINVAL);
 
         if (v->state == VARLINK_DISCONNECTED)
-                return -ENOTCONN;
+                return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
+
         if (!IN_SET(v->state,
                     VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE,
                     VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
-                return -EBUSY;
+                return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
+
+        bool more = IN_SET(v->state, VARLINK_PROCESSING_METHOD_MORE, VARLINK_PENDING_METHOD_MORE);
 
         /* Validate parameters BEFORE sanitization */
         if (v->current_method) {
                 const char *bad_field = NULL;
 
-                r = varlink_idl_validate_method_reply(v->current_method, parameters, /* flags= */ 0, &bad_field);
-                if (r < 0)
+                r = varlink_idl_validate_method_reply(v->current_method, parameters, more && v->sentinel ? SD_VARLINK_REPLY_CONTINUES : 0, &bad_field);
+                if (r == -EBADE)
+                        varlink_log_errno(v, r, "Method reply for %s() has 'continues' flag set, but IDL structure doesn't allow that, ignoring: %m",
+                                          v->current_method->name);
+                else if (r < 0)
                         /* Please adjust test/units/end.sh when updating the log message. */
                         varlink_log_errno(v, r, "Return parameters for method reply %s() didn't pass validation on field '%s', ignoring: %m",
                                           v->current_method->name, strna(bad_field));
         }
 
+        _cleanup_(sd_json_variant_unrefp) sd_json_variant *m = NULL;
         r = sd_json_buildo(&m, JSON_BUILD_PAIR_VARIANT_NON_EMPTY("parameters", parameters));
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to build json message: %m");
 
+        if (more && v->sentinel) {
+                if (v->previous) {
+                        r = sd_json_variant_set_field_boolean(&v->previous->data, "continues", true);
+                        if (r < 0)
+                                return r;
+
+                        r = varlink_enqueue_item(v, v->previous);
+                        if (r < 0)
+                                return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
+                }
+
+                v->previous = varlink_json_queue_item_new(m, v->pushed_fds, v->n_pushed_fds);
+                if (!v->previous)
+                        return -ENOMEM;
+
+                v->n_pushed_fds = 0; /* fds now belong to the queue entry */
+                return 1;
+        }
+
         r = varlink_enqueue_json(v, m);
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
@@ -2590,6 +2664,21 @@ _public_ int sd_varlink_error(sd_varlink *v, const char *error_id, sd_json_varia
                     VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
                 return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
 
+        if (v->previous) {
+                r = sd_json_variant_set_field_boolean(&v->previous->data, "continues", true);
+                if (r < 0)
+                        return r;
+
+                /* If we have a previous reply still ready make sure we queue it before the error. We only
+                 * ever set "previous" if we're in a streaming method so we pass more=true uncondtionally
+                 * here as we know we're still going to queue an error afterwards. */
+                r = varlink_enqueue_item(v, v->previous);
+                if (r < 0)
+                        return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
+
+                TAKE_PTR(v->previous);
+        }
+
         /* Reset the list of pushed file descriptors before sending an error reply. We do this here to
          * simplify code that puts together a complex reply message with fds, and half-way something
          * fails. In that case the pushed fds need to be flushed out again. Under the assumption that it
@@ -2721,6 +2810,11 @@ _public_ int sd_varlink_notify(sd_varlink *v, sd_json_variant *parameters) {
 
         assert_return(v, -EINVAL);
 
+        if (v->sentinel)
+                return varlink_log_errno(v, SYNTHETIC_ERRNO(EINVAL), "Cannot use sd_varlink_notify() on method with sentinel set");
+
+        assert(!v->previous);
+
         if (v->state == VARLINK_DISCONNECTED)
                 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
 
index a2c54a0bf5a80fbb031d580985c02b3127aff3de..64868514244999d67d65c3eee8ec4dc468f86afd 100644 (file)
@@ -82,7 +82,7 @@ struct VarlinkJsonQueueItem {
         int fds[];
 };
 
-struct sd_varlink {
+typedef struct sd_varlink {
         unsigned n_ref;
 
         sd_varlink_server *server;
@@ -157,6 +157,9 @@ struct sd_varlink {
         sd_varlink_reply_flags_t current_reply_flags;
         sd_varlink_symbol *current_method;
 
+        VarlinkJsonQueueItem *previous;
+        char *sentinel;
+
         int peer_pidfd;
         struct ucred ucred;
         bool ucred_acquired:1;
@@ -189,7 +192,7 @@ struct sd_varlink {
         sd_event_source *defer_event_source;
 
         PidRef exec_pidref;
-};
+} sd_varlink;
 
 typedef struct VarlinkServerSocket VarlinkServerSocket;
 
@@ -204,7 +207,7 @@ struct VarlinkServerSocket {
         LIST_FIELDS(VarlinkServerSocket, sockets);
 };
 
-struct sd_varlink_server {
+typedef struct sd_varlink_server {
         unsigned n_ref;
         sd_varlink_server_flags_t flags;
 
@@ -234,7 +237,7 @@ struct sd_varlink_server {
         unsigned connections_per_uid_max;
 
         bool exit_on_idle;
-};
+} sd_varlink_server;
 
 #define varlink_log_errno(v, error, fmt, ...)                           \
         log_debug_errno(error, "%s: " fmt, varlink_description(v), ##__VA_ARGS__)
index 7ae3a66af25a05ce3a1aa067625ac303d234735c..916ac2ba996fe68a108db082c46f51a8b587fd11 100644 (file)
@@ -6,6 +6,7 @@
 #include "pidref.h"
 #include "set.h"
 #include "string-util.h"
+#include "varlink-internal.h"
 #include "varlink-util.h"
 #include "version.h"
 
@@ -204,6 +205,31 @@ int varlink_check_privileged_peer(sd_varlink *vl) {
         return 0;
 }
 
+int varlink_set_sentinel(sd_varlink *v, const char *error_id) {
+        _cleanup_free_ char *s = NULL;
+
+        assert(v);
+
+        /* If the caller doesn't want a reply, then don't set a sentinel. */
+        if (v->state == VARLINK_PROCESSING_METHOD_ONEWAY)
+                return 0;
+
+        /* This has to be called during a callback, and not after it has exited. */
+        assert(IN_SET(v->state, VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE));
+
+        if (error_id) {
+                s = strdup(error_id);
+                if (!s)
+                        return -ENOMEM;
+        }
+
+        if (v->sentinel != POINTER_MAX)
+                free(v->sentinel);
+
+        v->sentinel = s ? TAKE_PTR(s) : POINTER_MAX;
+        return 0;
+}
+
 DEFINE_HASH_OPS_WITH_VALUE_DESTRUCTOR(
                 varlink_hash_ops,
                 void,
index ba0f23225356acef83375c3cc6690c16f91f29bf..dee79555ce9211213364890094482979604302a7 100644 (file)
@@ -28,4 +28,6 @@ int varlink_server_new(
 
 int varlink_check_privileged_peer(sd_varlink *vl);
 
+int varlink_set_sentinel(sd_varlink *v, const char *error_id);
+
 extern const struct hash_ops varlink_hash_ops;
index 61e2b06978fe4b700a6aa9992d76e974f5c40c78..bf1390fba1dc4ea937f16a4b37b968ec629ade69 100644 (file)
@@ -441,4 +441,315 @@ TEST(invalid_parameter) {
         ASSERT_OK(sd_event_loop(e));
 }
 
+static int method_with_error_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        /* Set an error sentinel and return without sending a reply. The sentinel error should be sent automatically. */
+        ASSERT_OK(varlink_set_sentinel(link, "io.test.SentinelError"));
+        return 0;
+}
+
+static int reply_sentinel_error(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+        ASSERT_STREQ(error_id, "io.test.SentinelError");
+        ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+        return 0;
+}
+
+TEST(sentinel_error) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_default(&e));
+
+        _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+        ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+        ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+        ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.ErrorSentinel", method_with_error_sentinel));
+
+        int connfd[2];
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+        ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+        _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+        ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+        ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+        ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_error));
+
+        ASSERT_OK(sd_varlink_invoke(c, "io.test.ErrorSentinel", /* parameters= */ NULL));
+
+        ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_with_empty_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        /* Set an empty sentinel and return without sending a reply. An empty reply should be sent automatically. */
+        ASSERT_OK(varlink_set_sentinel(link, /* error_id= */ NULL));
+        return 0;
+}
+
+static int reply_sentinel_empty(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+        ASSERT_NULL(error_id);
+        ASSERT_TRUE(sd_json_variant_is_blank_object(parameters));
+        ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+        return 0;
+}
+
+TEST(sentinel_empty) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_default(&e));
+
+        _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+        ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+        ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+        ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.EmptySentinel", method_with_empty_sentinel));
+
+        int connfd[2];
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+        ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+        _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+        ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+        ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+        ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_empty));
+
+        ASSERT_OK(sd_varlink_invoke(c, "io.test.EmptySentinel", /* parameters= */ NULL));
+
+        ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_with_sentinel_but_reply(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        /* Set a sentinel but also send a reply. The sentinel should not be used. */
+        ASSERT_OK(varlink_set_sentinel(link, "io.test.SentinelError"));
+        return sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_STRING("result", "explicit-reply"));
+}
+
+static int reply_sentinel_explicit(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+        ASSERT_NULL(error_id);
+        ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(parameters, "result")), "explicit-reply");
+        ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+        return 0;
+}
+
+TEST(sentinel_with_explicit_reply) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_default(&e));
+
+        _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+        ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+        ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+        ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.SentinelButReply", method_with_sentinel_but_reply));
+
+        int connfd[2];
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+        ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+        _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+        ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+        ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+        ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_explicit));
+
+        ASSERT_OK(sd_varlink_invoke(c, "io.test.SentinelButReply", /* parameters= */ NULL));
+
+        ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_with_oneway_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        /* The method was called oneway, so varlink_set_sentinel() should be a no-op and the server should
+         * transition back to idle without sending any reply. */
+        ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_METHOD_ONEWAY));
+        ASSERT_OK(varlink_set_sentinel(link, "io.test.SentinelError"));
+        return 0;
+}
+
+static int method_oneway_sentinel_pong(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        return sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_STRING("result", "pong"));
+}
+
+static int reply_oneway_sentinel_pong(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+        /* If we get here, it means the oneway sentinel call didn't break the connection and the server
+         * properly handled a subsequent regular method call. */
+        ASSERT_NULL(error_id);
+        ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(parameters, "result")), "pong");
+        ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+        return 0;
+}
+
+TEST(sentinel_oneway) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_default(&e));
+
+        _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+        ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+        ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+        ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.OnewaySentinel", method_with_oneway_sentinel));
+        ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Pong", method_oneway_sentinel_pong));
+
+        int connfd[2];
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+        ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+        _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+        ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+        ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+        /* Send a oneway call with a sentinel — the sentinel should be silently ignored. */
+        ASSERT_OK(sd_varlink_send(c, "io.test.OnewaySentinel", /* parameters= */ NULL));
+
+        /* Follow up with a regular call to verify the server is still functional. */
+        ASSERT_OK(sd_varlink_bind_reply(c, reply_oneway_sentinel_pong));
+        ASSERT_OK(sd_varlink_invoke(c, "io.test.Pong", /* parameters= */ NULL));
+
+        ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_with_fd_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        _cleanup_close_ int fd1 = -EBADF, fd2 = -EBADF;
+
+        ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_METHOD_MORE));
+
+        /* Set a sentinel so sd_varlink_reply() defers sending: each reply and its pushed fds are captured in
+         * the queue, and the last one is sent as the final reply when the callback returns. */
+        ASSERT_OK(varlink_set_sentinel(link, /* error_id= */ NULL));
+
+        /* First reply: push one fd with "alpha" content */
+        ASSERT_OK(fd1 = memfd_new_and_seal_string("data", "alpha"));
+        ASSERT_OK_EQ(sd_varlink_push_fd(link, fd1), 0);
+        TAKE_FD(fd1);
+        ASSERT_OK(sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_INTEGER("index", 0)));
+
+        /* Second reply: push one fd with "beta" content */
+        ASSERT_OK(fd2 = memfd_new_and_seal_string("data", "beta"));
+        ASSERT_OK_EQ(sd_varlink_push_fd(link, fd2), 0);
+        TAKE_FD(fd2);
+        ASSERT_OK(sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_INTEGER("index", 1)));
+
+        return 0;
+}
+
+static int reply_sentinel_fd(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+        int *state = ASSERT_PTR(sd_varlink_get_userdata(link));
+
+        if (*state == 0) {
+                /* First reply: should carry "continues" flag and fd with "alpha" */
+                ASSERT_NULL(error_id);
+                ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_REPLY_CONTINUES));
+                ASSERT_EQ(sd_json_variant_integer(sd_json_variant_by_key(parameters, "index")), 0);
+
+                int fd;
+                ASSERT_OK(fd = sd_varlink_peek_fd(link, 0));
+                test_fd(fd, "alpha", STRLEN("alpha"));
+                (*state)++;
+        } else if (*state == 1) {
+                /* Second (final) reply: no "continues" flag, fd with "beta" */
+                ASSERT_NULL(error_id);
+                ASSERT_FALSE(FLAGS_SET(flags, SD_VARLINK_REPLY_CONTINUES));
+                ASSERT_EQ(sd_json_variant_integer(sd_json_variant_by_key(parameters, "index")), 1);
+
+                int fd;
+                ASSERT_OK(fd = sd_varlink_peek_fd(link, 0));
+                test_fd(fd, "beta", STRLEN("beta"));
+
+                ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+        } else
+                assert_not_reached();
+
+        return 0;
+}
+
+TEST(sentinel_with_fds) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_default(&e));
+
+        _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+        ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_ALLOW_FD_PASSING_INPUT|SD_VARLINK_SERVER_ALLOW_FD_PASSING_OUTPUT));
+
+        ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+        ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.FDSentinel", method_with_fd_sentinel));
+
+        int connfd[2];
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+        ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+        _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+        ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+        ASSERT_OK(sd_varlink_set_allow_fd_passing_input(c, true));
+        ASSERT_OK(sd_varlink_set_allow_fd_passing_output(c, true));
+
+        ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+        int state = 0;
+        sd_varlink_set_userdata(c, &state);
+        ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_fd));
+
+        ASSERT_OK(sd_varlink_observe(c, "io.test.FDSentinel", /* parameters= */ NULL));
+
+        ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_with_notify_then_error(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+        /* Send a notify first, then return an error. The notify should be received before the error. */
+        ASSERT_OK(sd_varlink_notifybo(link, SD_JSON_BUILD_PAIR_STRING("status", "in-progress")));
+        return sd_varlink_error(link, "io.test.OperationFailed", /* parameters= */ NULL);
+}
+
+static int reply_notify_then_error(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+        int *state = ASSERT_PTR(sd_varlink_get_userdata(link));
+
+        if (*state == 0) {
+                /* First callback: should be the notify (no error, has "more" flag) */
+                ASSERT_NULL(error_id);
+                ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_REPLY_CONTINUES));
+                ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(parameters, "status")), "in-progress");
+                (*state)++;
+        } else if (*state == 1) {
+                /* Second callback: should be the error */
+                ASSERT_STREQ(error_id, "io.test.OperationFailed");
+                ASSERT_FALSE(FLAGS_SET(flags, SD_VARLINK_REPLY_CONTINUES));
+                ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+        } else
+                assert_not_reached();
+
+        return 0;
+}
+
+TEST(notify_then_error) {
+        _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+        ASSERT_OK(sd_event_default(&e));
+
+        _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+        ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+        ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+        ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.NotifyThenError", method_with_notify_then_error));
+
+        int connfd[2];
+        ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+        ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+        _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+        ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+        ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+        int state = 0;
+        sd_varlink_set_userdata(c, &state);
+        ASSERT_OK(sd_varlink_bind_reply(c, reply_notify_then_error));
+
+        ASSERT_OK(sd_varlink_observe(c, "io.test.NotifyThenError", /* parameters= */ NULL));
+
+        ASSERT_OK(sd_event_loop(e));
+}
+
 DEFINE_TEST_MAIN(LOG_DEBUG);