From c0696f1f5d3a2be1c8e4c8b45ca7e8a6df7998fa Mon Sep 17 00:00:00 2001 From: Daan De Meyer Date: Mon, 2 Feb 2026 14:23:40 +0100 Subject: [PATCH] sd-varlink: Introduce varlink_set_sentinel() Streaming methods which are not used as a continuous subscription but instead only send a series of objects all end up with the same workaround to be able to figure out when to send sd_varlink_reply() or sd_varlink_notify(). Let's generalize this in sd-varlink itself. Let's introduce the concept of a sentinel, which is a reply that will be sent by sd-varlink if no other reply was queued by a method callback. The sentinel is configured with varlink_set_sentinel(). If a sentinel is configured, sd_varlink_reply() can be used more than once in streaming methods to queue multiple values to stream to the client. The last queued reply is not sent until the callback finishes. When the callback finishes, the last reply is sent without "continues: more". If no reply was queued, the sentinel is sent. This always using only sd_varlink_reply() in such streaming methods and leaves sd_varlink_notify() available solely for continuous subscription streaming methods, where we never use sd_varlink_reply() and instead disconnect when the server exits. --- src/libsystemd/sd-varlink/sd-varlink.c | 124 +++++++- src/libsystemd/sd-varlink/varlink-internal.h | 11 +- src/libsystemd/sd-varlink/varlink-util.c | 26 ++ src/libsystemd/sd-varlink/varlink-util.h | 2 + src/test/test-varlink.c | 311 +++++++++++++++++++ 5 files changed, 455 insertions(+), 19 deletions(-) diff --git a/src/libsystemd/sd-varlink/sd-varlink.c b/src/libsystemd/sd-varlink/sd-varlink.c index a3f7b754954..de1d172762a 100644 --- a/src/libsystemd/sd-varlink/sd-varlink.c +++ b/src/libsystemd/sd-varlink/sd-varlink.c @@ -616,6 +616,12 @@ static void varlink_clear_current(sd_varlink *v) { close_many(v->input_fds, v->n_input_fds); v->input_fds = mfree(v->input_fds); v->n_input_fds = 0; + + v->previous = varlink_json_queue_item_free(v->previous); + if (v->sentinel != POINTER_MAX) + v->sentinel = mfree(v->sentinel); + else + v->sentinel = NULL; } static void varlink_clear(sd_varlink *v) { @@ -1383,6 +1389,19 @@ static int varlink_format_queue(sd_varlink *v) { return 0; } +static int varlink_enqueue_item(sd_varlink *v, VarlinkJsonQueueItem *q) { + assert(v); + assert(q); + + if (v->n_output_queue >= VARLINK_QUEUE_MAX) + return -ENOBUFS; + + LIST_INSERT_AFTER(queue, v->output_queue, v->output_queue_tail, q); + v->output_queue_tail = q; + v->n_output_queue++; + return 0; +} + static int varlink_enqueue_json(sd_varlink *v, sd_json_variant *m) { VarlinkJsonQueueItem *q; @@ -1404,9 +1423,9 @@ static int varlink_enqueue_json(sd_varlink *v, sd_json_variant *m) { v->n_pushed_fds = 0; /* fds now belong to the queue entry */ - LIST_INSERT_AFTER(queue, v->output_queue, v->output_queue_tail, q); - v->output_queue_tail = q; - v->n_output_queue++; + /* We already checked the precondition ourselves so this call cannot fail. */ + assert_se(varlink_enqueue_item(v, q) >= 0); + return 0; } @@ -1536,16 +1555,46 @@ static int varlink_dispatch_method(sd_varlink *v) { if (!invalid) { r = callback(v, parameters, flags, v->userdata); - if (r < 0 && VARLINK_STATE_WANTS_REPLY(v->state)) { - varlink_log_errno(v, r, "Callback for %s returned error: %m", method); + if (VARLINK_STATE_WANTS_REPLY(v->state)) { + if (r < 0) { + varlink_log_errno(v, r, "Callback for %s returned error: %m", method); + + /* We got an error back from the callback. Propagate it to the client + * if the method call remains unanswered. */ + r = sd_varlink_error_errno(v, r); + } else if (v->sentinel) { + if (v->previous) { + r = varlink_enqueue_item(v, v->previous); + if (r >= 0) { + TAKE_PTR(v->previous); + varlink_set_state(v, VARLINK_PROCESSED_METHOD); + } + } else { + char *sentinel = TAKE_PTR(v->sentinel); + + /* Propagate the sentinel to the client if one was configured + * and no replies were enqueued by the callback. */ + if (sentinel == POINTER_MAX) + r = sd_varlink_reply(v, NULL); + else + r = sd_varlink_error(v, sentinel, NULL); + + if (sentinel != POINTER_MAX) + free(sentinel); + } + if (r < 0) + varlink_log_errno(v, r, "Failed to process sentinel for method '%s': %m", method); + } else { + assert(!v->previous); + r = 0; + } - /* We got an error back from the callback. Propagate it to the client if the - * method call remains unanswered. */ - r = sd_varlink_error_errno(v, r); - /* If we didn't manage to enqueue an error response, then fail the connection completely. */ + /* If we didn't manage to enqueue a response, then fail the connection completely. */ if (r < 0 && VARLINK_STATE_WANTS_REPLY(v->state)) goto fail; - } + + } else + assert(!v->previous); } } else if (VARLINK_STATE_WANTS_REPLY(v->state)) { r = sd_varlink_errorbo(v, SD_VARLINK_ERROR_METHOD_NOT_FOUND, SD_JSON_BUILD_PAIR_STRING("method", method)); @@ -2501,33 +2550,58 @@ _public_ int sd_varlink_collectb( } _public_ int sd_varlink_reply(sd_varlink *v, sd_json_variant *parameters) { - _cleanup_(sd_json_variant_unrefp) sd_json_variant *m = NULL; int r; assert_return(v, -EINVAL); if (v->state == VARLINK_DISCONNECTED) - return -ENOTCONN; + return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected."); + if (!IN_SET(v->state, VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) - return -EBUSY; + return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy."); + + bool more = IN_SET(v->state, VARLINK_PROCESSING_METHOD_MORE, VARLINK_PENDING_METHOD_MORE); /* Validate parameters BEFORE sanitization */ if (v->current_method) { const char *bad_field = NULL; - r = varlink_idl_validate_method_reply(v->current_method, parameters, /* flags= */ 0, &bad_field); - if (r < 0) + r = varlink_idl_validate_method_reply(v->current_method, parameters, more && v->sentinel ? SD_VARLINK_REPLY_CONTINUES : 0, &bad_field); + if (r == -EBADE) + varlink_log_errno(v, r, "Method reply for %s() has 'continues' flag set, but IDL structure doesn't allow that, ignoring: %m", + v->current_method->name); + else if (r < 0) /* Please adjust test/units/end.sh when updating the log message. */ varlink_log_errno(v, r, "Return parameters for method reply %s() didn't pass validation on field '%s', ignoring: %m", v->current_method->name, strna(bad_field)); } + _cleanup_(sd_json_variant_unrefp) sd_json_variant *m = NULL; r = sd_json_buildo(&m, JSON_BUILD_PAIR_VARIANT_NON_EMPTY("parameters", parameters)); if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); + if (more && v->sentinel) { + if (v->previous) { + r = sd_json_variant_set_field_boolean(&v->previous->data, "continues", true); + if (r < 0) + return r; + + r = varlink_enqueue_item(v, v->previous); + if (r < 0) + return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); + } + + v->previous = varlink_json_queue_item_new(m, v->pushed_fds, v->n_pushed_fds); + if (!v->previous) + return -ENOMEM; + + v->n_pushed_fds = 0; /* fds now belong to the queue entry */ + return 1; + } + r = varlink_enqueue_json(v, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -2590,6 +2664,21 @@ _public_ int sd_varlink_error(sd_varlink *v, const char *error_id, sd_json_varia VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy."); + if (v->previous) { + r = sd_json_variant_set_field_boolean(&v->previous->data, "continues", true); + if (r < 0) + return r; + + /* If we have a previous reply still ready make sure we queue it before the error. We only + * ever set "previous" if we're in a streaming method so we pass more=true uncondtionally + * here as we know we're still going to queue an error afterwards. */ + r = varlink_enqueue_item(v, v->previous); + if (r < 0) + return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); + + TAKE_PTR(v->previous); + } + /* Reset the list of pushed file descriptors before sending an error reply. We do this here to * simplify code that puts together a complex reply message with fds, and half-way something * fails. In that case the pushed fds need to be flushed out again. Under the assumption that it @@ -2721,6 +2810,11 @@ _public_ int sd_varlink_notify(sd_varlink *v, sd_json_variant *parameters) { assert_return(v, -EINVAL); + if (v->sentinel) + return varlink_log_errno(v, SYNTHETIC_ERRNO(EINVAL), "Cannot use sd_varlink_notify() on method with sentinel set"); + + assert(!v->previous); + if (v->state == VARLINK_DISCONNECTED) return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected."); diff --git a/src/libsystemd/sd-varlink/varlink-internal.h b/src/libsystemd/sd-varlink/varlink-internal.h index a2c54a0bf5a..64868514244 100644 --- a/src/libsystemd/sd-varlink/varlink-internal.h +++ b/src/libsystemd/sd-varlink/varlink-internal.h @@ -82,7 +82,7 @@ struct VarlinkJsonQueueItem { int fds[]; }; -struct sd_varlink { +typedef struct sd_varlink { unsigned n_ref; sd_varlink_server *server; @@ -157,6 +157,9 @@ struct sd_varlink { sd_varlink_reply_flags_t current_reply_flags; sd_varlink_symbol *current_method; + VarlinkJsonQueueItem *previous; + char *sentinel; + int peer_pidfd; struct ucred ucred; bool ucred_acquired:1; @@ -189,7 +192,7 @@ struct sd_varlink { sd_event_source *defer_event_source; PidRef exec_pidref; -}; +} sd_varlink; typedef struct VarlinkServerSocket VarlinkServerSocket; @@ -204,7 +207,7 @@ struct VarlinkServerSocket { LIST_FIELDS(VarlinkServerSocket, sockets); }; -struct sd_varlink_server { +typedef struct sd_varlink_server { unsigned n_ref; sd_varlink_server_flags_t flags; @@ -234,7 +237,7 @@ struct sd_varlink_server { unsigned connections_per_uid_max; bool exit_on_idle; -}; +} sd_varlink_server; #define varlink_log_errno(v, error, fmt, ...) \ log_debug_errno(error, "%s: " fmt, varlink_description(v), ##__VA_ARGS__) diff --git a/src/libsystemd/sd-varlink/varlink-util.c b/src/libsystemd/sd-varlink/varlink-util.c index 7ae3a66af25..916ac2ba996 100644 --- a/src/libsystemd/sd-varlink/varlink-util.c +++ b/src/libsystemd/sd-varlink/varlink-util.c @@ -6,6 +6,7 @@ #include "pidref.h" #include "set.h" #include "string-util.h" +#include "varlink-internal.h" #include "varlink-util.h" #include "version.h" @@ -204,6 +205,31 @@ int varlink_check_privileged_peer(sd_varlink *vl) { return 0; } +int varlink_set_sentinel(sd_varlink *v, const char *error_id) { + _cleanup_free_ char *s = NULL; + + assert(v); + + /* If the caller doesn't want a reply, then don't set a sentinel. */ + if (v->state == VARLINK_PROCESSING_METHOD_ONEWAY) + return 0; + + /* This has to be called during a callback, and not after it has exited. */ + assert(IN_SET(v->state, VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE)); + + if (error_id) { + s = strdup(error_id); + if (!s) + return -ENOMEM; + } + + if (v->sentinel != POINTER_MAX) + free(v->sentinel); + + v->sentinel = s ? TAKE_PTR(s) : POINTER_MAX; + return 0; +} + DEFINE_HASH_OPS_WITH_VALUE_DESTRUCTOR( varlink_hash_ops, void, diff --git a/src/libsystemd/sd-varlink/varlink-util.h b/src/libsystemd/sd-varlink/varlink-util.h index ba0f2322535..dee79555ce9 100644 --- a/src/libsystemd/sd-varlink/varlink-util.h +++ b/src/libsystemd/sd-varlink/varlink-util.h @@ -28,4 +28,6 @@ int varlink_server_new( int varlink_check_privileged_peer(sd_varlink *vl); +int varlink_set_sentinel(sd_varlink *v, const char *error_id); + extern const struct hash_ops varlink_hash_ops; diff --git a/src/test/test-varlink.c b/src/test/test-varlink.c index 61e2b06978f..bf1390fba1d 100644 --- a/src/test/test-varlink.c +++ b/src/test/test-varlink.c @@ -441,4 +441,315 @@ TEST(invalid_parameter) { ASSERT_OK(sd_event_loop(e)); } +static int method_with_error_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) { + /* Set an error sentinel and return without sending a reply. The sentinel error should be sent automatically. */ + ASSERT_OK(varlink_set_sentinel(link, "io.test.SentinelError")); + return 0; +} + +static int reply_sentinel_error(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) { + ASSERT_STREQ(error_id, "io.test.SentinelError"); + ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS)); + return 0; +} + +TEST(sentinel_error) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_default(&e)); + + _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL; + ASSERT_OK(sd_varlink_server_new(&s, 0)); + + ASSERT_OK(sd_varlink_server_attach_event(s, e, 0)); + + ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.ErrorSentinel", method_with_error_sentinel)); + + int connfd[2]; + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd)); + ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL)); + + _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL; + ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1])); + + ASSERT_OK(sd_varlink_attach_event(c, e, 0)); + + ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_error)); + + ASSERT_OK(sd_varlink_invoke(c, "io.test.ErrorSentinel", /* parameters= */ NULL)); + + ASSERT_OK(sd_event_loop(e)); +} + +static int method_with_empty_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) { + /* Set an empty sentinel and return without sending a reply. An empty reply should be sent automatically. */ + ASSERT_OK(varlink_set_sentinel(link, /* error_id= */ NULL)); + return 0; +} + +static int reply_sentinel_empty(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) { + ASSERT_NULL(error_id); + ASSERT_TRUE(sd_json_variant_is_blank_object(parameters)); + ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS)); + return 0; +} + +TEST(sentinel_empty) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_default(&e)); + + _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL; + ASSERT_OK(sd_varlink_server_new(&s, 0)); + + ASSERT_OK(sd_varlink_server_attach_event(s, e, 0)); + + ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.EmptySentinel", method_with_empty_sentinel)); + + int connfd[2]; + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd)); + ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL)); + + _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL; + ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1])); + + ASSERT_OK(sd_varlink_attach_event(c, e, 0)); + + ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_empty)); + + ASSERT_OK(sd_varlink_invoke(c, "io.test.EmptySentinel", /* parameters= */ NULL)); + + ASSERT_OK(sd_event_loop(e)); +} + +static int method_with_sentinel_but_reply(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) { + /* Set a sentinel but also send a reply. The sentinel should not be used. */ + ASSERT_OK(varlink_set_sentinel(link, "io.test.SentinelError")); + return sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_STRING("result", "explicit-reply")); +} + +static int reply_sentinel_explicit(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) { + ASSERT_NULL(error_id); + ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(parameters, "result")), "explicit-reply"); + ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS)); + return 0; +} + +TEST(sentinel_with_explicit_reply) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_default(&e)); + + _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL; + ASSERT_OK(sd_varlink_server_new(&s, 0)); + + ASSERT_OK(sd_varlink_server_attach_event(s, e, 0)); + + ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.SentinelButReply", method_with_sentinel_but_reply)); + + int connfd[2]; + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd)); + ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL)); + + _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL; + ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1])); + + ASSERT_OK(sd_varlink_attach_event(c, e, 0)); + + ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_explicit)); + + ASSERT_OK(sd_varlink_invoke(c, "io.test.SentinelButReply", /* parameters= */ NULL)); + + ASSERT_OK(sd_event_loop(e)); +} + +static int method_with_oneway_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) { + /* The method was called oneway, so varlink_set_sentinel() should be a no-op and the server should + * transition back to idle without sending any reply. */ + ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_METHOD_ONEWAY)); + ASSERT_OK(varlink_set_sentinel(link, "io.test.SentinelError")); + return 0; +} + +static int method_oneway_sentinel_pong(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) { + return sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_STRING("result", "pong")); +} + +static int reply_oneway_sentinel_pong(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) { + /* If we get here, it means the oneway sentinel call didn't break the connection and the server + * properly handled a subsequent regular method call. */ + ASSERT_NULL(error_id); + ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(parameters, "result")), "pong"); + ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS)); + return 0; +} + +TEST(sentinel_oneway) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_default(&e)); + + _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL; + ASSERT_OK(sd_varlink_server_new(&s, 0)); + + ASSERT_OK(sd_varlink_server_attach_event(s, e, 0)); + + ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.OnewaySentinel", method_with_oneway_sentinel)); + ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Pong", method_oneway_sentinel_pong)); + + int connfd[2]; + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd)); + ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL)); + + _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL; + ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1])); + + ASSERT_OK(sd_varlink_attach_event(c, e, 0)); + + /* Send a oneway call with a sentinel — the sentinel should be silently ignored. */ + ASSERT_OK(sd_varlink_send(c, "io.test.OnewaySentinel", /* parameters= */ NULL)); + + /* Follow up with a regular call to verify the server is still functional. */ + ASSERT_OK(sd_varlink_bind_reply(c, reply_oneway_sentinel_pong)); + ASSERT_OK(sd_varlink_invoke(c, "io.test.Pong", /* parameters= */ NULL)); + + ASSERT_OK(sd_event_loop(e)); +} + +static int method_with_fd_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) { + _cleanup_close_ int fd1 = -EBADF, fd2 = -EBADF; + + ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_METHOD_MORE)); + + /* Set a sentinel so sd_varlink_reply() defers sending: each reply and its pushed fds are captured in + * the queue, and the last one is sent as the final reply when the callback returns. */ + ASSERT_OK(varlink_set_sentinel(link, /* error_id= */ NULL)); + + /* First reply: push one fd with "alpha" content */ + ASSERT_OK(fd1 = memfd_new_and_seal_string("data", "alpha")); + ASSERT_OK_EQ(sd_varlink_push_fd(link, fd1), 0); + TAKE_FD(fd1); + ASSERT_OK(sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_INTEGER("index", 0))); + + /* Second reply: push one fd with "beta" content */ + ASSERT_OK(fd2 = memfd_new_and_seal_string("data", "beta")); + ASSERT_OK_EQ(sd_varlink_push_fd(link, fd2), 0); + TAKE_FD(fd2); + ASSERT_OK(sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_INTEGER("index", 1))); + + return 0; +} + +static int reply_sentinel_fd(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) { + int *state = ASSERT_PTR(sd_varlink_get_userdata(link)); + + if (*state == 0) { + /* First reply: should carry "continues" flag and fd with "alpha" */ + ASSERT_NULL(error_id); + ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_REPLY_CONTINUES)); + ASSERT_EQ(sd_json_variant_integer(sd_json_variant_by_key(parameters, "index")), 0); + + int fd; + ASSERT_OK(fd = sd_varlink_peek_fd(link, 0)); + test_fd(fd, "alpha", STRLEN("alpha")); + (*state)++; + } else if (*state == 1) { + /* Second (final) reply: no "continues" flag, fd with "beta" */ + ASSERT_NULL(error_id); + ASSERT_FALSE(FLAGS_SET(flags, SD_VARLINK_REPLY_CONTINUES)); + ASSERT_EQ(sd_json_variant_integer(sd_json_variant_by_key(parameters, "index")), 1); + + int fd; + ASSERT_OK(fd = sd_varlink_peek_fd(link, 0)); + test_fd(fd, "beta", STRLEN("beta")); + + ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS)); + } else + assert_not_reached(); + + return 0; +} + +TEST(sentinel_with_fds) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_default(&e)); + + _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL; + ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_ALLOW_FD_PASSING_INPUT|SD_VARLINK_SERVER_ALLOW_FD_PASSING_OUTPUT)); + + ASSERT_OK(sd_varlink_server_attach_event(s, e, 0)); + + ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.FDSentinel", method_with_fd_sentinel)); + + int connfd[2]; + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd)); + ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL)); + + _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL; + ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1])); + ASSERT_OK(sd_varlink_set_allow_fd_passing_input(c, true)); + ASSERT_OK(sd_varlink_set_allow_fd_passing_output(c, true)); + + ASSERT_OK(sd_varlink_attach_event(c, e, 0)); + + int state = 0; + sd_varlink_set_userdata(c, &state); + ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_fd)); + + ASSERT_OK(sd_varlink_observe(c, "io.test.FDSentinel", /* parameters= */ NULL)); + + ASSERT_OK(sd_event_loop(e)); +} + +static int method_with_notify_then_error(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) { + /* Send a notify first, then return an error. The notify should be received before the error. */ + ASSERT_OK(sd_varlink_notifybo(link, SD_JSON_BUILD_PAIR_STRING("status", "in-progress"))); + return sd_varlink_error(link, "io.test.OperationFailed", /* parameters= */ NULL); +} + +static int reply_notify_then_error(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) { + int *state = ASSERT_PTR(sd_varlink_get_userdata(link)); + + if (*state == 0) { + /* First callback: should be the notify (no error, has "more" flag) */ + ASSERT_NULL(error_id); + ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_REPLY_CONTINUES)); + ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(parameters, "status")), "in-progress"); + (*state)++; + } else if (*state == 1) { + /* Second callback: should be the error */ + ASSERT_STREQ(error_id, "io.test.OperationFailed"); + ASSERT_FALSE(FLAGS_SET(flags, SD_VARLINK_REPLY_CONTINUES)); + ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS)); + } else + assert_not_reached(); + + return 0; +} + +TEST(notify_then_error) { + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + ASSERT_OK(sd_event_default(&e)); + + _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL; + ASSERT_OK(sd_varlink_server_new(&s, 0)); + + ASSERT_OK(sd_varlink_server_attach_event(s, e, 0)); + + ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.NotifyThenError", method_with_notify_then_error)); + + int connfd[2]; + ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd)); + ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL)); + + _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL; + ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1])); + + ASSERT_OK(sd_varlink_attach_event(c, e, 0)); + + int state = 0; + sd_varlink_set_userdata(c, &state); + ASSERT_OK(sd_varlink_bind_reply(c, reply_notify_then_error)); + + ASSERT_OK(sd_varlink_observe(c, "io.test.NotifyThenError", /* parameters= */ NULL)); + + ASSERT_OK(sd_event_loop(e)); +} + DEFINE_TEST_MAIN(LOG_DEBUG); -- 2.47.3