1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
8 #include "alloc-util.h"
9 #include "errno-util.h"
11 #include "glyph-util.h"
14 #include "iovec-util.h"
16 #include "path-util.h"
17 #include "process-util.h"
18 #include "selinux-util.h"
19 #include "serialize.h"
21 #include "socket-util.h"
22 #include "string-table.h"
23 #include "string-util.h"
25 #include "time-util.h"
26 #include "umask-util.h"
27 #include "user-util.h"
29 #include "varlink-internal.h"
30 #include "varlink-org.varlink.service.h"
31 #include "varlink-io.systemd.h"
34 #define VARLINK_DEFAULT_CONNECTIONS_MAX 4096U
35 #define VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX 1024U
37 #define VARLINK_DEFAULT_TIMEOUT_USEC (45U*USEC_PER_SEC)
38 #define VARLINK_BUFFER_MAX (16U*1024U*1024U)
39 #define VARLINK_READ_SIZE (64U*1024U)
41 typedef enum VarlinkState
{
42 /* Client side states */
44 VARLINK_AWAITING_REPLY
,
45 VARLINK_AWAITING_REPLY_MORE
,
48 VARLINK_PROCESSING_REPLY
,
50 /* Server side states */
52 VARLINK_PROCESSING_METHOD
,
53 VARLINK_PROCESSING_METHOD_MORE
,
54 VARLINK_PROCESSING_METHOD_ONEWAY
,
55 VARLINK_PROCESSED_METHOD
,
56 VARLINK_PENDING_METHOD
,
57 VARLINK_PENDING_METHOD_MORE
,
59 /* Common states (only during shutdown) */
60 VARLINK_PENDING_DISCONNECT
,
61 VARLINK_PENDING_TIMEOUT
,
62 VARLINK_PROCESSING_DISCONNECT
,
63 VARLINK_PROCESSING_TIMEOUT
,
64 VARLINK_PROCESSING_FAILURE
,
68 _VARLINK_STATE_INVALID
= -EINVAL
,
71 /* Tests whether we are not yet disconnected. Note that this is true during all states where the connection
72 * is still good for something, and false only when it's dead for good. This means: when we are
73 * asynchronously connecting to a peer and the connect() is still pending, then this will return 'true', as
74 * the connection is still good, and we are likely to be able to properly operate on it soon. */
75 #define VARLINK_STATE_IS_ALIVE(state) \
77 VARLINK_IDLE_CLIENT, \
78 VARLINK_AWAITING_REPLY, \
79 VARLINK_AWAITING_REPLY_MORE, \
82 VARLINK_PROCESSING_REPLY, \
83 VARLINK_IDLE_SERVER, \
84 VARLINK_PROCESSING_METHOD, \
85 VARLINK_PROCESSING_METHOD_MORE, \
86 VARLINK_PROCESSING_METHOD_ONEWAY, \
87 VARLINK_PROCESSED_METHOD, \
88 VARLINK_PENDING_METHOD, \
89 VARLINK_PENDING_METHOD_MORE)
91 typedef struct VarlinkJsonQueueItem VarlinkJsonQueueItem
;
93 /* A queued message we shall write into the socket, along with the file descriptors to send at the same
94 * time. This queue item binds them together so that message/fd boundaries are maintained throughout the
96 struct VarlinkJsonQueueItem
{
97 LIST_FIELDS(VarlinkJsonQueueItem
, queue
);
106 VarlinkServer
*server
;
109 bool connecting
; /* This boolean indicates whether the socket fd we are operating on is currently
110 * processing an asynchronous connect(). In that state we watch the socket for
111 * EPOLLOUT, but we refrain from calling read() or write() on the socket as that
112 * will trigger ENOTCONN. Note that this boolean is kept separate from the
113 * VarlinkState above on purpose: while the connect() is still not complete we
114 * already want to allow queuing of messages and similar. Thus it's nice to keep
115 * these two state concepts separate: the VarlinkState encodes what our own view of
116 * the connection is, i.e. whether we think it's a server, a client, and has
117 * something queued already, while 'connecting' tells us a detail about the
118 * transport used below, that should have no effect on how we otherwise accept and
119 * process operations from the user.
121 * Or to say this differently: VARLINK_STATE_IS_ALIVE(state) tells you whether the
122 * connection is good to use, even if it might not be fully connected
123 * yet. connecting=true then informs you that actually we are still connecting, and
124 * the connection is actually not established yet and thus any requests you enqueue
125 * now will still work fine but will be queued only, not sent yet, but that
126 * shouldn't stop you from using the connection, since eventually whatever you queue
129 * Or to say this even differently: 'state' is a high-level ("application layer"
130 * high, if you so will) state, while 'conecting' is a low-level ("transport layer"
131 * low, if you so will) state, and while they are not entirely unrelated and
132 * sometimes propagate effects to each other they are only asynchronously connected
138 char *input_buffer
; /* valid data starts at input_buffer_index, ends at input_buffer_index+input_buffer_size */
139 size_t input_buffer_index
;
140 size_t input_buffer_size
;
141 size_t input_buffer_unscanned
;
143 void *input_control_buffer
;
144 size_t input_control_buffer_size
;
146 char *output_buffer
; /* valid data starts at output_buffer_index, ends at output_buffer_index+output_buffer_size */
147 size_t output_buffer_index
;
148 size_t output_buffer_size
;
150 int *input_fds
; /* file descriptors associated with the data in input_buffer (for fd passing) */
153 int *output_fds
; /* file descriptors associated with the data in output_buffer (for fd passing) */
156 /* Further messages to output not yet formatted into text, and thus not included in output_buffer
157 * yet. We keep them separate from output_buffer, to not violate fd message boundaries: we want that
158 * each fd that is sent is associated with its fds, and that fds cannot be accidentally associated
159 * with preceding or following messages. */
160 LIST_HEAD(VarlinkJsonQueueItem
, output_queue
);
161 VarlinkJsonQueueItem
*output_queue_tail
;
163 /* The fds to associate with the next message that is about to be enqueued. The user first pushes the
164 * fds it intends to send via varlink_push_fd() into this queue, and then once the message data is
165 * submitted we'll combine the fds and the message data into one. */
169 VarlinkReply reply_callback
;
171 JsonVariant
*current
;
172 VarlinkSymbol
*current_method
;
175 bool ucred_acquired
:1;
177 bool write_disconnected
:1;
178 bool read_disconnected
:1;
179 bool prefer_read_write
:1;
182 bool allow_fd_passing_input
:1;
183 bool allow_fd_passing_output
:1;
185 bool output_buffer_sensitive
:1; /* whether to erase the output buffer after writing it to the socket */
187 int af
; /* address family if socket; AF_UNSPEC if not socket; negative if not known */
196 sd_event_source
*io_event_source
;
197 sd_event_source
*time_event_source
;
198 sd_event_source
*quit_event_source
;
199 sd_event_source
*defer_event_source
;
204 typedef struct VarlinkServerSocket VarlinkServerSocket
;
206 struct VarlinkServerSocket
{
207 VarlinkServer
*server
;
212 sd_event_source
*event_source
;
214 LIST_FIELDS(VarlinkServerSocket
, sockets
);
217 struct VarlinkServer
{
219 VarlinkServerFlags flags
;
221 LIST_HEAD(VarlinkServerSocket
, sockets
);
223 Hashmap
*methods
; /* Fully qualified symbol name of a method → VarlinkMethod */
224 Hashmap
*interfaces
; /* Fully qualified interface name → VarlinkInterface* */
225 Hashmap
*symbols
; /* Fully qualified symbol name of method/error → VarlinkSymbol* */
226 VarlinkConnect connect_callback
;
227 VarlinkDisconnect disconnect_callback
;
230 int64_t event_priority
;
232 unsigned n_connections
;
233 Hashmap
*by_uid
; /* UID_TO_PTR(uid) → UINT_TO_PTR(n_connections) */
238 unsigned connections_max
;
239 unsigned connections_per_uid_max
;
244 typedef struct VarlinkCollectContext
{
245 JsonVariant
*parameters
;
246 const char *error_id
;
247 VarlinkReplyFlags flags
;
248 } VarlinkCollectContext
;
250 static const char* const varlink_state_table
[_VARLINK_STATE_MAX
] = {
251 [VARLINK_IDLE_CLIENT
] = "idle-client",
252 [VARLINK_AWAITING_REPLY
] = "awaiting-reply",
253 [VARLINK_AWAITING_REPLY_MORE
] = "awaiting-reply-more",
254 [VARLINK_CALLING
] = "calling",
255 [VARLINK_CALLED
] = "called",
256 [VARLINK_PROCESSING_REPLY
] = "processing-reply",
257 [VARLINK_IDLE_SERVER
] = "idle-server",
258 [VARLINK_PROCESSING_METHOD
] = "processing-method",
259 [VARLINK_PROCESSING_METHOD_MORE
] = "processing-method-more",
260 [VARLINK_PROCESSING_METHOD_ONEWAY
] = "processing-method-oneway",
261 [VARLINK_PROCESSED_METHOD
] = "processed-method",
262 [VARLINK_PENDING_METHOD
] = "pending-method",
263 [VARLINK_PENDING_METHOD_MORE
] = "pending-method-more",
264 [VARLINK_PENDING_DISCONNECT
] = "pending-disconnect",
265 [VARLINK_PENDING_TIMEOUT
] = "pending-timeout",
266 [VARLINK_PROCESSING_DISCONNECT
] = "processing-disconnect",
267 [VARLINK_PROCESSING_TIMEOUT
] = "processing-timeout",
268 [VARLINK_PROCESSING_FAILURE
] = "processing-failure",
269 [VARLINK_DISCONNECTED
] = "disconnected",
272 DEFINE_PRIVATE_STRING_TABLE_LOOKUP_TO_STRING(varlink_state
, VarlinkState
);
274 #define varlink_log_errno(v, error, fmt, ...) \
275 log_debug_errno(error, "%s: " fmt, varlink_description(v), ##__VA_ARGS__)
277 #define varlink_log(v, fmt, ...) \
278 log_debug("%s: " fmt, varlink_description(v), ##__VA_ARGS__)
280 #define varlink_server_log_errno(s, error, fmt, ...) \
281 log_debug_errno(error, "%s: " fmt, varlink_server_description(s), ##__VA_ARGS__)
283 #define varlink_server_log(s, fmt, ...) \
284 log_debug("%s: " fmt, varlink_server_description(s), ##__VA_ARGS__)
286 static int varlink_format_queue(Varlink
*v
);
287 static void varlink_server_test_exit_on_idle(VarlinkServer
*s
);
289 static const char *varlink_description(Varlink
*v
) {
290 return (v
? v
->description
: NULL
) ?: "varlink";
293 static const char *varlink_server_description(VarlinkServer
*s
) {
294 return (s
? s
->description
: NULL
) ?: "varlink";
297 static VarlinkJsonQueueItem
*varlink_json_queue_item_free(VarlinkJsonQueueItem
*q
) {
301 json_variant_unref(q
->data
);
302 close_many(q
->fds
, q
->n_fds
);
307 static VarlinkJsonQueueItem
*varlink_json_queue_item_new(JsonVariant
*m
, const int fds
[], size_t n_fds
) {
308 VarlinkJsonQueueItem
*q
;
311 assert(fds
|| n_fds
== 0);
313 q
= malloc(offsetof(VarlinkJsonQueueItem
, fds
) + sizeof(int) * n_fds
);
317 *q
= (VarlinkJsonQueueItem
) {
318 .data
= json_variant_ref(m
),
322 memcpy_safe(q
->fds
, fds
, n_fds
* sizeof(int));
327 static void varlink_set_state(Varlink
*v
, VarlinkState state
) {
329 assert(state
>= 0 && state
< _VARLINK_STATE_MAX
);
332 varlink_log(v
, "Setting state %s",
333 varlink_state_to_string(state
));
335 varlink_log(v
, "Changing state %s %s %s",
336 varlink_state_to_string(v
->state
),
337 special_glyph(SPECIAL_GLYPH_ARROW_RIGHT
),
338 varlink_state_to_string(state
));
343 static int varlink_new(Varlink
**ret
) {
356 .state
= _VARLINK_STATE_INVALID
,
358 .ucred
= UCRED_INVALID
,
360 .timestamp
= USEC_INFINITY
,
361 .timeout
= VARLINK_DEFAULT_TIMEOUT_USEC
,
370 int varlink_connect_address(Varlink
**ret
, const char *address
) {
371 _cleanup_(varlink_unrefp
) Varlink
*v
= NULL
;
372 union sockaddr_union sockaddr
;
375 assert_return(ret
, -EINVAL
);
376 assert_return(address
, -EINVAL
);
380 return log_debug_errno(r
, "Failed to create varlink object: %m");
382 v
->fd
= socket(AF_UNIX
, SOCK_STREAM
|SOCK_CLOEXEC
|SOCK_NONBLOCK
, 0);
384 return log_debug_errno(errno
, "Failed to create AF_UNIX socket: %m");
386 v
->fd
= fd_move_above_stdio(v
->fd
);
389 r
= sockaddr_un_set_path(&sockaddr
.un
, address
);
391 if (r
!= -ENAMETOOLONG
)
392 return log_debug_errno(r
, "Failed to set socket address '%s': %m", address
);
394 /* This is a file system path, and too long to fit into sockaddr_un. Let's connect via O_PATH
397 r
= connect_unix_path(v
->fd
, AT_FDCWD
, address
);
399 r
= RET_NERRNO(connect(v
->fd
, &sockaddr
.sa
, r
));
402 if (!IN_SET(r
, -EAGAIN
, -EINPROGRESS
))
403 return log_debug_errno(r
, "Failed to connect to %s: %m", address
);
405 v
->connecting
= true; /* We are asynchronously connecting, i.e. the connect() is being
406 * processed in the background. As long as that's the case the socket
407 * is in a special state: it's there, we can poll it for EPOLLOUT, but
408 * if we attempt to write() to it before we see EPOLLOUT we'll get
409 * ENOTCONN (and not EAGAIN, like we would for a normal connected
410 * socket that isn't writable at the moment). Since ENOTCONN on write()
411 * hence can mean two different things (i.e. connection not complete
412 * yet vs. already disconnected again), we store as a boolean whether
413 * we are still in connect(). */
416 varlink_set_state(v
, VARLINK_IDLE_CLIENT
);
422 int varlink_connect_exec(Varlink
**ret
, const char *_command
, char **_argv
) {
423 _cleanup_close_pair_
int pair
[2] = EBADF_PAIR
;
424 _cleanup_(sigkill_waitp
) pid_t pid
= 0;
425 _cleanup_free_
char *command
= NULL
;
426 _cleanup_strv_free_
char **argv
= NULL
;
429 assert_return(ret
, -EINVAL
);
430 assert_return(_command
, -EINVAL
);
432 /* Copy the strings, in case they point into our own argv[], which we'll invalidate shortly because
433 * we rename the child process */
434 command
= strdup(_command
);
438 if (strv_isempty(_argv
))
439 argv
= strv_new(command
);
441 argv
= strv_copy(_argv
);
445 log_debug("Forking off Varlink child process '%s'.", command
);
447 if (socketpair(AF_UNIX
, SOCK_STREAM
|SOCK_CLOEXEC
|SOCK_NONBLOCK
, 0, pair
) < 0)
448 return log_debug_errno(errno
, "Failed to allocate AF_UNIX socket pair: %m");
452 /* stdio_fds= */ NULL
,
453 /* except_fds= */ (int[]) { pair
[1] },
454 /* n_except_fds= */ 1,
455 FORK_RESET_SIGNALS
|FORK_CLOSE_ALL_FDS
|FORK_DEATHSIG_SIGTERM
|FORK_REOPEN_LOG
|FORK_LOG
|FORK_RLIMIT_NOFILE_SAFE
,
458 return log_debug_errno(r
, "Failed to spawn process: %m");
460 char spid
[DECIMAL_STR_MAX(pid_t
)+1];
461 const char *setenv_list
[] = {
464 "LISTEN_FDNAMES", "varlink",
471 r
= move_fd(pair
[1], 3, /* cloexec= */ false);
473 log_debug_errno(r
, "Failed to move file descriptor to 3: %m");
477 xsprintf(spid
, PID_FMT
, pid
);
479 STRV_FOREACH_PAIR(a
, b
, setenv_list
) {
480 if (setenv(*a
, *b
, /* override= */ true) < 0) {
481 log_debug_errno(errno
, "Failed to set environment variable '%s': %m", *a
);
486 execvp(command
, argv
);
487 log_debug_errno(r
, "Failed to invoke process '%s': %m", command
);
491 pair
[1] = safe_close(pair
[1]);
496 return log_debug_errno(r
, "Failed to create varlink object: %m");
498 v
->fd
= TAKE_FD(pair
[0]);
500 v
->exec_pid
= TAKE_PID(pid
);
501 varlink_set_state(v
, VARLINK_IDLE_CLIENT
);
507 int varlink_connect_url(Varlink
**ret
, const char *url
) {
508 _cleanup_free_
char *c
= NULL
;
513 assert_return(ret
, -EINVAL
);
514 assert_return(url
, -EINVAL
);
516 // FIXME: Add support for vsock:, ssh-exec:, ssh-unix: URL schemes here. (The latter with OpenSSH
517 // 9.4's -W switch for referencing remote AF_UNIX sockets.)
519 /* The Varlink URL scheme is a bit underdefined. We support only the unix: transport for now, plus an
520 * exec: transport we made up ourselves. Strictly speaking this shouldn't even be called URL, since
521 * it has nothing to do with Internet URLs by RFC. */
523 p
= startswith(url
, "unix:");
527 p
= startswith(url
, "exec:");
529 return log_debug_errno(SYNTHETIC_ERRNO(EPROTONOSUPPORT
), "URL scheme not supported.");
534 /* The varlink.org reference C library supports more than just file system paths. We might want to
535 * support that one day too. For now simply refuse that. */
536 if (p
[strcspn(p
, ";?#")] != '\0')
537 return log_debug_errno(SYNTHETIC_ERRNO(EPROTONOSUPPORT
), "URL parameterization with ';', '?', '#' not supported.");
539 if (exec
|| p
[0] != '@') { /* no validity checks for abstract namespace */
541 if (!path_is_absolute(p
))
542 return log_debug_errno(SYNTHETIC_ERRNO(EINVAL
), "Specified path not absolute, refusing.");
544 r
= path_simplify_alloc(p
, &c
);
548 if (!path_is_normalized(c
))
549 return log_debug_errno(SYNTHETIC_ERRNO(EINVAL
), "Specified path is not normalized, refusing.");
553 return varlink_connect_exec(ret
, c
, NULL
);
555 return varlink_connect_address(ret
, c
?: p
);
558 int varlink_connect_fd(Varlink
**ret
, int fd
) {
562 assert_return(ret
, -EINVAL
);
563 assert_return(fd
>= 0, -EBADF
);
565 r
= fd_nonblock(fd
, true);
567 return log_debug_errno(r
, "Failed to make fd %d nonblocking: %m", fd
);
571 return log_debug_errno(r
, "Failed to create varlink object: %m");
575 varlink_set_state(v
, VARLINK_IDLE_CLIENT
);
577 /* Note that if this function is called we assume the passed socket (if it is one) is already
578 * properly connected, i.e. any asynchronous connect() done on it already completed. Because of that
579 * we'll not set the 'connecting' boolean here, i.e. we don't need to avoid write()ing to the socket
580 * until the connection is fully set up. Behaviour here is hence a bit different from
581 * varlink_connect_address() above, as there we do handle asynchronous connections ourselves and
582 * avoid doing write() on it before we saw EPOLLOUT for the first time. */
588 static void varlink_detach_event_sources(Varlink
*v
) {
591 v
->io_event_source
= sd_event_source_disable_unref(v
->io_event_source
);
592 v
->time_event_source
= sd_event_source_disable_unref(v
->time_event_source
);
593 v
->quit_event_source
= sd_event_source_disable_unref(v
->quit_event_source
);
594 v
->defer_event_source
= sd_event_source_disable_unref(v
->defer_event_source
);
597 static void varlink_clear_current(Varlink
*v
) {
600 /* Clears the currently processed incoming message */
601 v
->current
= json_variant_unref(v
->current
);
602 v
->current_method
= NULL
;
604 close_many(v
->input_fds
, v
->n_input_fds
);
605 v
->input_fds
= mfree(v
->input_fds
);
609 static void varlink_clear(Varlink
*v
) {
612 varlink_detach_event_sources(v
);
614 v
->fd
= safe_close(v
->fd
);
616 varlink_clear_current(v
);
618 v
->input_buffer
= mfree(v
->input_buffer
);
619 v
->output_buffer
= v
->output_buffer_sensitive
? erase_and_free(v
->output_buffer
) : mfree(v
->output_buffer
);
621 v
->input_control_buffer
= mfree(v
->input_control_buffer
);
622 v
->input_control_buffer_size
= 0;
624 close_many(v
->output_fds
, v
->n_output_fds
);
625 v
->output_fds
= mfree(v
->output_fds
);
628 close_many(v
->pushed_fds
, v
->n_pushed_fds
);
629 v
->pushed_fds
= mfree(v
->pushed_fds
);
632 LIST_CLEAR(queue
, v
->output_queue
, varlink_json_queue_item_free
);
633 v
->output_queue_tail
= NULL
;
635 v
->event
= sd_event_unref(v
->event
);
637 if (v
->exec_pid
> 0) {
638 sigterm_wait(v
->exec_pid
);
643 static Varlink
* varlink_destroy(Varlink
*v
) {
647 /* If this is called the server object must already been unreffed here. Why that? because when we
648 * linked up the varlink connection with the server object we took one ref in each direction */
653 free(v
->description
);
657 DEFINE_TRIVIAL_REF_UNREF_FUNC(Varlink
, varlink
, varlink_destroy
);
659 static int varlink_test_disconnect(Varlink
*v
) {
662 /* Tests whether we the connection has been terminated. We are careful to not stop processing it
663 * prematurely, since we want to handle half-open connections as well as possible and want to flush
664 * out and read data before we close down if we can. */
666 /* Already disconnected? */
667 if (!VARLINK_STATE_IS_ALIVE(v
->state
))
670 /* Wait until connection setup is complete, i.e. until asynchronous connect() completes */
674 /* Still something to write and we can write? Stay around */
675 if (v
->output_buffer_size
> 0 && !v
->write_disconnected
)
678 /* Both sides gone already? Then there's no need to stick around */
679 if (v
->read_disconnected
&& v
->write_disconnected
)
682 /* If we are waiting for incoming data but the read side is shut down, disconnect. */
683 if (IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
, VARLINK_IDLE_SERVER
) && v
->read_disconnected
)
686 /* Similar, if are a client that hasn't written anything yet but the write side is dead, also
687 * disconnect. We also explicitly check for POLLHUP here since we likely won't notice the write side
688 * being down if we never wrote anything. */
689 if (v
->state
== VARLINK_IDLE_CLIENT
&& (v
->write_disconnected
|| v
->got_pollhup
))
692 /* We are on the server side and still want to send out more replies, but we saw POLLHUP already, and
693 * either got no buffered bytes to write anymore or already saw a write error. In that case we should
694 * shut down the varlink link. */
695 if (IN_SET(v
->state
, VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
) && (v
->write_disconnected
|| v
->output_buffer_size
== 0) && v
->got_pollhup
)
701 varlink_set_state(v
, VARLINK_PENDING_DISCONNECT
);
705 static int varlink_write(Varlink
*v
) {
711 if (!VARLINK_STATE_IS_ALIVE(v
->state
))
713 if (v
->connecting
) /* Writing while we are still wait for a non-blocking connect() to complete will
714 * result in ENOTCONN, hence exit early here */
716 if (v
->write_disconnected
)
719 /* If needed let's convert some output queue json variants into text form */
720 r
= varlink_format_queue(v
);
724 if (v
->output_buffer_size
== 0)
729 if (v
->n_output_fds
> 0) { /* If we shall send fds along, we must use sendmsg() */
731 .iov_base
= v
->output_buffer
+ v
->output_buffer_index
,
732 .iov_len
= v
->output_buffer_size
,
737 .msg_controllen
= CMSG_SPACE(sizeof(int) * v
->n_output_fds
),
740 mh
.msg_control
= alloca0(mh
.msg_controllen
);
742 struct cmsghdr
*control
= CMSG_FIRSTHDR(&mh
);
743 control
->cmsg_len
= CMSG_LEN(sizeof(int) * v
->n_output_fds
);
744 control
->cmsg_level
= SOL_SOCKET
;
745 control
->cmsg_type
= SCM_RIGHTS
;
746 memcpy(CMSG_DATA(control
), v
->output_fds
, sizeof(int) * v
->n_output_fds
);
748 n
= sendmsg(v
->fd
, &mh
, MSG_DONTWAIT
|MSG_NOSIGNAL
);
750 /* We generally prefer recv()/send() (mostly because of MSG_NOSIGNAL) but also want to be compatible
751 * with non-socket IO, hence fall back automatically.
753 * Use a local variable to help gcc figure out that we set 'n' in all cases. */
754 bool prefer_write
= v
->prefer_read_write
;
756 n
= send(v
->fd
, v
->output_buffer
+ v
->output_buffer_index
, v
->output_buffer_size
, MSG_DONTWAIT
|MSG_NOSIGNAL
);
757 if (n
< 0 && errno
== ENOTSOCK
)
758 prefer_write
= v
->prefer_read_write
= true;
761 n
= write(v
->fd
, v
->output_buffer
+ v
->output_buffer_index
, v
->output_buffer_size
);
767 if (ERRNO_IS_DISCONNECT(errno
)) {
768 /* If we get informed about a disconnect on write, then let's remember that, but not
769 * act on it just yet. Let's wait for read() to report the issue first. */
770 v
->write_disconnected
= true;
777 if (v
->output_buffer_sensitive
)
778 explicit_bzero_safe(v
->output_buffer
+ v
->output_buffer_index
, n
);
780 v
->output_buffer_size
-= n
;
782 if (v
->output_buffer_size
== 0) {
783 v
->output_buffer_index
= 0;
784 v
->output_buffer_sensitive
= false; /* We can reset the sensitive flag once the buffer is empty */
786 v
->output_buffer_index
+= n
;
788 close_many(v
->output_fds
, v
->n_output_fds
);
791 v
->timestamp
= now(CLOCK_MONOTONIC
);
795 #define VARLINK_FDS_MAX (16U*1024U)
797 static int varlink_read(Varlink
*v
) {
806 if (!IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
, VARLINK_IDLE_SERVER
))
808 if (v
->connecting
) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */
812 if (v
->input_buffer_unscanned
> 0)
814 if (v
->read_disconnected
)
817 if (v
->input_buffer_size
>= VARLINK_BUFFER_MAX
)
822 if (MALLOC_SIZEOF_SAFE(v
->input_buffer
) <= v
->input_buffer_index
+ v
->input_buffer_size
) {
825 add
= MIN(VARLINK_BUFFER_MAX
- v
->input_buffer_size
, VARLINK_READ_SIZE
);
827 if (v
->input_buffer_index
== 0) {
829 if (!GREEDY_REALLOC(v
->input_buffer
, v
->input_buffer_size
+ add
))
835 b
= new(char, v
->input_buffer_size
+ add
);
839 memcpy(b
, v
->input_buffer
+ v
->input_buffer_index
, v
->input_buffer_size
);
841 free_and_replace(v
->input_buffer
, b
);
842 v
->input_buffer_index
= 0;
846 p
= v
->input_buffer
+ v
->input_buffer_index
+ v
->input_buffer_size
;
847 rs
= MALLOC_SIZEOF_SAFE(v
->input_buffer
) - (v
->input_buffer_index
+ v
->input_buffer_size
);
849 if (v
->allow_fd_passing_input
) {
850 iov
= IOVEC_MAKE(p
, rs
);
852 /* Allocate the fd buffer on the heap, since we need a lot of space potentially */
853 if (!v
->input_control_buffer
) {
854 v
->input_control_buffer_size
= CMSG_SPACE(sizeof(int) * VARLINK_FDS_MAX
);
855 v
->input_control_buffer
= malloc(v
->input_control_buffer_size
);
856 if (!v
->input_control_buffer
)
860 mh
= (struct msghdr
) {
863 .msg_control
= v
->input_control_buffer
,
864 .msg_controllen
= v
->input_control_buffer_size
,
867 n
= recvmsg_safe(v
->fd
, &mh
, MSG_DONTWAIT
|MSG_CMSG_CLOEXEC
);
869 bool prefer_read
= v
->prefer_read_write
;
871 n
= recv(v
->fd
, p
, rs
, MSG_DONTWAIT
);
872 if (n
< 0 && errno
== ENOTSOCK
)
873 prefer_read
= v
->prefer_read_write
= true;
876 n
= read(v
->fd
, p
, rs
);
882 if (ERRNO_IS_DISCONNECT(errno
)) {
883 v
->read_disconnected
= true;
889 if (n
== 0) { /* EOF */
891 if (v
->allow_fd_passing_input
)
894 v
->read_disconnected
= true;
898 if (v
->allow_fd_passing_input
) {
899 struct cmsghdr
* cmsg
;
901 cmsg
= cmsg_find(&mh
, SOL_SOCKET
, SCM_RIGHTS
, (socklen_t
) -1);
905 /* We only allow file descriptors to be passed along with the first byte of a
906 * message. If they are passed with any other byte this is a protocol violation. */
907 if (v
->input_buffer_size
!= 0) {
912 add
= (cmsg
->cmsg_len
- CMSG_LEN(0)) / sizeof(int);
913 if (add
> INT_MAX
- v
->n_input_fds
) {
918 if (!GREEDY_REALLOC(v
->input_fds
, v
->n_input_fds
+ add
)) {
923 memcpy_safe(v
->input_fds
+ v
->n_input_fds
, CMSG_TYPED_DATA(cmsg
, int), add
* sizeof(int));
924 v
->n_input_fds
+= add
;
928 v
->input_buffer_size
+= n
;
929 v
->input_buffer_unscanned
+= n
;
934 static int varlink_parse_message(Varlink
*v
) {
935 const char *e
, *begin
;
943 if (v
->input_buffer_unscanned
<= 0)
946 assert(v
->input_buffer_unscanned
<= v
->input_buffer_size
);
947 assert(v
->input_buffer_index
+ v
->input_buffer_size
<= MALLOC_SIZEOF_SAFE(v
->input_buffer
));
949 begin
= v
->input_buffer
+ v
->input_buffer_index
;
951 e
= memchr(begin
+ v
->input_buffer_size
- v
->input_buffer_unscanned
, 0, v
->input_buffer_unscanned
);
953 v
->input_buffer_unscanned
= 0;
959 varlink_log(v
, "New incoming message: %s", begin
); /* FIXME: should we output the whole message here before validation?
960 * This may produce a non-printable journal entry if the message
961 * is invalid. We may also expose privileged information. */
963 r
= json_parse(begin
, 0, &v
->current
, NULL
, NULL
);
965 /* If we encounter a parse failure flush all data. We cannot possibly recover from this,
966 * hence drop all buffered data now. */
967 v
->input_buffer_index
= v
->input_buffer_size
= v
->input_buffer_unscanned
= 0;
968 return varlink_log_errno(v
, r
, "Failed to parse JSON: %m");
971 v
->input_buffer_size
-= sz
;
973 if (v
->input_buffer_size
== 0)
974 v
->input_buffer_index
= 0;
976 v
->input_buffer_index
+= sz
;
978 v
->input_buffer_unscanned
= v
->input_buffer_size
;
982 static int varlink_test_timeout(Varlink
*v
) {
985 if (!IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
))
987 if (v
->timeout
== USEC_INFINITY
)
990 if (now(CLOCK_MONOTONIC
) < usec_add(v
->timestamp
, v
->timeout
))
993 varlink_set_state(v
, VARLINK_PENDING_TIMEOUT
);
998 static int varlink_dispatch_local_error(Varlink
*v
, const char *error
) {
1004 if (!v
->reply_callback
)
1007 r
= v
->reply_callback(v
, NULL
, error
, VARLINK_REPLY_ERROR
|VARLINK_REPLY_LOCAL
, v
->userdata
);
1009 varlink_log_errno(v
, r
, "Reply callback returned error, ignoring: %m");
1014 static int varlink_dispatch_timeout(Varlink
*v
) {
1017 if (v
->state
!= VARLINK_PENDING_TIMEOUT
)
1020 varlink_set_state(v
, VARLINK_PROCESSING_TIMEOUT
);
1021 varlink_dispatch_local_error(v
, VARLINK_ERROR_TIMEOUT
);
1027 static int varlink_dispatch_disconnect(Varlink
*v
) {
1030 if (v
->state
!= VARLINK_PENDING_DISCONNECT
)
1033 varlink_set_state(v
, VARLINK_PROCESSING_DISCONNECT
);
1034 varlink_dispatch_local_error(v
, VARLINK_ERROR_DISCONNECTED
);
1040 static int varlink_sanitize_parameters(JsonVariant
**v
) {
1045 /* Varlink always wants a parameters list, hence make one if the caller doesn't want any */
1047 return json_variant_new_object(v
, NULL
, 0);
1048 if (json_variant_is_null(*v
)) {
1051 r
= json_variant_new_object(&empty
, NULL
, 0);
1055 json_variant_unref(*v
);
1059 if (!json_variant_is_object(*v
))
1065 static int varlink_dispatch_reply(Varlink
*v
) {
1066 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1067 VarlinkReplyFlags flags
= 0;
1068 const char *error
= NULL
;
1075 if (!IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
))
1080 assert(v
->n_pending
> 0);
1082 if (!json_variant_is_object(v
->current
))
1085 JSON_VARIANT_OBJECT_FOREACH(k
, e
, v
->current
) {
1087 if (streq(k
, "error")) {
1090 if (!json_variant_is_string(e
))
1093 error
= json_variant_string(e
);
1094 flags
|= VARLINK_REPLY_ERROR
;
1096 } else if (streq(k
, "parameters")) {
1099 if (!json_variant_is_object(e
) && !json_variant_is_null(e
))
1102 parameters
= json_variant_ref(e
);
1104 } else if (streq(k
, "continues")) {
1105 if (FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
))
1108 if (!json_variant_is_boolean(e
))
1111 if (json_variant_boolean(e
))
1112 flags
|= VARLINK_REPLY_CONTINUES
;
1117 /* Replies with 'continue' set are only OK if we set 'more' when the method call was initiated */
1118 if (v
->state
!= VARLINK_AWAITING_REPLY_MORE
&& FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
))
1121 /* An error is final */
1122 if (error
&& FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
))
1125 r
= varlink_sanitize_parameters(¶meters
);
1129 if (IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
)) {
1130 varlink_set_state(v
, VARLINK_PROCESSING_REPLY
);
1132 if (v
->reply_callback
) {
1133 r
= v
->reply_callback(v
, parameters
, error
, flags
, v
->userdata
);
1135 varlink_log_errno(v
, r
, "Reply callback returned error, ignoring: %m");
1138 varlink_clear_current(v
);
1140 if (v
->state
== VARLINK_PROCESSING_REPLY
) {
1142 assert(v
->n_pending
> 0);
1144 if (!FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
))
1147 varlink_set_state(v
,
1148 FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
) ? VARLINK_AWAITING_REPLY_MORE
:
1149 v
->n_pending
== 0 ? VARLINK_IDLE_CLIENT
: VARLINK_AWAITING_REPLY
);
1152 assert(v
->state
== VARLINK_CALLING
);
1153 varlink_set_state(v
, VARLINK_CALLED
);
1159 varlink_set_state(v
, VARLINK_PROCESSING_FAILURE
);
1160 varlink_dispatch_local_error(v
, VARLINK_ERROR_PROTOCOL
);
1166 static int generic_method_get_info(
1168 JsonVariant
*parameters
,
1169 VarlinkMethodFlags flags
,
1172 _cleanup_strv_free_
char **interfaces
= NULL
;
1173 _cleanup_free_
char *product
= NULL
;
1178 if (json_variant_elements(parameters
) != 0)
1179 return varlink_error_invalid_parameter(link
, parameters
);
1181 product
= strjoin("systemd (", program_invocation_short_name
, ")");
1185 VarlinkInterface
*interface
;
1186 HASHMAP_FOREACH(interface
, ASSERT_PTR(link
->server
)->interfaces
) {
1187 r
= strv_extend(&interfaces
, interface
->name
);
1192 strv_sort(interfaces
);
1194 return varlink_replyb(link
, JSON_BUILD_OBJECT(
1195 JSON_BUILD_PAIR_STRING("vendor", "The systemd Project"),
1196 JSON_BUILD_PAIR_STRING("product", product
),
1197 JSON_BUILD_PAIR_STRING("version", STRINGIFY(PROJECT_VERSION
) " (" GIT_VERSION
")"),
1198 JSON_BUILD_PAIR_STRING("url", "https://systemd.io/"),
1199 JSON_BUILD_PAIR_STRV("interfaces", interfaces
)));
1202 static int generic_method_get_interface_description(
1204 JsonVariant
*parameters
,
1205 VarlinkMethodFlags flags
,
1208 static const struct JsonDispatch dispatch_table
[] = {
1209 { "interface", JSON_VARIANT_STRING
, json_dispatch_const_string
, 0, JSON_MANDATORY
},
1212 _cleanup_free_
char *text
= NULL
;
1213 const VarlinkInterface
*interface
;
1214 const char *name
= NULL
;
1219 r
= json_dispatch(parameters
, dispatch_table
, 0, &name
);
1223 interface
= hashmap_get(ASSERT_PTR(link
->server
)->interfaces
, name
);
1225 return varlink_errorb(link
, VARLINK_ERROR_INTERFACE_NOT_FOUND
,
1227 JSON_BUILD_PAIR_STRING("interface", name
)));
1229 r
= varlink_idl_format(interface
, &text
);
1233 return varlink_replyb(link
,
1235 JSON_BUILD_PAIR_STRING("description", text
)));
1238 static int varlink_dispatch_method(Varlink
*v
) {
1239 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1240 VarlinkMethodFlags flags
= 0;
1241 const char *method
= NULL
;
1243 VarlinkMethod callback
;
1249 if (v
->state
!= VARLINK_IDLE_SERVER
)
1254 if (!json_variant_is_object(v
->current
))
1257 JSON_VARIANT_OBJECT_FOREACH(k
, e
, v
->current
) {
1259 if (streq(k
, "method")) {
1262 if (!json_variant_is_string(e
))
1265 method
= json_variant_string(e
);
1267 } else if (streq(k
, "parameters")) {
1270 if (!json_variant_is_object(e
) && !json_variant_is_null(e
))
1273 parameters
= json_variant_ref(e
);
1275 } else if (streq(k
, "oneway")) {
1277 if ((flags
& (VARLINK_METHOD_ONEWAY
|VARLINK_METHOD_MORE
)) != 0)
1280 if (!json_variant_is_boolean(e
))
1283 if (json_variant_boolean(e
))
1284 flags
|= VARLINK_METHOD_ONEWAY
;
1286 } else if (streq(k
, "more")) {
1288 if ((flags
& (VARLINK_METHOD_ONEWAY
|VARLINK_METHOD_MORE
)) != 0)
1291 if (!json_variant_is_boolean(e
))
1294 if (json_variant_boolean(e
))
1295 flags
|= VARLINK_METHOD_MORE
;
1304 r
= varlink_sanitize_parameters(¶meters
);
1308 varlink_set_state(v
, (flags
& VARLINK_METHOD_MORE
) ? VARLINK_PROCESSING_METHOD_MORE
:
1309 (flags
& VARLINK_METHOD_ONEWAY
) ? VARLINK_PROCESSING_METHOD_ONEWAY
:
1310 VARLINK_PROCESSING_METHOD
);
1314 /* First consult user supplied method implementations */
1315 callback
= hashmap_get(v
->server
->methods
, method
);
1317 if (streq(method
, "org.varlink.service.GetInfo"))
1318 callback
= generic_method_get_info
;
1319 else if (streq(method
, "org.varlink.service.GetInterfaceDescription"))
1320 callback
= generic_method_get_interface_description
;
1324 bool invalid
= false;
1326 v
->current_method
= hashmap_get(v
->server
->symbols
, method
);
1327 if (!v
->current_method
)
1328 varlink_log(v
, "No interface description defined for method '%s', not validating.", method
);
1330 const char *bad_field
;
1332 r
= varlink_idl_validate_method_call(v
->current_method
, parameters
, &bad_field
);
1334 varlink_log_errno(v
, r
, "Parameters for method %s() didn't pass validation on field '%s': %m", method
, strna(bad_field
));
1336 if (IN_SET(v
->state
, VARLINK_PROCESSING_METHOD
, VARLINK_PROCESSING_METHOD_MORE
)) {
1337 r
= varlink_error_invalid_parameter_name(v
, bad_field
);
1346 r
= callback(v
, parameters
, flags
, v
->userdata
);
1348 varlink_log_errno(v
, r
, "Callback for %s returned error: %m", method
);
1350 /* We got an error back from the callback. Propagate it to the client if the method call remains unanswered. */
1351 if (IN_SET(v
->state
, VARLINK_PROCESSING_METHOD
, VARLINK_PROCESSING_METHOD_MORE
)) {
1352 r
= varlink_error_errno(v
, r
);
1358 } else if (IN_SET(v
->state
, VARLINK_PROCESSING_METHOD
, VARLINK_PROCESSING_METHOD_MORE
)) {
1359 r
= varlink_errorb(v
, VARLINK_ERROR_METHOD_NOT_FOUND
, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
))));
1366 case VARLINK_PROCESSED_METHOD
: /* Method call is fully processed */
1367 case VARLINK_PROCESSING_METHOD_ONEWAY
: /* ditto */
1368 varlink_clear_current(v
);
1369 varlink_set_state(v
, VARLINK_IDLE_SERVER
);
1372 case VARLINK_PROCESSING_METHOD
: /* Method call wasn't replied to, will be replied to later */
1373 varlink_set_state(v
, VARLINK_PENDING_METHOD
);
1376 case VARLINK_PROCESSING_METHOD_MORE
: /* No reply for a "more" message was sent, more to come */
1377 varlink_set_state(v
, VARLINK_PENDING_METHOD_MORE
);
1381 assert_not_reached();
1390 varlink_set_state(v
, VARLINK_PROCESSING_FAILURE
);
1391 varlink_dispatch_local_error(v
, VARLINK_ERROR_PROTOCOL
);
1397 int varlink_process(Varlink
*v
) {
1400 assert_return(v
, -EINVAL
);
1402 if (v
->state
== VARLINK_DISCONNECTED
)
1403 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1407 r
= varlink_write(v
);
1409 varlink_log_errno(v
, r
, "Write failed: %m");
1413 r
= varlink_dispatch_reply(v
);
1415 varlink_log_errno(v
, r
, "Reply dispatch failed: %m");
1419 r
= varlink_dispatch_method(v
);
1421 varlink_log_errno(v
, r
, "Method dispatch failed: %m");
1425 r
= varlink_parse_message(v
);
1427 varlink_log_errno(v
, r
, "Message parsing failed: %m");
1431 r
= varlink_read(v
);
1433 varlink_log_errno(v
, r
, "Read failed: %m");
1437 r
= varlink_test_disconnect(v
);
1442 r
= varlink_dispatch_disconnect(v
);
1447 r
= varlink_test_timeout(v
);
1452 r
= varlink_dispatch_timeout(v
);
1458 if (r
>= 0 && v
->defer_event_source
) {
1461 /* If we did some processing, make sure we are called again soon */
1462 q
= sd_event_source_set_enabled(v
->defer_event_source
, r
> 0 ? SD_EVENT_ON
: SD_EVENT_OFF
);
1464 r
= varlink_log_errno(v
, q
, "Failed to enable deferred event source: %m");
1468 if (VARLINK_STATE_IS_ALIVE(v
->state
))
1469 /* Initiate disconnection */
1470 varlink_set_state(v
, VARLINK_PENDING_DISCONNECT
);
1472 /* We failed while disconnecting, in that case close right away */
1480 static void handle_revents(Varlink
*v
, int revents
) {
1483 if (v
->connecting
) {
1484 /* If we have seen POLLOUT or POLLHUP on a socket we are asynchronously waiting a connect()
1485 * to complete on, we know we are ready. We don't read the connection error here though,
1486 * we'll get the error on the next read() or write(). */
1487 if ((revents
& (POLLOUT
|POLLHUP
)) == 0)
1490 varlink_log(v
, "Asynchronous connection completed.");
1491 v
->connecting
= false;
1493 /* Note that we don't care much about POLLIN/POLLOUT here, we'll just try reading and writing
1494 * what we can. However, we do care about POLLHUP to detect connection termination even if we
1495 * momentarily don't want to read nor write anything. */
1497 if (!FLAGS_SET(revents
, POLLHUP
))
1500 varlink_log(v
, "Got POLLHUP from socket.");
1501 v
->got_pollhup
= true;
1505 int varlink_wait(Varlink
*v
, usec_t timeout
) {
1509 assert_return(v
, -EINVAL
);
1511 if (v
->state
== VARLINK_DISCONNECTED
)
1512 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1514 r
= varlink_get_timeout(v
, &t
);
1517 if (t
!= USEC_INFINITY
) {
1520 n
= now(CLOCK_MONOTONIC
);
1524 t
= usec_sub_unsigned(t
, n
);
1527 if (timeout
!= USEC_INFINITY
&&
1528 (t
== USEC_INFINITY
|| timeout
< t
))
1531 fd
= varlink_get_fd(v
);
1535 events
= varlink_get_events(v
);
1539 r
= fd_wait_for_event(fd
, events
, t
);
1540 if (ERRNO_IS_NEG_TRANSIENT(r
)) /* Treat EINTR as not a timeout, but also nothing happened, and
1541 * the caller gets a chance to call back into us */
1546 handle_revents(v
, r
);
1550 int varlink_is_idle(Varlink
*v
) {
1551 assert_return(v
, -EINVAL
);
1553 /* Returns true if there's nothing pending on the connection anymore, i.e. we processed all incoming
1554 * or outgoing messages fully, or finished disconnection */
1556 return IN_SET(v
->state
, VARLINK_DISCONNECTED
, VARLINK_IDLE_CLIENT
, VARLINK_IDLE_SERVER
);
1559 int varlink_get_fd(Varlink
*v
) {
1561 assert_return(v
, -EINVAL
);
1563 if (v
->state
== VARLINK_DISCONNECTED
)
1564 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1566 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBADF
), "No valid fd.");
1571 int varlink_get_events(Varlink
*v
) {
1574 assert_return(v
, -EINVAL
);
1576 if (v
->state
== VARLINK_DISCONNECTED
)
1577 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1579 if (v
->connecting
) /* When processing an asynchronous connect(), we only wait for EPOLLOUT, which
1580 * tells us that the connection is now complete. Before that we should neither
1581 * write() or read() from the fd. */
1584 if (!v
->read_disconnected
&&
1585 IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
, VARLINK_IDLE_SERVER
) &&
1587 v
->input_buffer_unscanned
<= 0)
1590 if (!v
->write_disconnected
&&
1591 v
->output_buffer_size
> 0)
1597 int varlink_get_timeout(Varlink
*v
, usec_t
*ret
) {
1598 assert_return(v
, -EINVAL
);
1600 if (v
->state
== VARLINK_DISCONNECTED
)
1601 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1603 if (IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
) &&
1604 v
->timeout
!= USEC_INFINITY
) {
1606 *ret
= usec_add(v
->timestamp
, v
->timeout
);
1610 *ret
= USEC_INFINITY
;
1615 int varlink_flush(Varlink
*v
) {
1618 assert_return(v
, -EINVAL
);
1620 if (v
->state
== VARLINK_DISCONNECTED
)
1621 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1624 if (v
->output_buffer_size
== 0)
1626 if (v
->write_disconnected
)
1629 r
= varlink_write(v
);
1637 r
= fd_wait_for_event(v
->fd
, POLLOUT
, USEC_INFINITY
);
1638 if (ERRNO_IS_NEG_TRANSIENT(r
))
1641 return varlink_log_errno(v
, r
, "Poll failed on fd: %m");
1644 handle_revents(v
, r
);
1650 static void varlink_detach_server(Varlink
*v
) {
1651 VarlinkServer
*saved_server
;
1657 if (v
->server
->by_uid
&&
1658 v
->ucred_acquired
&&
1659 uid_is_valid(v
->ucred
.uid
)) {
1662 c
= PTR_TO_UINT(hashmap_get(v
->server
->by_uid
, UID_TO_PTR(v
->ucred
.uid
)));
1666 (void) hashmap_remove(v
->server
->by_uid
, UID_TO_PTR(v
->ucred
.uid
));
1668 (void) hashmap_replace(v
->server
->by_uid
, UID_TO_PTR(v
->ucred
.uid
), UINT_TO_PTR(c
- 1));
1671 assert(v
->server
->n_connections
> 0);
1672 v
->server
->n_connections
--;
1674 /* If this is a connection associated to a server, then let's disconnect the server and the
1675 * connection from each other. This drops the dangling reference that connect_callback() set up. But
1676 * before we release the references, let's call the disconnection callback if it is defined. */
1678 saved_server
= TAKE_PTR(v
->server
);
1680 if (saved_server
->disconnect_callback
)
1681 saved_server
->disconnect_callback(saved_server
, v
, saved_server
->userdata
);
1683 varlink_server_test_exit_on_idle(saved_server
);
1684 varlink_server_unref(saved_server
);
1688 int varlink_close(Varlink
*v
) {
1689 assert_return(v
, -EINVAL
);
1691 if (v
->state
== VARLINK_DISCONNECTED
)
1694 varlink_set_state(v
, VARLINK_DISCONNECTED
);
1696 /* Let's take a reference first, since varlink_detach_server() might drop the final (dangling) ref
1697 * which would destroy us before we can call varlink_clear() */
1699 varlink_detach_server(v
);
1706 Varlink
* varlink_close_unref(Varlink
*v
) {
1710 (void) varlink_close(v
);
1711 return varlink_unref(v
);
1714 Varlink
* varlink_flush_close_unref(Varlink
*v
) {
1718 (void) varlink_flush(v
);
1719 return varlink_close_unref(v
);
1722 static int varlink_format_json(Varlink
*v
, JsonVariant
*m
) {
1723 _cleanup_(erase_and_freep
) char *text
= NULL
;
1729 r
= json_variant_format(m
, 0, &text
);
1732 assert(text
[r
] == '\0');
1734 if (v
->output_buffer_size
+ r
+ 1 > VARLINK_BUFFER_MAX
)
1737 varlink_log(v
, "Sending message: %s", text
);
1739 if (v
->output_buffer_size
== 0) {
1741 free_and_replace(v
->output_buffer
, text
);
1743 v
->output_buffer_size
= r
+ 1;
1744 v
->output_buffer_index
= 0;
1746 } else if (v
->output_buffer_index
== 0) {
1748 if (!GREEDY_REALLOC(v
->output_buffer
, v
->output_buffer_size
+ r
+ 1))
1751 memcpy(v
->output_buffer
+ v
->output_buffer_size
, text
, r
+ 1);
1752 v
->output_buffer_size
+= r
+ 1;
1755 const size_t new_size
= v
->output_buffer_size
+ r
+ 1;
1757 n
= new(char, new_size
);
1761 memcpy(mempcpy(n
, v
->output_buffer
+ v
->output_buffer_index
, v
->output_buffer_size
), text
, r
+ 1);
1763 free_and_replace(v
->output_buffer
, n
);
1764 v
->output_buffer_size
= new_size
;
1765 v
->output_buffer_index
= 0;
1768 if (json_variant_is_sensitive(m
))
1769 v
->output_buffer_sensitive
= true; /* Propagate sensitive flag */
1771 text
= mfree(text
); /* No point in the erase_and_free() destructor declared above */
1776 static int varlink_enqueue_json(Varlink
*v
, JsonVariant
*m
) {
1777 VarlinkJsonQueueItem
*q
;
1782 /* If there are no file descriptors to be queued and no queue entries yet we can shortcut things and
1783 * append this entry directly to the output buffer */
1784 if (v
->n_pushed_fds
== 0 && !v
->output_queue
)
1785 return varlink_format_json(v
, m
);
1787 /* Otherwise add a queue entry for this */
1788 q
= varlink_json_queue_item_new(m
, v
->pushed_fds
, v
->n_pushed_fds
);
1792 v
->n_pushed_fds
= 0; /* fds now belong to the queue entry */
1794 LIST_INSERT_AFTER(queue
, v
->output_queue
, v
->output_queue_tail
, q
);
1795 v
->output_queue_tail
= q
;
1799 static int varlink_format_queue(Varlink
*v
) {
1804 /* Takes entries out of the output queue and formats them into the output buffer. But only if this
1805 * would not corrupt our fd message boundaries */
1807 while (v
->output_queue
) {
1808 _cleanup_free_
int *array
= NULL
;
1809 VarlinkJsonQueueItem
*q
= v
->output_queue
;
1811 if (v
->n_output_fds
> 0) /* unwritten fds? if we'd add more we'd corrupt the fd message boundaries, hence wait */
1815 array
= newdup(int, q
->fds
, q
->n_fds
);
1820 r
= varlink_format_json(v
, q
->data
);
1824 /* Take possession of the queue element's fds */
1825 free(v
->output_fds
);
1826 v
->output_fds
= TAKE_PTR(array
);
1827 v
->n_output_fds
= q
->n_fds
;
1830 LIST_REMOVE(queue
, v
->output_queue
, q
);
1831 if (!v
->output_queue
)
1832 v
->output_queue_tail
= NULL
;
1834 varlink_json_queue_item_free(q
);
1840 int varlink_send(Varlink
*v
, const char *method
, JsonVariant
*parameters
) {
1841 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1844 assert_return(v
, -EINVAL
);
1845 assert_return(method
, -EINVAL
);
1847 if (v
->state
== VARLINK_DISCONNECTED
)
1848 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1850 /* We allow enqueuing multiple method calls at once! */
1851 if (!IN_SET(v
->state
, VARLINK_IDLE_CLIENT
, VARLINK_AWAITING_REPLY
))
1852 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "Connection busy.");
1854 r
= varlink_sanitize_parameters(¶meters
);
1856 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
1858 r
= json_build(&m
, JSON_BUILD_OBJECT(
1859 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
)),
1860 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
)),
1861 JSON_BUILD_PAIR("oneway", JSON_BUILD_BOOLEAN(true))));
1863 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1865 r
= varlink_enqueue_json(v
, m
);
1867 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
1869 /* No state change here, this is one-way only after all */
1870 v
->timestamp
= now(CLOCK_MONOTONIC
);
1874 int varlink_sendb(Varlink
*v
, const char *method
, ...) {
1875 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1879 assert_return(v
, -EINVAL
);
1881 va_start(ap
, method
);
1882 r
= json_buildv(¶meters
, ap
);
1886 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1888 return varlink_send(v
, method
, parameters
);
1891 int varlink_invoke(Varlink
*v
, const char *method
, JsonVariant
*parameters
) {
1892 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1895 assert_return(v
, -EINVAL
);
1896 assert_return(method
, -EINVAL
);
1898 if (v
->state
== VARLINK_DISCONNECTED
)
1899 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1901 /* We allow enqueuing multiple method calls at once! */
1902 if (!IN_SET(v
->state
, VARLINK_IDLE_CLIENT
, VARLINK_AWAITING_REPLY
))
1903 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "Connection busy.");
1905 r
= varlink_sanitize_parameters(¶meters
);
1907 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
1909 r
= json_build(&m
, JSON_BUILD_OBJECT(
1910 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
)),
1911 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
))));
1913 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1915 r
= varlink_enqueue_json(v
, m
);
1917 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
1919 varlink_set_state(v
, VARLINK_AWAITING_REPLY
);
1921 v
->timestamp
= now(CLOCK_MONOTONIC
);
1926 int varlink_invokeb(Varlink
*v
, const char *method
, ...) {
1927 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1931 assert_return(v
, -EINVAL
);
1933 va_start(ap
, method
);
1934 r
= json_buildv(¶meters
, ap
);
1938 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1940 return varlink_invoke(v
, method
, parameters
);
1943 int varlink_observe(Varlink
*v
, const char *method
, JsonVariant
*parameters
) {
1944 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1947 assert_return(v
, -EINVAL
);
1948 assert_return(method
, -EINVAL
);
1950 if (v
->state
== VARLINK_DISCONNECTED
)
1951 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1953 /* Note that we don't allow enqueuing multiple method calls when we are in more/continues mode! We
1954 * thus insist on an idle client here. */
1955 if (v
->state
!= VARLINK_IDLE_CLIENT
)
1956 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "Connection busy.");
1958 r
= varlink_sanitize_parameters(¶meters
);
1960 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
1962 r
= json_build(&m
, JSON_BUILD_OBJECT(
1963 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
)),
1964 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
)),
1965 JSON_BUILD_PAIR("more", JSON_BUILD_BOOLEAN(true))));
1967 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1969 r
= varlink_enqueue_json(v
, m
);
1971 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
1973 varlink_set_state(v
, VARLINK_AWAITING_REPLY_MORE
);
1975 v
->timestamp
= now(CLOCK_MONOTONIC
);
1980 int varlink_observeb(Varlink
*v
, const char *method
, ...) {
1981 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1985 assert_return(v
, -EINVAL
);
1987 va_start(ap
, method
);
1988 r
= json_buildv(¶meters
, ap
);
1992 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1994 return varlink_observe(v
, method
, parameters
);
2000 JsonVariant
*parameters
,
2001 JsonVariant
**ret_parameters
,
2002 const char **ret_error_id
,
2003 VarlinkReplyFlags
*ret_flags
) {
2005 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
2008 assert_return(v
, -EINVAL
);
2009 assert_return(method
, -EINVAL
);
2011 if (v
->state
== VARLINK_DISCONNECTED
)
2012 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
2013 if (v
->state
!= VARLINK_IDLE_CLIENT
)
2014 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "Connection busy.");
2016 assert(v
->n_pending
== 0); /* n_pending can't be > 0 if we are in VARLINK_IDLE_CLIENT state */
2018 /* If there was still a reply pinned from a previous call, now it's the time to get rid of it, so
2019 * that we can assign a new reply shortly. */
2020 varlink_clear_current(v
);
2022 r
= varlink_sanitize_parameters(¶meters
);
2024 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
2026 r
= json_build(&m
, JSON_BUILD_OBJECT(
2027 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
)),
2028 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
))));
2030 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
2032 r
= varlink_enqueue_json(v
, m
);
2034 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
2036 varlink_set_state(v
, VARLINK_CALLING
);
2038 v
->timestamp
= now(CLOCK_MONOTONIC
);
2040 while (v
->state
== VARLINK_CALLING
) {
2042 r
= varlink_process(v
);
2048 r
= varlink_wait(v
, USEC_INFINITY
);
2055 case VARLINK_CALLED
:
2058 varlink_set_state(v
, VARLINK_IDLE_CLIENT
);
2059 assert(v
->n_pending
== 1);
2063 *ret_parameters
= json_variant_by_key(v
->current
, "parameters");
2065 *ret_error_id
= json_variant_string(json_variant_by_key(v
->current
, "error"));
2071 case VARLINK_PENDING_DISCONNECT
:
2072 case VARLINK_DISCONNECTED
:
2073 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ECONNRESET
), "Connection was closed.");
2075 case VARLINK_PENDING_TIMEOUT
:
2076 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ETIME
), "Connection timed out.");
2079 assert_not_reached();
2086 JsonVariant
**ret_parameters
,
2087 const char **ret_error_id
,
2088 VarlinkReplyFlags
*ret_flags
, ...) {
2090 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
2094 assert_return(v
, -EINVAL
);
2096 va_start(ap
, ret_flags
);
2097 r
= json_buildv(¶meters
, ap
);
2101 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
2103 return varlink_call(v
, method
, parameters
, ret_parameters
, ret_error_id
, ret_flags
);
2106 static void varlink_collect_context_free(VarlinkCollectContext
*cc
) {
2109 json_variant_unref(cc
->parameters
);
2110 free((char *)cc
->error_id
);
2113 static int collect_callback(
2115 JsonVariant
*parameters
,
2116 const char *error_id
,
2117 VarlinkReplyFlags flags
,
2120 VarlinkCollectContext
*context
= ASSERT_PTR(userdata
);
2125 context
->flags
= flags
;
2126 /* If we hit an error, we will drop all collected replies and just return the error_id and flags in varlink_collect() */
2128 context
->error_id
= error_id
;
2132 r
= json_variant_append_array(&context
->parameters
, parameters
);
2134 return varlink_log_errno(v
, r
, "Failed to append JSON object to array: %m");
2139 int varlink_collect(
2142 JsonVariant
*parameters
,
2143 JsonVariant
**ret_parameters
,
2144 const char **ret_error_id
,
2145 VarlinkReplyFlags
*ret_flags
) {
2147 _cleanup_(varlink_collect_context_free
) VarlinkCollectContext context
= {};
2150 assert_return(v
, -EINVAL
);
2151 assert_return(method
, -EINVAL
);
2153 if (v
->state
== VARLINK_DISCONNECTED
)
2154 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
2155 if (v
->state
!= VARLINK_IDLE_CLIENT
)
2156 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "Connection busy.");
2158 assert(v
->n_pending
== 0); /* n_pending can't be > 0 if we are in VARLINK_IDLE_CLIENT state */
2160 /* If there was still a reply pinned from a previous call, now it's the time to get rid of it, so
2161 * that we can assign a new reply shortly. */
2162 varlink_clear_current(v
);
2164 r
= varlink_bind_reply(v
, collect_callback
);
2166 return varlink_log_errno(v
, r
, "Failed to bind collect callback");
2168 varlink_set_userdata(v
, &context
);
2169 r
= varlink_observe(v
, method
, parameters
);
2171 return varlink_log_errno(v
, r
, "Failed to collect varlink method: %m");
2173 while (v
->state
== VARLINK_AWAITING_REPLY_MORE
) {
2175 r
= varlink_process(v
);
2179 /* If we get an error from any of the replies, return immediately with just the error_id and flags*/
2180 if (context
.error_id
) {
2182 *ret_error_id
= TAKE_PTR(context
.error_id
);
2184 *ret_flags
= context
.flags
;
2191 r
= varlink_wait(v
, USEC_INFINITY
);
2198 case VARLINK_IDLE_CLIENT
:
2201 case VARLINK_PENDING_DISCONNECT
:
2202 case VARLINK_DISCONNECTED
:
2203 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ECONNRESET
), "Connection was closed.");
2205 case VARLINK_PENDING_TIMEOUT
:
2206 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ETIME
), "Connection timed out.");
2209 assert_not_reached();
2213 *ret_parameters
= TAKE_PTR(context
.parameters
);
2215 *ret_error_id
= TAKE_PTR(context
.error_id
);
2217 *ret_flags
= context
.flags
;
2221 int varlink_collectb(
2224 JsonVariant
**ret_parameters
,
2225 const char **ret_error_id
,
2226 VarlinkReplyFlags
*ret_flags
, ...) {
2228 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
2232 assert_return(v
, -EINVAL
);
2234 va_start(ap
, ret_flags
);
2235 r
= json_buildv(¶meters
, ap
);
2239 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
2241 return varlink_collect(v
, method
, parameters
, ret_parameters
, ret_error_id
, ret_flags
);
2244 int varlink_reply(Varlink
*v
, JsonVariant
*parameters
) {
2245 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
2248 assert_return(v
, -EINVAL
);
2250 if (v
->state
== VARLINK_DISCONNECTED
)
2252 if (!IN_SET(v
->state
,
2253 VARLINK_PROCESSING_METHOD
, VARLINK_PROCESSING_METHOD_MORE
,
2254 VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
))
2257 r
= varlink_sanitize_parameters(¶meters
);
2259 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
2261 r
= json_build(&m
, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
))));
2263 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
2265 if (v
->current_method
) {
2266 const char *bad_field
= NULL
;
2268 r
= varlink_idl_validate_method_reply(v
->current_method
, parameters
, &bad_field
);
2270 varlink_log_errno(v
, r
, "Return parameters for method reply %s() didn't pass validation on field '%s', ignoring: %m", v
->current_method
->name
, strna(bad_field
));
2273 r
= varlink_enqueue_json(v
, m
);
2275 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
2277 if (IN_SET(v
->state
, VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
)) {
2278 /* We just replied to a method call that was let hanging for a while (i.e. we were outside of
2279 * the varlink_dispatch_method() stack frame), which means with this reply we are ready to
2280 * process further messages. */
2281 varlink_clear_current(v
);
2282 varlink_set_state(v
, VARLINK_IDLE_SERVER
);
2284 /* We replied to a method call from within the varlink_dispatch_method() stack frame), which
2285 * means we should it handle the rest of the state engine. */
2286 varlink_set_state(v
, VARLINK_PROCESSED_METHOD
);
2291 int varlink_replyb(Varlink
*v
, ...) {
2292 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
2296 assert_return(v
, -EINVAL
);
2299 r
= json_buildv(¶meters
, ap
);
2305 return varlink_reply(v
, parameters
);
2308 int varlink_error(Varlink
*v
, const char *error_id
, JsonVariant
*parameters
) {
2309 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
2312 assert_return(v
, -EINVAL
);
2313 assert_return(error_id
, -EINVAL
);
2315 if (v
->state
== VARLINK_DISCONNECTED
)
2316 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
2317 if (!IN_SET(v
->state
,
2318 VARLINK_PROCESSING_METHOD
, VARLINK_PROCESSING_METHOD_MORE
,
2319 VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
))
2320 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "Connection busy.");
2322 /* Reset the list of pushed file descriptors before sending an error reply. We do this here to
2323 * simplify code that puts together a complex reply message with fds, and half-way something
2324 * fails. In that case the pushed fds need to be flushed out again. Under the assumption that it
2325 * never makes sense to send fds along with errors we simply flush them out here beforehand, so that
2326 * the callers don't need to do this explicitly. */
2327 varlink_reset_fds(v
);
2329 r
= varlink_sanitize_parameters(¶meters
);
2331 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
2333 r
= json_build(&m
, JSON_BUILD_OBJECT(
2334 JSON_BUILD_PAIR("error", JSON_BUILD_STRING(error_id
)),
2335 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
))));
2337 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
2339 VarlinkSymbol
*symbol
= hashmap_get(v
->server
->symbols
, error_id
);
2341 varlink_log(v
, "No interface description defined for error '%s', not validating.", error_id
);
2343 const char *bad_field
= NULL
;
2345 r
= varlink_idl_validate_error(symbol
, parameters
, &bad_field
);
2347 varlink_log_errno(v
, r
, "Parameters for error %s didn't pass validation on field '%s', ignoring: %m", error_id
, strna(bad_field
));
2350 r
= varlink_enqueue_json(v
, m
);
2352 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
2354 if (IN_SET(v
->state
, VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
)) {
2355 varlink_clear_current(v
);
2356 varlink_set_state(v
, VARLINK_IDLE_SERVER
);
2358 varlink_set_state(v
, VARLINK_PROCESSED_METHOD
);
2363 int varlink_errorb(Varlink
*v
, const char *error_id
, ...) {
2364 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
2368 assert_return(v
, -EINVAL
);
2369 assert_return(error_id
, -EINVAL
);
2371 va_start(ap
, error_id
);
2372 r
= json_buildv(¶meters
, ap
);
2376 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
2378 return varlink_error(v
, error_id
, parameters
);
2381 int varlink_error_invalid_parameter(Varlink
*v
, JsonVariant
*parameters
) {
2384 assert_return(v
, -EINVAL
);
2385 assert_return(parameters
, -EINVAL
);
2387 /* We expect to be called in one of two ways: the 'parameters' argument is a string variant in which
2388 * case it is the parameter key name that is invalid. Or the 'parameters' argument is an object
2389 * variant in which case we'll pull out the first key. The latter mode is useful in functions that
2390 * don't expect any arguments. */
2392 /* varlink_error(...) expects a json object as the third parameter. Passing a string variant causes
2393 * parameter sanitization to fail, and it returns -EINVAL. */
2395 if (json_variant_is_string(parameters
)) {
2396 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters_obj
= NULL
;
2398 r
= json_build(¶meters_obj
,
2400 JSON_BUILD_PAIR("parameter", JSON_BUILD_VARIANT(parameters
))));
2404 return varlink_error(v
, VARLINK_ERROR_INVALID_PARAMETER
, parameters_obj
);
2407 if (json_variant_is_object(parameters
) &&
2408 json_variant_elements(parameters
) > 0) {
2409 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters_obj
= NULL
;
2411 r
= json_build(¶meters_obj
,
2413 JSON_BUILD_PAIR("parameter", JSON_BUILD_VARIANT(json_variant_by_index(parameters
, 0)))));
2417 return varlink_error(v
, VARLINK_ERROR_INVALID_PARAMETER
, parameters_obj
);
2423 int varlink_error_invalid_parameter_name(Varlink
*v
, const char *name
) {
2424 return varlink_errorb(
2426 VARLINK_ERROR_INVALID_PARAMETER
,
2427 JSON_BUILD_OBJECT(JSON_BUILD_PAIR("parameter", JSON_BUILD_STRING(name
))));
2430 int varlink_error_errno(Varlink
*v
, int error
) {
2431 return varlink_errorb(
2433 VARLINK_ERROR_SYSTEM
,
2434 JSON_BUILD_OBJECT(JSON_BUILD_PAIR("errno", JSON_BUILD_INTEGER(abs(error
)))));
2437 int varlink_notify(Varlink
*v
, JsonVariant
*parameters
) {
2438 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
2441 assert_return(v
, -EINVAL
);
2443 if (v
->state
== VARLINK_DISCONNECTED
)
2444 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
2446 /* If we want to reply with a notify connection but the caller didn't set "more", then return an
2447 * error indicating that we expected to be called with "more" set */
2448 if (IN_SET(v
->state
, VARLINK_PROCESSING_METHOD
, VARLINK_PENDING_METHOD
))
2449 return varlink_error(v
, VARLINK_ERROR_EXPECTED_MORE
, NULL
);
2451 if (!IN_SET(v
->state
, VARLINK_PROCESSING_METHOD_MORE
, VARLINK_PENDING_METHOD_MORE
))
2452 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "Connection busy.");
2454 r
= varlink_sanitize_parameters(¶meters
);
2456 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
2458 r
= json_build(&m
, JSON_BUILD_OBJECT(
2459 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
)),
2460 JSON_BUILD_PAIR("continues", JSON_BUILD_BOOLEAN(true))));
2462 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
2464 if (v
->current_method
) {
2465 const char *bad_field
= NULL
;
2467 r
= varlink_idl_validate_method_reply(v
->current_method
, parameters
, &bad_field
);
2469 varlink_log_errno(v
, r
, "Return parameters for method reply %s() didn't pass validation on field '%s', ignoring: %m", v
->current_method
->name
, strna(bad_field
));
2472 r
= varlink_enqueue_json(v
, m
);
2474 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
2476 /* No state change, as more is coming */
2480 int varlink_notifyb(Varlink
*v
, ...) {
2481 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
2485 assert_return(v
, -EINVAL
);
2488 r
= json_buildv(¶meters
, ap
);
2492 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
2494 return varlink_notify(v
, parameters
);
2497 int varlink_dispatch(Varlink
*v
, JsonVariant
*parameters
, const JsonDispatch table
[], void *userdata
) {
2498 const char *bad_field
= NULL
;
2501 assert_return(v
, -EINVAL
);
2502 assert_return(table
, -EINVAL
);
2504 /* A wrapper around json_dispatch_full() that returns a nice InvalidParameter error if we hit a problem with some field. */
2506 r
= json_dispatch_full(parameters
, table
, /* bad= */ NULL
, /* flags= */ 0, userdata
, &bad_field
);
2509 return varlink_error_invalid_parameter_name(v
, bad_field
);
2516 int varlink_bind_reply(Varlink
*v
, VarlinkReply callback
) {
2517 assert_return(v
, -EINVAL
);
2519 if (callback
&& v
->reply_callback
&& callback
!= v
->reply_callback
)
2520 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "A different callback was already set.");
2522 v
->reply_callback
= callback
;
2527 void* varlink_set_userdata(Varlink
*v
, void *userdata
) {
2530 assert_return(v
, NULL
);
2533 v
->userdata
= userdata
;
2538 void* varlink_get_userdata(Varlink
*v
) {
2539 assert_return(v
, NULL
);
2544 static int varlink_acquire_ucred(Varlink
*v
) {
2549 if (v
->ucred_acquired
)
2552 r
= getpeercred(v
->fd
, &v
->ucred
);
2556 v
->ucred_acquired
= true;
2560 int varlink_get_peer_uid(Varlink
*v
, uid_t
*ret
) {
2563 assert_return(v
, -EINVAL
);
2564 assert_return(ret
, -EINVAL
);
2566 r
= varlink_acquire_ucred(v
);
2568 return varlink_log_errno(v
, r
, "Failed to acquire credentials: %m");
2570 if (!uid_is_valid(v
->ucred
.uid
))
2571 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENODATA
), "Peer uid is invalid.");
2573 *ret
= v
->ucred
.uid
;
2577 int varlink_get_peer_pid(Varlink
*v
, pid_t
*ret
) {
2580 assert_return(v
, -EINVAL
);
2581 assert_return(ret
, -EINVAL
);
2583 r
= varlink_acquire_ucred(v
);
2585 return varlink_log_errno(v
, r
, "Failed to acquire credentials: %m");
2587 if (!pid_is_valid(v
->ucred
.pid
))
2588 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENODATA
), "Peer uid is invalid.");
2590 *ret
= v
->ucred
.pid
;
2594 int varlink_set_relative_timeout(Varlink
*v
, usec_t timeout
) {
2595 assert_return(v
, -EINVAL
);
2596 assert_return(timeout
> 0, -EINVAL
);
2598 v
->timeout
= timeout
;
2602 VarlinkServer
*varlink_get_server(Varlink
*v
) {
2603 assert_return(v
, NULL
);
2608 int varlink_set_description(Varlink
*v
, const char *description
) {
2609 assert_return(v
, -EINVAL
);
2611 return free_and_strdup(&v
->description
, description
);
2614 static int io_callback(sd_event_source
*s
, int fd
, uint32_t revents
, void *userdata
) {
2615 Varlink
*v
= ASSERT_PTR(userdata
);
2619 handle_revents(v
, revents
);
2620 (void) varlink_process(v
);
2625 static int time_callback(sd_event_source
*s
, uint64_t usec
, void *userdata
) {
2626 Varlink
*v
= ASSERT_PTR(userdata
);
2630 (void) varlink_process(v
);
2634 static int defer_callback(sd_event_source
*s
, void *userdata
) {
2635 Varlink
*v
= ASSERT_PTR(userdata
);
2639 (void) varlink_process(v
);
2643 static int prepare_callback(sd_event_source
*s
, void *userdata
) {
2644 Varlink
*v
= ASSERT_PTR(userdata
);
2651 e
= varlink_get_events(v
);
2655 r
= sd_event_source_set_io_events(v
->io_event_source
, e
);
2657 return varlink_log_errno(v
, r
, "Failed to set source events: %m");
2659 r
= varlink_get_timeout(v
, &until
);
2662 have_timeout
= r
> 0;
2665 r
= sd_event_source_set_time(v
->time_event_source
, until
);
2667 return varlink_log_errno(v
, r
, "Failed to set source time: %m");
2670 r
= sd_event_source_set_enabled(v
->time_event_source
, have_timeout
? SD_EVENT_ON
: SD_EVENT_OFF
);
2672 return varlink_log_errno(v
, r
, "Failed to enable event source: %m");
2677 static int quit_callback(sd_event_source
*event
, void *userdata
) {
2678 Varlink
*v
= ASSERT_PTR(userdata
);
2688 int varlink_attach_event(Varlink
*v
, sd_event
*e
, int64_t priority
) {
2691 assert_return(v
, -EINVAL
);
2692 assert_return(!v
->event
, -EBUSY
);
2695 v
->event
= sd_event_ref(e
);
2697 r
= sd_event_default(&v
->event
);
2699 return varlink_log_errno(v
, r
, "Failed to create event source: %m");
2702 r
= sd_event_add_time(v
->event
, &v
->time_event_source
, CLOCK_MONOTONIC
, 0, 0, time_callback
, v
);
2706 r
= sd_event_source_set_priority(v
->time_event_source
, priority
);
2710 (void) sd_event_source_set_description(v
->time_event_source
, "varlink-time");
2712 r
= sd_event_add_exit(v
->event
, &v
->quit_event_source
, quit_callback
, v
);
2716 r
= sd_event_source_set_priority(v
->quit_event_source
, priority
);
2720 (void) sd_event_source_set_description(v
->quit_event_source
, "varlink-quit");
2722 r
= sd_event_add_io(v
->event
, &v
->io_event_source
, v
->fd
, 0, io_callback
, v
);
2726 r
= sd_event_source_set_prepare(v
->io_event_source
, prepare_callback
);
2730 r
= sd_event_source_set_priority(v
->io_event_source
, priority
);
2734 (void) sd_event_source_set_description(v
->io_event_source
, "varlink-io");
2736 r
= sd_event_add_defer(v
->event
, &v
->defer_event_source
, defer_callback
, v
);
2740 r
= sd_event_source_set_priority(v
->defer_event_source
, priority
);
2744 (void) sd_event_source_set_description(v
->defer_event_source
, "varlink-defer");
2749 varlink_log_errno(v
, r
, "Failed to setup event source: %m");
2750 varlink_detach_event(v
);
2754 void varlink_detach_event(Varlink
*v
) {
2758 varlink_detach_event_sources(v
);
2760 v
->event
= sd_event_unref(v
->event
);
2763 sd_event
*varlink_get_event(Varlink
*v
) {
2764 assert_return(v
, NULL
);
2769 int varlink_push_fd(Varlink
*v
, int fd
) {
2772 assert_return(v
, -EINVAL
);
2773 assert_return(fd
>= 0, -EBADF
);
2775 /* Takes an fd to send along with the *next* varlink message sent via this varlink connection. This
2776 * takes ownership of the specified fd. Use varlink_dup_fd() below to duplicate the fd first. */
2778 if (!v
->allow_fd_passing_output
)
2781 if (v
->n_pushed_fds
>= INT_MAX
)
2784 if (!GREEDY_REALLOC(v
->pushed_fds
, v
->n_pushed_fds
+ 1))
2787 i
= (int) v
->n_pushed_fds
;
2788 v
->pushed_fds
[v
->n_pushed_fds
++] = fd
;
2792 int varlink_dup_fd(Varlink
*v
, int fd
) {
2793 _cleanup_close_
int dp
= -1;
2796 assert_return(v
, -EINVAL
);
2797 assert_return(fd
>= 0, -EBADF
);
2799 /* Like varlink_push_fd() but duplicates the specified fd instead of taking possession of it */
2801 dp
= fcntl(fd
, F_DUPFD_CLOEXEC
, 3);
2805 r
= varlink_push_fd(v
, dp
);
2813 int varlink_reset_fds(Varlink
*v
) {
2814 assert_return(v
, -EINVAL
);
2816 /* Closes all currently pending fds to send. This may be used whenever the caller is in the process
2817 * of putting together a message with fds, and then eventually something fails and they need to
2818 * rollback the fds. Note that this is implicitly called whenever an error reply is sent, see above. */
2820 close_many(v
->output_fds
, v
->n_output_fds
);
2821 v
->n_output_fds
= 0;
2825 int varlink_peek_fd(Varlink
*v
, size_t i
) {
2826 assert_return(v
, -EINVAL
);
2828 /* Returns one of the file descriptors that were received along with the current message. This does
2829 * not duplicate the fd nor invalidate it, it hence remains in our possession. */
2831 if (!v
->allow_fd_passing_input
)
2834 if (i
>= v
->n_input_fds
)
2837 return v
->input_fds
[i
];
2840 int varlink_take_fd(Varlink
*v
, size_t i
) {
2841 assert_return(v
, -EINVAL
);
2843 /* Similar to varlink_peek_fd() but the file descriptor's ownership is passed to the caller, and
2844 * we'll invalidate the reference to it under our possession. If called twice in a row will return
2847 if (!v
->allow_fd_passing_input
)
2850 if (i
>= v
->n_input_fds
)
2853 return TAKE_FD(v
->input_fds
[i
]);
2856 static int verify_unix_socket(Varlink
*v
) {
2862 if (fstat(v
->fd
, &st
) < 0)
2864 if (!S_ISSOCK(st
.st_mode
)) {
2869 v
->af
= socket_get_family(v
->fd
);
2874 return v
->af
== AF_UNIX
? 0 : -ENOMEDIUM
;
2877 int varlink_set_allow_fd_passing_input(Varlink
*v
, bool b
) {
2880 assert_return(v
, -EINVAL
);
2882 if (v
->allow_fd_passing_input
== b
)
2886 v
->allow_fd_passing_input
= false;
2890 r
= verify_unix_socket(v
);
2894 v
->allow_fd_passing_input
= true;
2898 int varlink_set_allow_fd_passing_output(Varlink
*v
, bool b
) {
2901 assert_return(v
, -EINVAL
);
2903 if (v
->allow_fd_passing_output
== b
)
2907 v
->allow_fd_passing_output
= false;
2911 r
= verify_unix_socket(v
);
2915 v
->allow_fd_passing_output
= true;
2919 int varlink_server_new(VarlinkServer
**ret
, VarlinkServerFlags flags
) {
2920 _cleanup_(varlink_server_unrefp
) VarlinkServer
*s
= NULL
;
2923 assert_return(ret
, -EINVAL
);
2924 assert_return((flags
& ~_VARLINK_SERVER_FLAGS_ALL
) == 0, -EINVAL
);
2926 s
= new(VarlinkServer
, 1);
2928 return log_oom_debug();
2930 *s
= (VarlinkServer
) {
2933 .connections_max
= varlink_server_connections_max(NULL
),
2934 .connections_per_uid_max
= varlink_server_connections_per_uid_max(NULL
),
2937 r
= varlink_server_add_interface_many(
2939 &vl_interface_io_systemd
,
2940 &vl_interface_org_varlink_service
);
2948 static VarlinkServer
* varlink_server_destroy(VarlinkServer
*s
) {
2954 varlink_server_shutdown(s
);
2956 while ((m
= hashmap_steal_first_key(s
->methods
)))
2959 hashmap_free(s
->methods
);
2960 hashmap_free(s
->interfaces
);
2961 hashmap_free(s
->symbols
);
2962 hashmap_free(s
->by_uid
);
2964 sd_event_unref(s
->event
);
2966 free(s
->description
);
2971 DEFINE_TRIVIAL_REF_UNREF_FUNC(VarlinkServer
, varlink_server
, varlink_server_destroy
);
2973 static int validate_connection(VarlinkServer
*server
, const struct ucred
*ucred
) {
2979 if (FLAGS_SET(server
->flags
, VARLINK_SERVER_ROOT_ONLY
))
2980 allowed
= ucred
->uid
== 0;
2982 if (FLAGS_SET(server
->flags
, VARLINK_SERVER_MYSELF_ONLY
))
2983 allowed
= allowed
> 0 || ucred
->uid
== getuid();
2985 if (allowed
== 0) { /* Allow access when it is explicitly allowed or when neither
2986 * VARLINK_SERVER_ROOT_ONLY nor VARLINK_SERVER_MYSELF_ONLY are specified. */
2987 varlink_server_log(server
, "Unprivileged client attempted connection, refusing.");
2991 if (server
->n_connections
>= server
->connections_max
) {
2992 varlink_server_log(server
, "Connection limit of %u reached, refusing.", server
->connections_max
);
2996 if (FLAGS_SET(server
->flags
, VARLINK_SERVER_ACCOUNT_UID
)) {
2999 if (!uid_is_valid(ucred
->uid
)) {
3000 varlink_server_log(server
, "Client with invalid UID attempted connection, refusing.");
3004 c
= PTR_TO_UINT(hashmap_get(server
->by_uid
, UID_TO_PTR(ucred
->uid
)));
3005 if (c
>= server
->connections_per_uid_max
) {
3006 varlink_server_log(server
, "Per-UID connection limit of %u reached, refusing.",
3007 server
->connections_per_uid_max
);
3015 static int count_connection(VarlinkServer
*server
, const struct ucred
*ucred
) {
3022 server
->n_connections
++;
3024 if (FLAGS_SET(server
->flags
, VARLINK_SERVER_ACCOUNT_UID
)) {
3025 assert(uid_is_valid(ucred
->uid
));
3027 r
= hashmap_ensure_allocated(&server
->by_uid
, NULL
);
3029 return varlink_server_log_errno(server
, r
, "Failed to allocate UID hash table: %m");
3031 c
= PTR_TO_UINT(hashmap_get(server
->by_uid
, UID_TO_PTR(ucred
->uid
)));
3033 varlink_server_log(server
, "Connections of user " UID_FMT
": %u (of %u max)",
3034 ucred
->uid
, c
, server
->connections_per_uid_max
);
3036 r
= hashmap_replace(server
->by_uid
, UID_TO_PTR(ucred
->uid
), UINT_TO_PTR(c
+ 1));
3038 return varlink_server_log_errno(server
, r
, "Failed to increment counter in UID hash table: %m");
3044 int varlink_server_add_connection(VarlinkServer
*server
, int fd
, Varlink
**ret
) {
3045 _cleanup_(varlink_unrefp
) Varlink
*v
= NULL
;
3046 struct ucred ucred
= UCRED_INVALID
;
3047 bool ucred_acquired
;
3050 assert_return(server
, -EINVAL
);
3051 assert_return(fd
>= 0, -EBADF
);
3053 if ((server
->flags
& (VARLINK_SERVER_ROOT_ONLY
|VARLINK_SERVER_ACCOUNT_UID
)) != 0) {
3054 r
= getpeercred(fd
, &ucred
);
3056 return varlink_server_log_errno(server
, r
, "Failed to acquire peer credentials of incoming socket, refusing: %m");
3058 ucred_acquired
= true;
3060 r
= validate_connection(server
, &ucred
);
3066 ucred_acquired
= false;
3068 r
= varlink_new(&v
);
3070 return varlink_server_log_errno(server
, r
, "Failed to allocate connection object: %m");
3072 r
= count_connection(server
, &ucred
);
3077 if (server
->flags
& VARLINK_SERVER_INHERIT_USERDATA
)
3078 v
->userdata
= server
->userdata
;
3080 if (ucred_acquired
) {
3082 v
->ucred_acquired
= true;
3085 _cleanup_free_
char *desc
= NULL
;
3086 if (asprintf(&desc
, "%s-%i", server
->description
?: "varlink", v
->fd
) >= 0)
3087 v
->description
= TAKE_PTR(desc
);
3089 /* Link up the server and the connection, and take reference in both directions. Note that the
3090 * reference on the connection is left dangling. It will be dropped when the connection is closed,
3091 * which happens in varlink_close(), including in the event loop quit callback. */
3092 v
->server
= varlink_server_ref(server
);
3095 varlink_set_state(v
, VARLINK_IDLE_SERVER
);
3097 if (server
->event
) {
3098 r
= varlink_attach_event(v
, server
->event
, server
->event_priority
);
3100 varlink_log_errno(v
, r
, "Failed to attach new connection: %m");
3101 v
->fd
= -EBADF
; /* take the fd out of the connection again */
3113 static VarlinkServerSocket
*varlink_server_socket_free(VarlinkServerSocket
*ss
) {
3121 DEFINE_TRIVIAL_CLEANUP_FUNC(VarlinkServerSocket
*, varlink_server_socket_free
);
3123 static int connect_callback(sd_event_source
*source
, int fd
, uint32_t revents
, void *userdata
) {
3124 VarlinkServerSocket
*ss
= ASSERT_PTR(userdata
);
3125 _cleanup_close_
int cfd
= -EBADF
;
3131 varlink_server_log(ss
->server
, "New incoming connection.");
3133 cfd
= accept4(fd
, NULL
, NULL
, SOCK_NONBLOCK
|SOCK_CLOEXEC
);
3135 if (ERRNO_IS_ACCEPT_AGAIN(errno
))
3138 return varlink_server_log_errno(ss
->server
, errno
, "Failed to accept incoming socket: %m");
3141 r
= varlink_server_add_connection(ss
->server
, cfd
, &v
);
3147 if (ss
->server
->connect_callback
) {
3148 r
= ss
->server
->connect_callback(ss
->server
, v
, ss
->server
->userdata
);
3150 varlink_log_errno(v
, r
, "Connection callback returned error, disconnecting client: %m");
3159 static int varlink_server_create_listen_fd_socket(VarlinkServer
*s
, int fd
, VarlinkServerSocket
**ret_ss
) {
3160 _cleanup_(varlink_server_socket_freep
) VarlinkServerSocket
*ss
= NULL
;
3167 ss
= new(VarlinkServerSocket
, 1);
3169 return log_oom_debug();
3171 *ss
= (VarlinkServerSocket
) {
3177 r
= sd_event_add_io(s
->event
, &ss
->event_source
, fd
, EPOLLIN
, connect_callback
, ss
);
3181 r
= sd_event_source_set_priority(ss
->event_source
, s
->event_priority
);
3186 *ret_ss
= TAKE_PTR(ss
);
3190 int varlink_server_listen_fd(VarlinkServer
*s
, int fd
) {
3191 _cleanup_(varlink_server_socket_freep
) VarlinkServerSocket
*ss
= NULL
;
3194 assert_return(s
, -EINVAL
);
3195 assert_return(fd
>= 0, -EBADF
);
3197 r
= fd_nonblock(fd
, true);
3201 r
= fd_cloexec(fd
, true);
3205 r
= varlink_server_create_listen_fd_socket(s
, fd
, &ss
);
3209 LIST_PREPEND(sockets
, s
->sockets
, TAKE_PTR(ss
));
3213 int varlink_server_listen_address(VarlinkServer
*s
, const char *address
, mode_t m
) {
3214 _cleanup_(varlink_server_socket_freep
) VarlinkServerSocket
*ss
= NULL
;
3215 union sockaddr_union sockaddr
;
3216 socklen_t sockaddr_len
;
3217 _cleanup_close_
int fd
= -EBADF
;
3220 assert_return(s
, -EINVAL
);
3221 assert_return(address
, -EINVAL
);
3222 assert_return((m
& ~0777) == 0, -EINVAL
);
3224 r
= sockaddr_un_set_path(&sockaddr
.un
, address
);
3229 fd
= socket(AF_UNIX
, SOCK_STREAM
|SOCK_CLOEXEC
|SOCK_NONBLOCK
, 0);
3233 fd
= fd_move_above_stdio(fd
);
3235 (void) sockaddr_un_unlink(&sockaddr
.un
);
3237 WITH_UMASK(~m
& 0777) {
3238 r
= mac_selinux_bind(fd
, &sockaddr
.sa
, sockaddr_len
);
3243 if (listen(fd
, SOMAXCONN_DELUXE
) < 0)
3246 r
= varlink_server_create_listen_fd_socket(s
, fd
, &ss
);
3250 r
= free_and_strdup(&ss
->address
, address
);
3254 LIST_PREPEND(sockets
, s
->sockets
, TAKE_PTR(ss
));
3259 int varlink_server_listen_auto(VarlinkServer
*s
) {
3260 _cleanup_strv_free_
char **names
= NULL
;
3263 assert_return(s
, -EINVAL
);
3265 /* Adds all passed fds marked as "varlink" to our varlink server. These fds can either refer to a
3266 * listening socket or to a connection socket.
3268 * See https://varlink.org/#activation for the environment variables this is backed by and the
3269 * recommended "varlink" identifier in $LISTEN_FDNAMES. */
3271 r
= sd_listen_fds_with_names(/* unset_environment= */ false, &names
);
3275 for (int i
= 0; i
< r
; i
++) {
3277 socklen_t l
= sizeof(b
);
3279 if (!streq(names
[i
], "varlink"))
3282 fd
= SD_LISTEN_FDS_START
+ i
;
3284 if (getsockopt(fd
, SOL_SOCKET
, SO_ACCEPTCONN
, &b
, &l
) < 0)
3287 assert(l
== sizeof(b
));
3289 if (b
) /* Listening socket? */
3290 r
= varlink_server_listen_fd(s
, fd
);
3291 else /* Otherwise assume connection socket */
3292 r
= varlink_server_add_connection(s
, fd
, NULL
);
3302 void* varlink_server_set_userdata(VarlinkServer
*s
, void *userdata
) {
3305 assert_return(s
, NULL
);
3308 s
->userdata
= userdata
;
3313 void* varlink_server_get_userdata(VarlinkServer
*s
) {
3314 assert_return(s
, NULL
);
3319 int varlink_server_loop_auto(VarlinkServer
*server
) {
3320 _cleanup_(sd_event_unrefp
) sd_event
*event
= NULL
;
3323 assert_return(server
, -EINVAL
);
3324 assert_return(!server
->event
, -EBUSY
);
3326 /* Runs a Varlink service event loop populated with a passed fd. Exits on the last connection. */
3328 r
= sd_event_new(&event
);
3332 r
= varlink_server_set_exit_on_idle(server
, true);
3336 r
= varlink_server_attach_event(server
, event
, 0);
3340 r
= varlink_server_listen_auto(server
);
3344 return sd_event_loop(event
);
3347 static VarlinkServerSocket
* varlink_server_socket_destroy(VarlinkServerSocket
*ss
) {
3352 LIST_REMOVE(sockets
, ss
->server
->sockets
, ss
);
3354 sd_event_source_disable_unref(ss
->event_source
);
3362 int varlink_server_shutdown(VarlinkServer
*s
) {
3363 assert_return(s
, -EINVAL
);
3366 varlink_server_socket_destroy(s
->sockets
);
3371 static void varlink_server_test_exit_on_idle(VarlinkServer
*s
) {
3374 if (s
->exit_on_idle
&& s
->event
&& s
->n_connections
== 0)
3375 (void) sd_event_exit(s
->event
, 0);
3378 int varlink_server_set_exit_on_idle(VarlinkServer
*s
, bool b
) {
3379 assert_return(s
, -EINVAL
);
3381 s
->exit_on_idle
= b
;
3382 varlink_server_test_exit_on_idle(s
);
3386 static int varlink_server_add_socket_event_source(VarlinkServer
*s
, VarlinkServerSocket
*ss
, int64_t priority
) {
3387 _cleanup_(sd_event_source_unrefp
) sd_event_source
*es
= NULL
;
3393 assert(ss
->fd
>= 0);
3394 assert(!ss
->event_source
);
3396 r
= sd_event_add_io(s
->event
, &es
, ss
->fd
, EPOLLIN
, connect_callback
, ss
);
3400 r
= sd_event_source_set_priority(es
, priority
);
3404 ss
->event_source
= TAKE_PTR(es
);
3408 int varlink_server_attach_event(VarlinkServer
*s
, sd_event
*e
, int64_t priority
) {
3411 assert_return(s
, -EINVAL
);
3412 assert_return(!s
->event
, -EBUSY
);
3415 s
->event
= sd_event_ref(e
);
3417 r
= sd_event_default(&s
->event
);
3422 LIST_FOREACH(sockets
, ss
, s
->sockets
) {
3423 r
= varlink_server_add_socket_event_source(s
, ss
, priority
);
3428 s
->event_priority
= priority
;
3432 varlink_server_detach_event(s
);
3436 int varlink_server_detach_event(VarlinkServer
*s
) {
3437 assert_return(s
, -EINVAL
);
3439 LIST_FOREACH(sockets
, ss
, s
->sockets
)
3440 ss
->event_source
= sd_event_source_disable_unref(ss
->event_source
);
3442 sd_event_unref(s
->event
);
3446 sd_event
*varlink_server_get_event(VarlinkServer
*s
) {
3447 assert_return(s
, NULL
);
3452 static bool varlink_symbol_in_interface(const char *method
, const char *interface
) {
3458 p
= startswith(method
, interface
);
3465 return !strchr(p
+1, '.');
3468 int varlink_server_bind_method(VarlinkServer
*s
, const char *method
, VarlinkMethod callback
) {
3469 _cleanup_free_
char *m
= NULL
;
3472 assert_return(s
, -EINVAL
);
3473 assert_return(method
, -EINVAL
);
3474 assert_return(callback
, -EINVAL
);
3476 if (varlink_symbol_in_interface(method
, "org.varlink.service") ||
3477 varlink_symbol_in_interface(method
, "io.systemd"))
3478 return varlink_server_log_errno(s
, SYNTHETIC_ERRNO(EEXIST
), "Cannot bind server to '%s'.", method
);
3482 return log_oom_debug();
3484 r
= hashmap_ensure_put(&s
->methods
, &string_hash_ops
, m
, callback
);
3486 return log_oom_debug();
3488 return varlink_server_log_errno(s
, r
, "Failed to register callback: %m");
3495 int varlink_server_bind_method_many_internal(VarlinkServer
*s
, ...) {
3499 assert_return(s
, -EINVAL
);
3503 VarlinkMethod callback
;
3506 method
= va_arg(ap
, const char *);
3510 callback
= va_arg(ap
, VarlinkMethod
);
3512 r
= varlink_server_bind_method(s
, method
, callback
);
3521 int varlink_server_bind_connect(VarlinkServer
*s
, VarlinkConnect callback
) {
3522 assert_return(s
, -EINVAL
);
3524 if (callback
&& s
->connect_callback
&& callback
!= s
->connect_callback
)
3525 return varlink_server_log_errno(s
, SYNTHETIC_ERRNO(EBUSY
), "A different callback was already set.");
3527 s
->connect_callback
= callback
;
3531 int varlink_server_bind_disconnect(VarlinkServer
*s
, VarlinkDisconnect callback
) {
3532 assert_return(s
, -EINVAL
);
3534 if (callback
&& s
->disconnect_callback
&& callback
!= s
->disconnect_callback
)
3535 return varlink_server_log_errno(s
, SYNTHETIC_ERRNO(EBUSY
), "A different callback was already set.");
3537 s
->disconnect_callback
= callback
;
3541 int varlink_server_add_interface(VarlinkServer
*s
, const VarlinkInterface
*interface
) {
3544 assert_return(s
, -EINVAL
);
3545 assert_return(interface
, -EINVAL
);
3546 assert_return(interface
->name
, -EINVAL
);
3548 if (hashmap_contains(s
->interfaces
, interface
->name
))
3549 return varlink_server_log_errno(s
, SYNTHETIC_ERRNO(EEXIST
), "Duplicate registration of interface '%s'.", interface
->name
);
3551 r
= hashmap_ensure_put(&s
->interfaces
, &string_hash_ops
, interface
->name
, (void*) interface
);
3555 for (const VarlinkSymbol
*const*symbol
= interface
->symbols
; *symbol
; symbol
++) {
3556 _cleanup_free_
char *j
= NULL
;
3558 /* We only ever want to validate method calls/replies and errors against the interface
3559 * definitions, hence don't bother with the type symbols */
3560 if (!IN_SET((*symbol
)->symbol_type
, VARLINK_METHOD
, VARLINK_ERROR
))
3563 j
= strjoin(interface
->name
, ".", (*symbol
)->name
);
3567 r
= hashmap_ensure_put(&s
->symbols
, &string_hash_ops_free
, j
, (void*) *symbol
);
3577 int varlink_server_add_interface_many_internal(VarlinkServer
*s
, ...) {
3581 assert_return(s
, -EINVAL
);
3585 const VarlinkInterface
*interface
= va_arg(ap
, const VarlinkInterface
*);
3589 r
= varlink_server_add_interface(s
, interface
);
3598 unsigned varlink_server_connections_max(VarlinkServer
*s
) {
3601 /* If a server is specified, return the setting for that server, otherwise the default value */
3603 return s
->connections_max
;
3605 dts
= getdtablesize();
3608 /* Make sure we never use up more than ¾th of RLIMIT_NOFILE for IPC */
3609 if (VARLINK_DEFAULT_CONNECTIONS_MAX
> (unsigned) dts
/ 4 * 3)
3612 return VARLINK_DEFAULT_CONNECTIONS_MAX
;
3615 unsigned varlink_server_connections_per_uid_max(VarlinkServer
*s
) {
3619 return s
->connections_per_uid_max
;
3621 /* Make sure to never use up more than ¾th of available connections for a single user */
3622 m
= varlink_server_connections_max(NULL
);
3623 if (VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX
> m
)
3626 return VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX
;
3629 int varlink_server_set_connections_per_uid_max(VarlinkServer
*s
, unsigned m
) {
3630 assert_return(s
, -EINVAL
);
3631 assert_return(m
> 0, -EINVAL
);
3633 s
->connections_per_uid_max
= m
;
3637 int varlink_server_set_connections_max(VarlinkServer
*s
, unsigned m
) {
3638 assert_return(s
, -EINVAL
);
3639 assert_return(m
> 0, -EINVAL
);
3641 s
->connections_max
= m
;
3645 unsigned varlink_server_current_connections(VarlinkServer
*s
) {
3646 assert_return(s
, UINT_MAX
);
3648 return s
->n_connections
;
3651 int varlink_server_set_description(VarlinkServer
*s
, const char *description
) {
3652 assert_return(s
, -EINVAL
);
3654 return free_and_strdup(&s
->description
, description
);
3657 int varlink_server_serialize(VarlinkServer
*s
, FILE *f
, FDSet
*fds
) {
3664 LIST_FOREACH(sockets
, ss
, s
->sockets
) {
3667 assert(ss
->address
);
3668 assert(ss
->fd
>= 0);
3670 fprintf(f
, "varlink-server-socket-address=%s", ss
->address
);
3672 /* If we fail to serialize the fd, it will be considered an error during deserialization */
3673 copy
= fdset_put_dup(fds
, ss
->fd
);
3677 fprintf(f
, " varlink-server-socket-fd=%i", copy
);
3685 int varlink_server_deserialize_one(VarlinkServer
*s
, const char *value
, FDSet
*fds
) {
3686 _cleanup_(varlink_server_socket_freep
) VarlinkServerSocket
*ss
= NULL
;
3687 _cleanup_free_
char *address
= NULL
;
3688 const char *v
= ASSERT_PTR(value
);
3696 n
= strcspn(v
, " ");
3697 address
= strndup(v
, n
);
3699 return log_oom_debug();
3702 return varlink_server_log_errno(s
, SYNTHETIC_ERRNO(EINVAL
),
3703 "Failed to deserialize VarlinkServerSocket: %s: %m", value
);
3704 v
= startswith(v
+ n
+ 1, "varlink-server-socket-fd=");
3706 return varlink_server_log_errno(s
, SYNTHETIC_ERRNO(EINVAL
),
3707 "Failed to deserialize VarlinkServerSocket fd %s: %m", value
);
3709 n
= strcspn(v
, " ");
3710 buf
= strndupa_safe(v
, n
);
3714 return varlink_server_log_errno(s
, fd
, "Unable to parse VarlinkServerSocket varlink-server-socket-fd=%s: %m", buf
);
3715 if (!fdset_contains(fds
, fd
))
3716 return varlink_server_log_errno(s
, SYNTHETIC_ERRNO(EBADF
),
3717 "VarlinkServerSocket varlink-server-socket-fd= has unknown fd %d: %m", fd
);
3719 ss
= new(VarlinkServerSocket
, 1);
3721 return log_oom_debug();
3723 *ss
= (VarlinkServerSocket
) {
3725 .address
= TAKE_PTR(address
),
3726 .fd
= fdset_remove(fds
, fd
),
3729 r
= varlink_server_add_socket_event_source(s
, ss
, SD_EVENT_PRIORITY_NORMAL
);
3731 return varlink_server_log_errno(s
, r
, "Failed to add VarlinkServerSocket event source to the event loop: %m");
3733 LIST_PREPEND(sockets
, s
->sockets
, TAKE_PTR(ss
));
3737 int varlink_invocation(VarlinkInvocationFlags flags
) {
3738 _cleanup_strv_free_
char **names
= NULL
;
3740 socklen_t l
= sizeof(b
);
3742 /* Returns true if this is a "pure" varlink server invocation, i.e. with one fd passed. */
3744 r
= sd_listen_fds_with_names(/* unset_environment= */ false, &names
);
3750 return -ETOOMANYREFS
;
3752 if (!strv_equal(names
, STRV_MAKE("varlink")))
3755 if (FLAGS_SET(flags
, VARLINK_ALLOW_LISTEN
|VARLINK_ALLOW_ACCEPT
)) /* Both flags set? Then allow everything */
3758 if ((flags
& (VARLINK_ALLOW_LISTEN
|VARLINK_ALLOW_ACCEPT
)) == 0) /* Neither is set, then fail */
3761 if (getsockopt(SD_LISTEN_FDS_START
, SOL_SOCKET
, SO_ACCEPTCONN
, &b
, &l
) < 0)
3764 assert(l
== sizeof(b
));
3766 if (!FLAGS_SET(flags
, b
? VARLINK_ALLOW_LISTEN
: VARLINK_ALLOW_ACCEPT
))