close_many(v->input_fds, v->n_input_fds);
v->input_fds = mfree(v->input_fds);
v->n_input_fds = 0;
+
+ v->previous = varlink_json_queue_item_free(v->previous);
+ if (v->sentinel != POINTER_MAX)
+ v->sentinel = mfree(v->sentinel);
+ else
+ v->sentinel = NULL;
}
static void varlink_clear(sd_varlink *v) {
return 0;
}
+static int varlink_enqueue_item(sd_varlink *v, VarlinkJsonQueueItem *q) {
+ assert(v);
+ assert(q);
+
+ if (v->n_output_queue >= VARLINK_QUEUE_MAX)
+ return -ENOBUFS;
+
+ LIST_INSERT_AFTER(queue, v->output_queue, v->output_queue_tail, q);
+ v->output_queue_tail = q;
+ v->n_output_queue++;
+ return 0;
+}
+
static int varlink_enqueue_json(sd_varlink *v, sd_json_variant *m) {
VarlinkJsonQueueItem *q;
v->n_pushed_fds = 0; /* fds now belong to the queue entry */
- LIST_INSERT_AFTER(queue, v->output_queue, v->output_queue_tail, q);
- v->output_queue_tail = q;
- v->n_output_queue++;
+ /* We already checked the precondition ourselves so this call cannot fail. */
+ assert_se(varlink_enqueue_item(v, q) >= 0);
+
return 0;
}
if (!invalid) {
r = callback(v, parameters, flags, v->userdata);
- if (r < 0 && VARLINK_STATE_WANTS_REPLY(v->state)) {
- varlink_log_errno(v, r, "Callback for %s returned error: %m", method);
+ if (VARLINK_STATE_WANTS_REPLY(v->state)) {
+ if (r < 0) {
+ varlink_log_errno(v, r, "Callback for %s returned error: %m", method);
+
+ /* We got an error back from the callback. Propagate it to the client
+ * if the method call remains unanswered. */
+ r = sd_varlink_error_errno(v, r);
+ } else if (v->sentinel) {
+ if (v->previous) {
+ r = varlink_enqueue_item(v, v->previous);
+ if (r >= 0) {
+ TAKE_PTR(v->previous);
+ varlink_set_state(v, VARLINK_PROCESSED_METHOD);
+ }
+ } else {
+ char *sentinel = TAKE_PTR(v->sentinel);
+
+ /* Propagate the sentinel to the client if one was configured
+ * and no replies were enqueued by the callback. */
+ if (sentinel == POINTER_MAX)
+ r = sd_varlink_reply(v, NULL);
+ else
+ r = sd_varlink_error(v, sentinel, NULL);
+
+ if (sentinel != POINTER_MAX)
+ free(sentinel);
+ }
+ if (r < 0)
+ varlink_log_errno(v, r, "Failed to process sentinel for method '%s': %m", method);
+ } else {
+ assert(!v->previous);
+ r = 0;
+ }
- /* We got an error back from the callback. Propagate it to the client if the
- * method call remains unanswered. */
- r = sd_varlink_error_errno(v, r);
- /* If we didn't manage to enqueue an error response, then fail the connection completely. */
+ /* If we didn't manage to enqueue a response, then fail the connection completely. */
if (r < 0 && VARLINK_STATE_WANTS_REPLY(v->state))
goto fail;
- }
+
+ } else
+ assert(!v->previous);
}
} else if (VARLINK_STATE_WANTS_REPLY(v->state)) {
r = sd_varlink_errorbo(v, SD_VARLINK_ERROR_METHOD_NOT_FOUND, SD_JSON_BUILD_PAIR_STRING("method", method));
}
_public_ int sd_varlink_reply(sd_varlink *v, sd_json_variant *parameters) {
- _cleanup_(sd_json_variant_unrefp) sd_json_variant *m = NULL;
int r;
assert_return(v, -EINVAL);
if (v->state == VARLINK_DISCONNECTED)
- return -ENOTCONN;
+ return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
+
if (!IN_SET(v->state,
VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE,
VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
- return -EBUSY;
+ return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
+
+ bool more = IN_SET(v->state, VARLINK_PROCESSING_METHOD_MORE, VARLINK_PENDING_METHOD_MORE);
/* Validate parameters BEFORE sanitization */
if (v->current_method) {
const char *bad_field = NULL;
- r = varlink_idl_validate_method_reply(v->current_method, parameters, /* flags= */ 0, &bad_field);
- if (r < 0)
+ r = varlink_idl_validate_method_reply(v->current_method, parameters, more && v->sentinel ? SD_VARLINK_REPLY_CONTINUES : 0, &bad_field);
+ if (r == -EBADE)
+ varlink_log_errno(v, r, "Method reply for %s() has 'continues' flag set, but IDL structure doesn't allow that, ignoring: %m",
+ v->current_method->name);
+ else if (r < 0)
/* Please adjust test/units/end.sh when updating the log message. */
varlink_log_errno(v, r, "Return parameters for method reply %s() didn't pass validation on field '%s', ignoring: %m",
v->current_method->name, strna(bad_field));
}
+ _cleanup_(sd_json_variant_unrefp) sd_json_variant *m = NULL;
r = sd_json_buildo(&m, JSON_BUILD_PAIR_VARIANT_NON_EMPTY("parameters", parameters));
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
+ if (more && v->sentinel) {
+ if (v->previous) {
+ r = sd_json_variant_set_field_boolean(&v->previous->data, "continues", true);
+ if (r < 0)
+ return r;
+
+ r = varlink_enqueue_item(v, v->previous);
+ if (r < 0)
+ return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
+ }
+
+ v->previous = varlink_json_queue_item_new(m, v->pushed_fds, v->n_pushed_fds);
+ if (!v->previous)
+ return -ENOMEM;
+
+ v->n_pushed_fds = 0; /* fds now belong to the queue entry */
+ return 1;
+ }
+
r = varlink_enqueue_json(v, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
+ if (v->previous) {
+ r = sd_json_variant_set_field_boolean(&v->previous->data, "continues", true);
+ if (r < 0)
+ return r;
+
+ /* If we have a previous reply still ready make sure we queue it before the error. We only
+ * ever set "previous" if we're in a streaming method so we pass more=true uncondtionally
+ * here as we know we're still going to queue an error afterwards. */
+ r = varlink_enqueue_item(v, v->previous);
+ if (r < 0)
+ return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
+
+ TAKE_PTR(v->previous);
+ }
+
/* Reset the list of pushed file descriptors before sending an error reply. We do this here to
* simplify code that puts together a complex reply message with fds, and half-way something
* fails. In that case the pushed fds need to be flushed out again. Under the assumption that it
assert_return(v, -EINVAL);
+ if (v->sentinel)
+ return varlink_log_errno(v, SYNTHETIC_ERRNO(EINVAL), "Cannot use sd_varlink_notify() on method with sentinel set");
+
+ assert(!v->previous);
+
if (v->state == VARLINK_DISCONNECTED)
return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
ASSERT_OK(sd_event_loop(e));
}
+static int method_with_error_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ /* Set an error sentinel and return without sending a reply. The sentinel error should be sent automatically. */
+ ASSERT_OK(varlink_set_sentinel(link, "io.test.SentinelError"));
+ return 0;
+}
+
+static int reply_sentinel_error(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+ ASSERT_STREQ(error_id, "io.test.SentinelError");
+ ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+ return 0;
+}
+
+TEST(sentinel_error) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ ASSERT_OK(sd_event_default(&e));
+
+ _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+ ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+ ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+ ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.ErrorSentinel", method_with_error_sentinel));
+
+ int connfd[2];
+ ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+ ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+ _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+ ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+ ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+ ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_error));
+
+ ASSERT_OK(sd_varlink_invoke(c, "io.test.ErrorSentinel", /* parameters= */ NULL));
+
+ ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_with_empty_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ /* Set an empty sentinel and return without sending a reply. An empty reply should be sent automatically. */
+ ASSERT_OK(varlink_set_sentinel(link, /* error_id= */ NULL));
+ return 0;
+}
+
+static int reply_sentinel_empty(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+ ASSERT_NULL(error_id);
+ ASSERT_TRUE(sd_json_variant_is_blank_object(parameters));
+ ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+ return 0;
+}
+
+TEST(sentinel_empty) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ ASSERT_OK(sd_event_default(&e));
+
+ _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+ ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+ ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+ ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.EmptySentinel", method_with_empty_sentinel));
+
+ int connfd[2];
+ ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+ ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+ _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+ ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+ ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+ ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_empty));
+
+ ASSERT_OK(sd_varlink_invoke(c, "io.test.EmptySentinel", /* parameters= */ NULL));
+
+ ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_with_sentinel_but_reply(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ /* Set a sentinel but also send a reply. The sentinel should not be used. */
+ ASSERT_OK(varlink_set_sentinel(link, "io.test.SentinelError"));
+ return sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_STRING("result", "explicit-reply"));
+}
+
+static int reply_sentinel_explicit(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+ ASSERT_NULL(error_id);
+ ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(parameters, "result")), "explicit-reply");
+ ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+ return 0;
+}
+
+TEST(sentinel_with_explicit_reply) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ ASSERT_OK(sd_event_default(&e));
+
+ _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+ ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+ ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+ ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.SentinelButReply", method_with_sentinel_but_reply));
+
+ int connfd[2];
+ ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+ ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+ _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+ ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+ ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+ ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_explicit));
+
+ ASSERT_OK(sd_varlink_invoke(c, "io.test.SentinelButReply", /* parameters= */ NULL));
+
+ ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_with_oneway_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ /* The method was called oneway, so varlink_set_sentinel() should be a no-op and the server should
+ * transition back to idle without sending any reply. */
+ ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_METHOD_ONEWAY));
+ ASSERT_OK(varlink_set_sentinel(link, "io.test.SentinelError"));
+ return 0;
+}
+
+static int method_oneway_sentinel_pong(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ return sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_STRING("result", "pong"));
+}
+
+static int reply_oneway_sentinel_pong(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+ /* If we get here, it means the oneway sentinel call didn't break the connection and the server
+ * properly handled a subsequent regular method call. */
+ ASSERT_NULL(error_id);
+ ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(parameters, "result")), "pong");
+ ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+ return 0;
+}
+
+TEST(sentinel_oneway) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ ASSERT_OK(sd_event_default(&e));
+
+ _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+ ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+ ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+ ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.OnewaySentinel", method_with_oneway_sentinel));
+ ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Pong", method_oneway_sentinel_pong));
+
+ int connfd[2];
+ ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+ ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+ _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+ ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+ ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+ /* Send a oneway call with a sentinel — the sentinel should be silently ignored. */
+ ASSERT_OK(sd_varlink_send(c, "io.test.OnewaySentinel", /* parameters= */ NULL));
+
+ /* Follow up with a regular call to verify the server is still functional. */
+ ASSERT_OK(sd_varlink_bind_reply(c, reply_oneway_sentinel_pong));
+ ASSERT_OK(sd_varlink_invoke(c, "io.test.Pong", /* parameters= */ NULL));
+
+ ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_with_fd_sentinel(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ _cleanup_close_ int fd1 = -EBADF, fd2 = -EBADF;
+
+ ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_METHOD_MORE));
+
+ /* Set a sentinel so sd_varlink_reply() defers sending: each reply and its pushed fds are captured in
+ * the queue, and the last one is sent as the final reply when the callback returns. */
+ ASSERT_OK(varlink_set_sentinel(link, /* error_id= */ NULL));
+
+ /* First reply: push one fd with "alpha" content */
+ ASSERT_OK(fd1 = memfd_new_and_seal_string("data", "alpha"));
+ ASSERT_OK_EQ(sd_varlink_push_fd(link, fd1), 0);
+ TAKE_FD(fd1);
+ ASSERT_OK(sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_INTEGER("index", 0)));
+
+ /* Second reply: push one fd with "beta" content */
+ ASSERT_OK(fd2 = memfd_new_and_seal_string("data", "beta"));
+ ASSERT_OK_EQ(sd_varlink_push_fd(link, fd2), 0);
+ TAKE_FD(fd2);
+ ASSERT_OK(sd_varlink_replybo(link, SD_JSON_BUILD_PAIR_INTEGER("index", 1)));
+
+ return 0;
+}
+
+static int reply_sentinel_fd(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+ int *state = ASSERT_PTR(sd_varlink_get_userdata(link));
+
+ if (*state == 0) {
+ /* First reply: should carry "continues" flag and fd with "alpha" */
+ ASSERT_NULL(error_id);
+ ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_REPLY_CONTINUES));
+ ASSERT_EQ(sd_json_variant_integer(sd_json_variant_by_key(parameters, "index")), 0);
+
+ int fd;
+ ASSERT_OK(fd = sd_varlink_peek_fd(link, 0));
+ test_fd(fd, "alpha", STRLEN("alpha"));
+ (*state)++;
+ } else if (*state == 1) {
+ /* Second (final) reply: no "continues" flag, fd with "beta" */
+ ASSERT_NULL(error_id);
+ ASSERT_FALSE(FLAGS_SET(flags, SD_VARLINK_REPLY_CONTINUES));
+ ASSERT_EQ(sd_json_variant_integer(sd_json_variant_by_key(parameters, "index")), 1);
+
+ int fd;
+ ASSERT_OK(fd = sd_varlink_peek_fd(link, 0));
+ test_fd(fd, "beta", STRLEN("beta"));
+
+ ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+ } else
+ assert_not_reached();
+
+ return 0;
+}
+
+TEST(sentinel_with_fds) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ ASSERT_OK(sd_event_default(&e));
+
+ _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+ ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_ALLOW_FD_PASSING_INPUT|SD_VARLINK_SERVER_ALLOW_FD_PASSING_OUTPUT));
+
+ ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+ ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.FDSentinel", method_with_fd_sentinel));
+
+ int connfd[2];
+ ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+ ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+ _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+ ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+ ASSERT_OK(sd_varlink_set_allow_fd_passing_input(c, true));
+ ASSERT_OK(sd_varlink_set_allow_fd_passing_output(c, true));
+
+ ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+ int state = 0;
+ sd_varlink_set_userdata(c, &state);
+ ASSERT_OK(sd_varlink_bind_reply(c, reply_sentinel_fd));
+
+ ASSERT_OK(sd_varlink_observe(c, "io.test.FDSentinel", /* parameters= */ NULL));
+
+ ASSERT_OK(sd_event_loop(e));
+}
+
+static int method_with_notify_then_error(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
+ /* Send a notify first, then return an error. The notify should be received before the error. */
+ ASSERT_OK(sd_varlink_notifybo(link, SD_JSON_BUILD_PAIR_STRING("status", "in-progress")));
+ return sd_varlink_error(link, "io.test.OperationFailed", /* parameters= */ NULL);
+}
+
+static int reply_notify_then_error(sd_varlink *link, sd_json_variant *parameters, const char *error_id, sd_varlink_reply_flags_t flags, void *userdata) {
+ int *state = ASSERT_PTR(sd_varlink_get_userdata(link));
+
+ if (*state == 0) {
+ /* First callback: should be the notify (no error, has "more" flag) */
+ ASSERT_NULL(error_id);
+ ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_REPLY_CONTINUES));
+ ASSERT_STREQ(sd_json_variant_string(sd_json_variant_by_key(parameters, "status")), "in-progress");
+ (*state)++;
+ } else if (*state == 1) {
+ /* Second callback: should be the error */
+ ASSERT_STREQ(error_id, "io.test.OperationFailed");
+ ASSERT_FALSE(FLAGS_SET(flags, SD_VARLINK_REPLY_CONTINUES));
+ ASSERT_OK(sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS));
+ } else
+ assert_not_reached();
+
+ return 0;
+}
+
+TEST(notify_then_error) {
+ _cleanup_(sd_event_unrefp) sd_event *e = NULL;
+ ASSERT_OK(sd_event_default(&e));
+
+ _cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
+ ASSERT_OK(sd_varlink_server_new(&s, 0));
+
+ ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
+
+ ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.NotifyThenError", method_with_notify_then_error));
+
+ int connfd[2];
+ ASSERT_OK_ERRNO(socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, connfd));
+ ASSERT_OK(sd_varlink_server_add_connection(s, connfd[0], /* ret= */ NULL));
+
+ _cleanup_(sd_varlink_unrefp) sd_varlink *c = NULL;
+ ASSERT_OK(sd_varlink_connect_fd(&c, connfd[1]));
+
+ ASSERT_OK(sd_varlink_attach_event(c, e, 0));
+
+ int state = 0;
+ sd_varlink_set_userdata(c, &state);
+ ASSERT_OK(sd_varlink_bind_reply(c, reply_notify_then_error));
+
+ ASSERT_OK(sd_varlink_observe(c, "io.test.NotifyThenError", /* parameters= */ NULL));
+
+ ASSERT_OK(sd_event_loop(e));
+}
+
DEFINE_TEST_MAIN(LOG_DEBUG);