1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
6 #include "alloc-util.h"
7 #include "errno-util.h"
9 #include "glyph-util.h"
13 #include "process-util.h"
14 #include "selinux-util.h"
15 #include "serialize.h"
17 #include "socket-util.h"
18 #include "string-table.h"
19 #include "string-util.h"
21 #include "time-util.h"
22 #include "umask-util.h"
23 #include "user-util.h"
25 #include "varlink-internal.h"
27 #define VARLINK_DEFAULT_CONNECTIONS_MAX 4096U
28 #define VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX 1024U
30 #define VARLINK_DEFAULT_TIMEOUT_USEC (45U*USEC_PER_SEC)
31 #define VARLINK_BUFFER_MAX (16U*1024U*1024U)
32 #define VARLINK_READ_SIZE (64U*1024U)
34 typedef enum VarlinkState
{
35 /* Client side states */
37 VARLINK_AWAITING_REPLY
,
38 VARLINK_AWAITING_REPLY_MORE
,
41 VARLINK_PROCESSING_REPLY
,
43 /* Server side states */
45 VARLINK_PROCESSING_METHOD
,
46 VARLINK_PROCESSING_METHOD_MORE
,
47 VARLINK_PROCESSING_METHOD_ONEWAY
,
48 VARLINK_PROCESSED_METHOD
,
49 VARLINK_PENDING_METHOD
,
50 VARLINK_PENDING_METHOD_MORE
,
52 /* Common states (only during shutdown) */
53 VARLINK_PENDING_DISCONNECT
,
54 VARLINK_PENDING_TIMEOUT
,
55 VARLINK_PROCESSING_DISCONNECT
,
56 VARLINK_PROCESSING_TIMEOUT
,
57 VARLINK_PROCESSING_FAILURE
,
61 _VARLINK_STATE_INVALID
= -EINVAL
,
64 /* Tests whether we are not yet disconnected. Note that this is true during all states where the connection
65 * is still good for something, and false only when it's dead for good. This means: when we are
66 * asynchronously connecting to a peer and the connect() is still pending, then this will return 'true', as
67 * the connection is still good, and we are likely to be able to properly operate on it soon. */
68 #define VARLINK_STATE_IS_ALIVE(state) \
70 VARLINK_IDLE_CLIENT, \
71 VARLINK_AWAITING_REPLY, \
72 VARLINK_AWAITING_REPLY_MORE, \
75 VARLINK_PROCESSING_REPLY, \
76 VARLINK_IDLE_SERVER, \
77 VARLINK_PROCESSING_METHOD, \
78 VARLINK_PROCESSING_METHOD_MORE, \
79 VARLINK_PROCESSING_METHOD_ONEWAY, \
80 VARLINK_PROCESSED_METHOD, \
81 VARLINK_PENDING_METHOD, \
82 VARLINK_PENDING_METHOD_MORE)
84 typedef struct VarlinkJsonQueueItem VarlinkJsonQueueItem
;
86 /* A queued message we shall write into the socket, along with the file descriptors to send at the same
87 * time. This queue item binds them together so that message/fd boundaries are maintained throughout the
89 struct VarlinkJsonQueueItem
{
90 LIST_FIELDS(VarlinkJsonQueueItem
, queue
);
99 VarlinkServer
*server
;
102 bool connecting
; /* This boolean indicates whether the socket fd we are operating on is currently
103 * processing an asynchronous connect(). In that state we watch the socket for
104 * EPOLLOUT, but we refrain from calling read() or write() on the socket as that
105 * will trigger ENOTCONN. Note that this boolean is kept separate from the
106 * VarlinkState above on purpose: while the connect() is still not complete we
107 * already want to allow queuing of messages and similar. Thus it's nice to keep
108 * these two state concepts separate: the VarlinkState encodes what our own view of
109 * the connection is, i.e. whether we think it's a server, a client, and has
110 * something queued already, while 'connecting' tells us a detail about the
111 * transport used below, that should have no effect on how we otherwise accept and
112 * process operations from the user.
114 * Or to say this differently: VARLINK_STATE_IS_ALIVE(state) tells you whether the
115 * connection is good to use, even if it might not be fully connected
116 * yet. connecting=true then informs you that actually we are still connecting, and
117 * the connection is actually not established yet and thus any requests you enqueue
118 * now will still work fine but will be queued only, not sent yet, but that
119 * shouldn't stop you from using the connection, since eventually whatever you queue
122 * Or to say this even differently: 'state' is a high-level ("application layer"
123 * high, if you so will) state, while 'connecting' is a low-level ("transport layer"
124 * low, if you so will) state, and while they are not entirely unrelated and
125 * sometimes propagate effects to each other they are only asynchronously connected
131 char *input_buffer
; /* valid data starts at input_buffer_index, ends at input_buffer_index+input_buffer_size */
132 size_t input_buffer_index
;
133 size_t input_buffer_size
;
134 size_t input_buffer_unscanned
;
136 void *input_control_buffer
;
137 size_t input_control_buffer_size
;
139 char *output_buffer
; /* valid data starts at output_buffer_index, ends at output_buffer_index+output_buffer_size */
140 size_t output_buffer_index
;
141 size_t output_buffer_size
;
143 int *input_fds
; /* file descriptors associated with the data in input_buffer (for fd passing) */
146 int *output_fds
; /* file descriptors associated with the data in output_buffer (for fd passing) */
149 /* Further messages to output not yet formatted into text, and thus not included in output_buffer
150 * yet. We keep them separate from output_buffer, to not violate fd message boundaries: we want that
151 * each message that is sent is associated with its fds, and that fds cannot be accidentally associated
152 * with preceding or following messages. */
153 LIST_HEAD(VarlinkJsonQueueItem
, output_queue
);
154 VarlinkJsonQueueItem
*output_queue_tail
;
156 /* The fds to associate with the next message that is about to be enqueued. The user first pushes the
157 * fds it intends to send via varlink_push_fd() into this queue, and then once the message data is
158 * submitted we'll combine the fds and the message data into one. */
162 VarlinkReply reply_callback
;
164 JsonVariant
*current
;
167 bool ucred_acquired
:1;
169 bool write_disconnected
:1;
170 bool read_disconnected
:1;
171 bool prefer_read_write
:1;
174 bool allow_fd_passing_input
:1;
175 bool allow_fd_passing_output
:1;
177 bool output_buffer_sensitive
:1; /* whether to erase the output buffer after writing it to the socket */
179 int af
; /* address family if socket; AF_UNSPEC if not socket; negative if not known */
188 sd_event_source
*io_event_source
;
189 sd_event_source
*time_event_source
;
190 sd_event_source
*quit_event_source
;
191 sd_event_source
*defer_event_source
;
194 typedef struct VarlinkServerSocket VarlinkServerSocket
;
196 struct VarlinkServerSocket
{
197 VarlinkServer
*server
;
202 sd_event_source
*event_source
;
204 LIST_FIELDS(VarlinkServerSocket
, sockets
);
207 struct VarlinkServer
{
209 VarlinkServerFlags flags
;
211 LIST_HEAD(VarlinkServerSocket
, sockets
);
214 VarlinkConnect connect_callback
;
215 VarlinkDisconnect disconnect_callback
;
218 int64_t event_priority
;
220 unsigned n_connections
;
226 unsigned connections_max
;
227 unsigned connections_per_uid_max
;
230 static const char* const varlink_state_table
[_VARLINK_STATE_MAX
] = {
231 [VARLINK_IDLE_CLIENT
] = "idle-client",
232 [VARLINK_AWAITING_REPLY
] = "awaiting-reply",
233 [VARLINK_AWAITING_REPLY_MORE
] = "awaiting-reply-more",
234 [VARLINK_CALLING
] = "calling",
235 [VARLINK_CALLED
] = "called",
236 [VARLINK_PROCESSING_REPLY
] = "processing-reply",
237 [VARLINK_IDLE_SERVER
] = "idle-server",
238 [VARLINK_PROCESSING_METHOD
] = "processing-method",
239 [VARLINK_PROCESSING_METHOD_MORE
] = "processing-method-more",
240 [VARLINK_PROCESSING_METHOD_ONEWAY
] = "processing-method-oneway",
241 [VARLINK_PROCESSED_METHOD
] = "processed-method",
242 [VARLINK_PENDING_METHOD
] = "pending-method",
243 [VARLINK_PENDING_METHOD_MORE
] = "pending-method-more",
244 [VARLINK_PENDING_DISCONNECT
] = "pending-disconnect",
245 [VARLINK_PENDING_TIMEOUT
] = "pending-timeout",
246 [VARLINK_PROCESSING_DISCONNECT
] = "processing-disconnect",
247 [VARLINK_PROCESSING_TIMEOUT
] = "processing-timeout",
248 [VARLINK_PROCESSING_FAILURE
] = "processing-failure",
249 [VARLINK_DISCONNECTED
] = "disconnected",
252 DEFINE_PRIVATE_STRING_TABLE_LOOKUP_TO_STRING(varlink_state
, VarlinkState
);
/* Debug-level logging helpers. Each one prefixes the message with "<description>: ", where the description
 * comes from varlink_description() (connection variants) or varlink_server_description() (server
 * variants). 'format' must be a string literal, since it is pasted onto the "%s: " prefix at compile time.
 *
 * The *_errno variants forward 'err' to log_debug_errno() and evaluate to whatever that yields, which is
 * why callers in this file use them directly in return statements. */
#define varlink_log_errno(link, err, format, ...)                       \
        log_debug_errno(err, "%s: " format, varlink_description(link), ##__VA_ARGS__)

#define varlink_log(link, format, ...)                                  \
        log_debug("%s: " format, varlink_description(link), ##__VA_ARGS__)

#define varlink_server_log_errno(server, err, format, ...)              \
        log_debug_errno(err, "%s: " format, varlink_server_description(server), ##__VA_ARGS__)

#define varlink_server_log(server, format, ...)                         \
        log_debug("%s: " format, varlink_server_description(server), ##__VA_ARGS__)
266 static int varlink_format_queue(Varlink
*v
);
268 static inline const char *varlink_description(Varlink
*v
) {
269 return (v
? v
->description
: NULL
) ?: "varlink";
272 static inline const char *varlink_server_description(VarlinkServer
*s
) {
273 return (s
? s
->description
: NULL
) ?: "varlink";
276 static VarlinkJsonQueueItem
*varlink_json_queue_item_free(VarlinkJsonQueueItem
*q
) {
280 json_variant_unref(q
->data
);
281 close_many(q
->fds
, q
->n_fds
);
286 static VarlinkJsonQueueItem
*varlink_json_queue_item_new(JsonVariant
*m
, const int fds
[], size_t n_fds
) {
287 VarlinkJsonQueueItem
*q
;
290 assert(fds
|| n_fds
== 0);
292 q
= malloc(offsetof(VarlinkJsonQueueItem
, fds
) + sizeof(int) * n_fds
);
296 *q
= (VarlinkJsonQueueItem
) {
297 .data
= json_variant_ref(m
),
301 memcpy_safe(q
->fds
, fds
, n_fds
* sizeof(int));
306 static void varlink_set_state(Varlink
*v
, VarlinkState state
) {
308 assert(state
>= 0 && state
< _VARLINK_STATE_MAX
);
311 varlink_log(v
, "Setting state %s",
312 varlink_state_to_string(state
));
314 varlink_log(v
, "Changing state %s %s %s",
315 varlink_state_to_string(v
->state
),
316 special_glyph(SPECIAL_GLYPH_ARROW_RIGHT
),
317 varlink_state_to_string(state
));
322 static int varlink_new(Varlink
**ret
) {
335 .state
= _VARLINK_STATE_INVALID
,
337 .ucred
= UCRED_INVALID
,
339 .timestamp
= USEC_INFINITY
,
340 .timeout
= VARLINK_DEFAULT_TIMEOUT_USEC
,
349 int varlink_connect_address(Varlink
**ret
, const char *address
) {
350 _cleanup_(varlink_unrefp
) Varlink
*v
= NULL
;
351 union sockaddr_union sockaddr
;
354 assert_return(ret
, -EINVAL
);
355 assert_return(address
, -EINVAL
);
359 return log_debug_errno(r
, "Failed to create varlink object: %m");
361 v
->fd
= socket(AF_UNIX
, SOCK_STREAM
|SOCK_CLOEXEC
|SOCK_NONBLOCK
, 0);
363 return log_debug_errno(errno
, "Failed to create AF_UNIX socket: %m");
365 v
->fd
= fd_move_above_stdio(v
->fd
);
368 r
= sockaddr_un_set_path(&sockaddr
.un
, address
);
370 if (r
!= -ENAMETOOLONG
)
371 return log_debug_errno(r
, "Failed to set socket address '%s': %m", address
);
373 /* This is a file system path, and too long to fit into sockaddr_un. Let's connect via O_PATH
376 r
= connect_unix_path(v
->fd
, AT_FDCWD
, address
);
378 r
= RET_NERRNO(connect(v
->fd
, &sockaddr
.sa
, r
));
381 if (!IN_SET(r
, -EAGAIN
, -EINPROGRESS
))
382 return log_debug_errno(r
, "Failed to connect to %s: %m", address
);
384 v
->connecting
= true; /* We are asynchronously connecting, i.e. the connect() is being
385 * processed in the background. As long as that's the case the socket
386 * is in a special state: it's there, we can poll it for EPOLLOUT, but
387 * if we attempt to write() to it before we see EPOLLOUT we'll get
388 * ENOTCONN (and not EAGAIN, like we would for a normal connected
389 * socket that isn't writable at the moment). Since ENOTCONN on write()
390 * hence can mean two different things (i.e. connection not complete
391 * yet vs. already disconnected again), we store as a boolean whether
392 * we are still in connect(). */
395 varlink_set_state(v
, VARLINK_IDLE_CLIENT
);
401 int varlink_connect_fd(Varlink
**ret
, int fd
) {
405 assert_return(ret
, -EINVAL
);
406 assert_return(fd
>= 0, -EBADF
);
408 r
= fd_nonblock(fd
, true);
410 return log_debug_errno(r
, "Failed to make fd %d nonblocking: %m", fd
);
414 return log_debug_errno(r
, "Failed to create varlink object: %m");
418 varlink_set_state(v
, VARLINK_IDLE_CLIENT
);
420 /* Note that if this function is called we assume the passed socket (if it is one) is already
421 * properly connected, i.e. any asynchronous connect() done on it already completed. Because of that
422 * we'll not set the 'connecting' boolean here, i.e. we don't need to avoid write()ing to the socket
423 * until the connection is fully set up. Behaviour here is hence a bit different from
424 * varlink_connect_address() above, as there we do handle asynchronous connections ourselves and
425 * avoid doing write() on it before we saw EPOLLOUT for the first time. */
431 static void varlink_detach_event_sources(Varlink
*v
) {
434 v
->io_event_source
= sd_event_source_disable_unref(v
->io_event_source
);
435 v
->time_event_source
= sd_event_source_disable_unref(v
->time_event_source
);
436 v
->quit_event_source
= sd_event_source_disable_unref(v
->quit_event_source
);
437 v
->defer_event_source
= sd_event_source_disable_unref(v
->defer_event_source
);
440 static void varlink_clear_current(Varlink
*v
) {
443 /* Clears the currently processed incoming message */
444 v
->current
= json_variant_unref(v
->current
);
446 close_many(v
->input_fds
, v
->n_input_fds
);
447 v
->input_fds
= mfree(v
->input_fds
);
451 static void varlink_clear(Varlink
*v
) {
454 varlink_detach_event_sources(v
);
456 v
->fd
= safe_close(v
->fd
);
458 varlink_clear_current(v
);
460 v
->input_buffer
= mfree(v
->input_buffer
);
461 v
->output_buffer
= v
->output_buffer_sensitive
? erase_and_free(v
->output_buffer
) : mfree(v
->output_buffer
);
463 v
->input_control_buffer
= mfree(v
->input_control_buffer
);
464 v
->input_control_buffer_size
= 0;
466 varlink_clear_current(v
);
468 close_many(v
->output_fds
, v
->n_output_fds
);
469 v
->output_fds
= mfree(v
->output_fds
);
472 close_many(v
->pushed_fds
, v
->n_pushed_fds
);
473 v
->pushed_fds
= mfree(v
->pushed_fds
);
476 while (v
->output_queue
) {
477 VarlinkJsonQueueItem
*q
= v
->output_queue
;
479 LIST_REMOVE(queue
, v
->output_queue
, q
);
480 varlink_json_queue_item_free(q
);
482 v
->output_queue_tail
= NULL
;
484 v
->event
= sd_event_unref(v
->event
);
487 static Varlink
* varlink_destroy(Varlink
*v
) {
491 /* If this is called the server object must already have been unreffed here. Why that? Because when we
492 * linked up the varlink connection with the server object we took one ref in each direction */
497 free(v
->description
);
501 DEFINE_TRIVIAL_REF_UNREF_FUNC(Varlink
, varlink
, varlink_destroy
);
503 static int varlink_test_disconnect(Varlink
*v
) {
506 /* Tests whether the connection has been terminated. We are careful to not stop processing it
507 * prematurely, since we want to handle half-open connections as well as possible and want to flush
508 * out and read data before we close down if we can. */
510 /* Already disconnected? */
511 if (!VARLINK_STATE_IS_ALIVE(v
->state
))
514 /* Wait until connection setup is complete, i.e. until asynchronous connect() completes */
518 /* Still something to write and we can write? Stay around */
519 if (v
->output_buffer_size
> 0 && !v
->write_disconnected
)
522 /* Both sides gone already? Then there's no need to stick around */
523 if (v
->read_disconnected
&& v
->write_disconnected
)
526 /* If we are waiting for incoming data but the read side is shut down, disconnect. */
527 if (IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
, VARLINK_IDLE_SERVER
) && v
->read_disconnected
)
530 /* Similarly, if we are a client that hasn't written anything yet but the write side is dead, also
531 * disconnect. We also explicitly check for POLLHUP here since we likely won't notice the write side
532 * being down if we never wrote anything. */
533 if (v
->state
== VARLINK_IDLE_CLIENT
&& (v
->write_disconnected
|| v
->got_pollhup
))
536 /* We are on the server side and still want to send out more replies, but we saw POLLHUP already, and
537 * either got no buffered bytes to write anymore or already saw a write error. In that case we should
538 * shut down the varlink link. */
539 if (IN_SET(v
->state
, VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
) && (v
->write_disconnected
|| v
->output_buffer_size
== 0) && v
->got_pollhup
)
545 varlink_set_state(v
, VARLINK_PENDING_DISCONNECT
);
549 static int varlink_write(Varlink
*v
) {
555 if (!VARLINK_STATE_IS_ALIVE(v
->state
))
557 if (v
->connecting
) /* Writing while we are still waiting for a non-blocking connect() to complete will
558 * result in ENOTCONN, hence exit early here */
560 if (v
->write_disconnected
)
563 /* If needed let's convert some output queue json variants into text form */
564 r
= varlink_format_queue(v
);
568 if (v
->output_buffer_size
== 0)
573 if (v
->n_output_fds
> 0) { /* If we shall send fds along, we must use sendmsg() */
575 .iov_base
= v
->output_buffer
+ v
->output_buffer_index
,
576 .iov_len
= v
->output_buffer_size
,
581 .msg_controllen
= CMSG_SPACE(sizeof(int) * v
->n_output_fds
),
584 mh
.msg_control
= alloca0(mh
.msg_controllen
);
586 struct cmsghdr
*control
= CMSG_FIRSTHDR(&mh
);
587 control
->cmsg_len
= CMSG_LEN(sizeof(int) * v
->n_output_fds
);
588 control
->cmsg_level
= SOL_SOCKET
;
589 control
->cmsg_type
= SCM_RIGHTS
;
590 memcpy(CMSG_DATA(control
), v
->output_fds
, sizeof(int) * v
->n_output_fds
);
592 n
= sendmsg(v
->fd
, &mh
, MSG_DONTWAIT
|MSG_NOSIGNAL
);
594 /* We generally prefer recv()/send() (mostly because of MSG_NOSIGNAL) but also want to be compatible
595 * with non-socket IO, hence fall back automatically.
597 * Use a local variable to help gcc figure out that we set 'n' in all cases. */
598 bool prefer_write
= v
->prefer_read_write
;
600 n
= send(v
->fd
, v
->output_buffer
+ v
->output_buffer_index
, v
->output_buffer_size
, MSG_DONTWAIT
|MSG_NOSIGNAL
);
601 if (n
< 0 && errno
== ENOTSOCK
)
602 prefer_write
= v
->prefer_read_write
= true;
605 n
= write(v
->fd
, v
->output_buffer
+ v
->output_buffer_index
, v
->output_buffer_size
);
611 if (ERRNO_IS_DISCONNECT(errno
)) {
612 /* If we get informed about a disconnect on write, then let's remember that, but not
613 * act on it just yet. Let's wait for read() to report the issue first. */
614 v
->write_disconnected
= true;
621 if (v
->output_buffer_sensitive
)
622 explicit_bzero_safe(v
->output_buffer
+ v
->output_buffer_index
, n
);
624 v
->output_buffer_size
-= n
;
626 if (v
->output_buffer_size
== 0) {
627 v
->output_buffer_index
= 0;
628 v
->output_buffer_sensitive
= false; /* We can reset the sensitive flag once the buffer is empty */
630 v
->output_buffer_index
+= n
;
632 close_many(v
->output_fds
, v
->n_output_fds
);
635 v
->timestamp
= now(CLOCK_MONOTONIC
);
639 #define VARLINK_FDS_MAX (16U*1024U)
641 static int varlink_read(Varlink
*v
) {
650 if (!IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
, VARLINK_IDLE_SERVER
))
652 if (v
->connecting
) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */
656 if (v
->input_buffer_unscanned
> 0)
658 if (v
->read_disconnected
)
661 if (v
->input_buffer_size
>= VARLINK_BUFFER_MAX
)
666 if (MALLOC_SIZEOF_SAFE(v
->input_buffer
) <= v
->input_buffer_index
+ v
->input_buffer_size
) {
669 add
= MIN(VARLINK_BUFFER_MAX
- v
->input_buffer_size
, VARLINK_READ_SIZE
);
671 if (v
->input_buffer_index
== 0) {
673 if (!GREEDY_REALLOC(v
->input_buffer
, v
->input_buffer_size
+ add
))
679 b
= new(char, v
->input_buffer_size
+ add
);
683 memcpy(b
, v
->input_buffer
+ v
->input_buffer_index
, v
->input_buffer_size
);
685 free_and_replace(v
->input_buffer
, b
);
686 v
->input_buffer_index
= 0;
690 p
= v
->input_buffer
+ v
->input_buffer_index
+ v
->input_buffer_size
;
691 rs
= MALLOC_SIZEOF_SAFE(v
->input_buffer
) - (v
->input_buffer_index
+ v
->input_buffer_size
);
693 if (v
->allow_fd_passing_input
) {
694 iov
= IOVEC_MAKE(p
, rs
);
696 /* Allocate the fd buffer on the heap, since we need a lot of space potentially */
697 if (!v
->input_control_buffer
) {
698 v
->input_control_buffer_size
= CMSG_SPACE(sizeof(int) * VARLINK_FDS_MAX
);
699 v
->input_control_buffer
= malloc(v
->input_control_buffer_size
);
700 if (!v
->input_control_buffer
)
704 mh
= (struct msghdr
) {
707 .msg_control
= v
->input_control_buffer
,
708 .msg_controllen
= v
->input_control_buffer_size
,
711 n
= recvmsg_safe(v
->fd
, &mh
, MSG_DONTWAIT
|MSG_CMSG_CLOEXEC
);
713 bool prefer_read
= v
->prefer_read_write
;
715 n
= recv(v
->fd
, p
, rs
, MSG_DONTWAIT
);
716 if (n
< 0 && errno
== ENOTSOCK
)
717 prefer_read
= v
->prefer_read_write
= true;
720 n
= read(v
->fd
, p
, rs
);
726 if (ERRNO_IS_DISCONNECT(errno
)) {
727 v
->read_disconnected
= true;
733 if (n
== 0) { /* EOF */
735 if (v
->allow_fd_passing_input
)
738 v
->read_disconnected
= true;
742 if (v
->allow_fd_passing_input
) {
743 struct cmsghdr
* cmsg
;
745 cmsg
= cmsg_find(&mh
, SOL_SOCKET
, SCM_RIGHTS
, (socklen_t
) -1);
749 /* We only allow file descriptors to be passed along with the first byte of a
750 * message. If they are passed with any other byte this is a protocol violation. */
751 if (v
->input_buffer_size
!= 0) {
756 add
= (cmsg
->cmsg_len
- CMSG_LEN(0)) / sizeof(int);
757 if (add
> INT_MAX
- v
->n_input_fds
) {
762 if (!GREEDY_REALLOC(v
->input_fds
, v
->n_input_fds
+ add
)) {
767 memcpy_safe(v
->input_fds
+ v
->n_input_fds
, CMSG_TYPED_DATA(cmsg
, int), add
* sizeof(int));
768 v
->n_input_fds
+= add
;
772 v
->input_buffer_size
+= n
;
773 v
->input_buffer_unscanned
+= n
;
778 static int varlink_parse_message(Varlink
*v
) {
779 const char *e
, *begin
;
787 if (v
->input_buffer_unscanned
<= 0)
790 assert(v
->input_buffer_unscanned
<= v
->input_buffer_size
);
791 assert(v
->input_buffer_index
+ v
->input_buffer_size
<= MALLOC_SIZEOF_SAFE(v
->input_buffer
));
793 begin
= v
->input_buffer
+ v
->input_buffer_index
;
795 e
= memchr(begin
+ v
->input_buffer_size
- v
->input_buffer_unscanned
, 0, v
->input_buffer_unscanned
);
797 v
->input_buffer_unscanned
= 0;
803 varlink_log(v
, "New incoming message: %s", begin
); /* FIXME: should we output the whole message here before validation?
804 * This may produce a non-printable journal entry if the message
805 * is invalid. We may also expose privileged information. */
807 r
= json_parse(begin
, 0, &v
->current
, NULL
, NULL
);
809 /* If we encounter a parse failure flush all data. We cannot possibly recover from this,
810 * hence drop all buffered data now. */
811 v
->input_buffer_index
= v
->input_buffer_size
= v
->input_buffer_unscanned
= 0;
812 return varlink_log_errno(v
, r
, "Failed to parse JSON: %m");
815 v
->input_buffer_size
-= sz
;
817 if (v
->input_buffer_size
== 0)
818 v
->input_buffer_index
= 0;
820 v
->input_buffer_index
+= sz
;
822 v
->input_buffer_unscanned
= v
->input_buffer_size
;
826 static int varlink_test_timeout(Varlink
*v
) {
829 if (!IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
))
831 if (v
->timeout
== USEC_INFINITY
)
834 if (now(CLOCK_MONOTONIC
) < usec_add(v
->timestamp
, v
->timeout
))
837 varlink_set_state(v
, VARLINK_PENDING_TIMEOUT
);
842 static int varlink_dispatch_local_error(Varlink
*v
, const char *error
) {
848 if (!v
->reply_callback
)
851 r
= v
->reply_callback(v
, NULL
, error
, VARLINK_REPLY_ERROR
|VARLINK_REPLY_LOCAL
, v
->userdata
);
853 log_debug_errno(r
, "Reply callback returned error, ignoring: %m");
858 static int varlink_dispatch_timeout(Varlink
*v
) {
861 if (v
->state
!= VARLINK_PENDING_TIMEOUT
)
864 varlink_set_state(v
, VARLINK_PROCESSING_TIMEOUT
);
865 varlink_dispatch_local_error(v
, VARLINK_ERROR_TIMEOUT
);
871 static int varlink_dispatch_disconnect(Varlink
*v
) {
874 if (v
->state
!= VARLINK_PENDING_DISCONNECT
)
877 varlink_set_state(v
, VARLINK_PROCESSING_DISCONNECT
);
878 varlink_dispatch_local_error(v
, VARLINK_ERROR_DISCONNECTED
);
884 static int varlink_sanitize_parameters(JsonVariant
**v
) {
887 /* Varlink always wants a parameters list, hence make one if the caller doesn't want any */
889 return json_variant_new_object(v
, NULL
, 0);
890 else if (!json_variant_is_object(*v
))
896 static int varlink_dispatch_reply(Varlink
*v
) {
897 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
898 VarlinkReplyFlags flags
= 0;
899 const char *error
= NULL
;
906 if (!IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
))
911 assert(v
->n_pending
> 0);
913 if (!json_variant_is_object(v
->current
))
916 JSON_VARIANT_OBJECT_FOREACH(k
, e
, v
->current
) {
918 if (streq(k
, "error")) {
921 if (!json_variant_is_string(e
))
924 error
= json_variant_string(e
);
925 flags
|= VARLINK_REPLY_ERROR
;
927 } else if (streq(k
, "parameters")) {
930 if (!json_variant_is_object(e
))
933 parameters
= json_variant_ref(e
);
935 } else if (streq(k
, "continues")) {
936 if (FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
))
939 if (!json_variant_is_boolean(e
))
942 if (json_variant_boolean(e
))
943 flags
|= VARLINK_REPLY_CONTINUES
;
948 /* Replies with 'continues' set are only OK if we set 'more' when the method call was initiated */
949 if (v
->state
!= VARLINK_AWAITING_REPLY_MORE
&& FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
))
952 /* An error is final */
953 if (error
&& FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
))
956 r
= varlink_sanitize_parameters(¶meters
);
960 if (IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
)) {
961 varlink_set_state(v
, VARLINK_PROCESSING_REPLY
);
963 if (v
->reply_callback
) {
964 r
= v
->reply_callback(v
, parameters
, error
, flags
, v
->userdata
);
966 log_debug_errno(r
, "Reply callback returned error, ignoring: %m");
969 varlink_clear_current(v
);
971 if (v
->state
== VARLINK_PROCESSING_REPLY
) {
973 assert(v
->n_pending
> 0);
975 if (!FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
))
979 FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
) ? VARLINK_AWAITING_REPLY_MORE
:
980 v
->n_pending
== 0 ? VARLINK_IDLE_CLIENT
: VARLINK_AWAITING_REPLY
);
983 assert(v
->state
== VARLINK_CALLING
);
984 varlink_set_state(v
, VARLINK_CALLED
);
990 varlink_set_state(v
, VARLINK_PROCESSING_FAILURE
);
991 varlink_dispatch_local_error(v
, VARLINK_ERROR_PROTOCOL
);
997 static int varlink_dispatch_method(Varlink
*v
) {
998 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
999 VarlinkMethodFlags flags
= 0;
1000 const char *method
= NULL
, *error
;
1002 VarlinkMethod callback
;
1008 if (v
->state
!= VARLINK_IDLE_SERVER
)
1013 if (!json_variant_is_object(v
->current
))
1016 JSON_VARIANT_OBJECT_FOREACH(k
, e
, v
->current
) {
1018 if (streq(k
, "method")) {
1021 if (!json_variant_is_string(e
))
1024 method
= json_variant_string(e
);
1026 } else if (streq(k
, "parameters")) {
1029 if (!json_variant_is_object(e
))
1032 parameters
= json_variant_ref(e
);
1034 } else if (streq(k
, "oneway")) {
1036 if ((flags
& (VARLINK_METHOD_ONEWAY
|VARLINK_METHOD_MORE
)) != 0)
1039 if (!json_variant_is_boolean(e
))
1042 if (json_variant_boolean(e
))
1043 flags
|= VARLINK_METHOD_ONEWAY
;
1045 } else if (streq(k
, "more")) {
1047 if ((flags
& (VARLINK_METHOD_ONEWAY
|VARLINK_METHOD_MORE
)) != 0)
1050 if (!json_variant_is_boolean(e
))
1053 if (json_variant_boolean(e
))
1054 flags
|= VARLINK_METHOD_MORE
;
1063 r
= varlink_sanitize_parameters(¶meters
);
1067 varlink_set_state(v
, (flags
& VARLINK_METHOD_MORE
) ? VARLINK_PROCESSING_METHOD_MORE
:
1068 (flags
& VARLINK_METHOD_ONEWAY
) ? VARLINK_PROCESSING_METHOD_ONEWAY
:
1069 VARLINK_PROCESSING_METHOD
);
1073 if (STR_IN_SET(method
, "org.varlink.service.GetInfo", "org.varlink.service.GetInterface")) {
1074 /* For now, we don't implement a single one of varlink's own methods */
1076 error
= VARLINK_ERROR_METHOD_NOT_IMPLEMENTED
;
1077 } else if (startswith(method
, "org.varlink.service.")) {
1079 error
= VARLINK_ERROR_METHOD_NOT_FOUND
;
1081 callback
= hashmap_get(v
->server
->methods
, method
);
1082 error
= VARLINK_ERROR_METHOD_NOT_FOUND
;
1086 r
= callback(v
, parameters
, flags
, v
->userdata
);
1088 log_debug_errno(r
, "Callback for %s returned error: %m", method
);
1090 /* We got an error back from the callback. Propagate it to the client if the method call remains unanswered. */
1091 if (!FLAGS_SET(flags
, VARLINK_METHOD_ONEWAY
)) {
1092 r
= varlink_error_errno(v
, r
);
1097 } else if (!FLAGS_SET(flags
, VARLINK_METHOD_ONEWAY
)) {
1100 r
= varlink_errorb(v
, error
, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
))));
1107 case VARLINK_PROCESSED_METHOD
: /* Method call is fully processed */
1108 case VARLINK_PROCESSING_METHOD_ONEWAY
: /* ditto */
1109 varlink_clear_current(v
);
1110 varlink_set_state(v
, VARLINK_IDLE_SERVER
);
1113 case VARLINK_PROCESSING_METHOD
: /* Method call wasn't replied to, will be replied to later */
1114 varlink_set_state(v
, VARLINK_PENDING_METHOD
);
1117 case VARLINK_PROCESSING_METHOD_MORE
: /* No reply for a "more" message was sent, more to come */
1118 varlink_set_state(v
, VARLINK_PENDING_METHOD_MORE
);
1122 assert_not_reached();
1132 varlink_set_state(v
, VARLINK_PROCESSING_FAILURE
);
1133 varlink_dispatch_local_error(v
, VARLINK_ERROR_PROTOCOL
);
1139 int varlink_process(Varlink
*v
) {
1142 assert_return(v
, -EINVAL
);
1144 if (v
->state
== VARLINK_DISCONNECTED
)
1145 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1149 r
= varlink_write(v
);
1151 varlink_log_errno(v
, r
, "Write failed: %m");
1155 r
= varlink_dispatch_reply(v
);
1157 varlink_log_errno(v
, r
, "Reply dispatch failed: %m");
1161 r
= varlink_dispatch_method(v
);
1163 varlink_log_errno(v
, r
, "Method dispatch failed: %m");
1167 r
= varlink_parse_message(v
);
1169 varlink_log_errno(v
, r
, "Message parsing failed: %m");
1173 r
= varlink_read(v
);
1175 varlink_log_errno(v
, r
, "Read failed: %m");
1179 r
= varlink_test_disconnect(v
);
1184 r
= varlink_dispatch_disconnect(v
);
1189 r
= varlink_test_timeout(v
);
1194 r
= varlink_dispatch_timeout(v
);
1200 if (r
>= 0 && v
->defer_event_source
) {
1203 /* If we did some processing, make sure we are called again soon */
1204 q
= sd_event_source_set_enabled(v
->defer_event_source
, r
> 0 ? SD_EVENT_ON
: SD_EVENT_OFF
);
1206 r
= varlink_log_errno(v
, q
, "Failed to enable deferred event source: %m");
1210 if (VARLINK_STATE_IS_ALIVE(v
->state
))
1211 /* Initiate disconnection */
1212 varlink_set_state(v
, VARLINK_PENDING_DISCONNECT
);
1214 /* We failed while disconnecting, in that case close right away */
1222 static void handle_revents(Varlink
*v
, int revents
) {
1225 if (v
->connecting
) {
1226 /* If we have seen POLLOUT or POLLHUP on a socket we are asynchronously waiting for a connect()
1227 * to complete on, we know we are ready. We don't read the connection error here though,
1228 * we'll get the error on the next read() or write(). */
1229 if ((revents
& (POLLOUT
|POLLHUP
)) == 0)
1232 varlink_log(v
, "Asynchronous connection completed.");
1233 v
->connecting
= false;
1235 /* Note that we don't care much about POLLIN/POLLOUT here, we'll just try reading and writing
1236 * what we can. However, we do care about POLLHUP to detect connection termination even if we
1237 * momentarily don't want to read nor write anything. */
1239 if (!FLAGS_SET(revents
, POLLHUP
))
1242 varlink_log(v
, "Got POLLHUP from socket.");
1243 v
->got_pollhup
= true;
1247 int varlink_wait(Varlink
*v
, usec_t timeout
) {
1251 assert_return(v
, -EINVAL
);
1253 if (v
->state
== VARLINK_DISCONNECTED
)
1254 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1256 r
= varlink_get_timeout(v
, &t
);
1259 if (t
!= USEC_INFINITY
) {
1262 n
= now(CLOCK_MONOTONIC
);
1266 t
= usec_sub_unsigned(t
, n
);
1269 if (timeout
!= USEC_INFINITY
&&
1270 (t
== USEC_INFINITY
|| timeout
< t
))
1273 fd
= varlink_get_fd(v
);
1277 events
= varlink_get_events(v
);
1281 r
= fd_wait_for_event(fd
, events
, t
);
1282 if (ERRNO_IS_NEG_TRANSIENT(r
)) /* Treat EINTR as not a timeout, but also nothing happened, and
1283 * the caller gets a chance to call back into us */
1288 handle_revents(v
, r
);
1292 int varlink_get_fd(Varlink
*v
) {
1294 assert_return(v
, -EINVAL
);
1296 if (v
->state
== VARLINK_DISCONNECTED
)
1297 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1299 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBADF
), "No valid fd.");
1304 int varlink_get_events(Varlink
*v
) {
1307 assert_return(v
, -EINVAL
);
1309 if (v
->state
== VARLINK_DISCONNECTED
)
1310 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1312 if (v
->connecting
) /* When processing an asynchronous connect(), we only wait for EPOLLOUT, which
1313 * tells us that the connection is now complete. Before that we should neither
1314 * write() to nor read() from the fd. */
1317 if (!v
->read_disconnected
&&
1318 IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
, VARLINK_IDLE_SERVER
) &&
1320 v
->input_buffer_unscanned
<= 0)
1323 if (!v
->write_disconnected
&&
1324 v
->output_buffer_size
> 0)
1330 int varlink_get_timeout(Varlink
*v
, usec_t
*ret
) {
1331 assert_return(v
, -EINVAL
);
1333 if (v
->state
== VARLINK_DISCONNECTED
)
1334 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1336 if (IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
) &&
1337 v
->timeout
!= USEC_INFINITY
) {
1339 *ret
= usec_add(v
->timestamp
, v
->timeout
);
1343 *ret
= USEC_INFINITY
;
1348 int varlink_flush(Varlink
*v
) {
1351 assert_return(v
, -EINVAL
);
1353 if (v
->state
== VARLINK_DISCONNECTED
)
1354 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1357 if (v
->output_buffer_size
== 0)
1359 if (v
->write_disconnected
)
1362 r
= varlink_write(v
);
1370 r
= fd_wait_for_event(v
->fd
, POLLOUT
, USEC_INFINITY
);
1371 if (ERRNO_IS_NEG_TRANSIENT(r
))
1374 return varlink_log_errno(v
, r
, "Poll failed on fd: %m");
1377 handle_revents(v
, r
);
1383 static void varlink_detach_server(Varlink
*v
) {
1384 VarlinkServer
*saved_server
;
1390 if (v
->server
->by_uid
&&
1391 v
->ucred_acquired
&&
1392 uid_is_valid(v
->ucred
.uid
)) {
1395 c
= PTR_TO_UINT(hashmap_get(v
->server
->by_uid
, UID_TO_PTR(v
->ucred
.uid
)));
1399 (void) hashmap_remove(v
->server
->by_uid
, UID_TO_PTR(v
->ucred
.uid
));
1401 (void) hashmap_replace(v
->server
->by_uid
, UID_TO_PTR(v
->ucred
.uid
), UINT_TO_PTR(c
- 1));
1404 assert(v
->server
->n_connections
> 0);
1405 v
->server
->n_connections
--;
1407 /* If this is a connection associated to a server, then let's disconnect the server and the
1408 * connection from each other. This drops the dangling reference that connect_callback() set up. But
1409 * before we release the references, let's call the disconnection callback if it is defined. */
1411 saved_server
= TAKE_PTR(v
->server
);
1413 if (saved_server
->disconnect_callback
)
1414 saved_server
->disconnect_callback(saved_server
, v
, saved_server
->userdata
);
1416 varlink_server_unref(saved_server
);
1420 int varlink_close(Varlink
*v
) {
1421 assert_return(v
, -EINVAL
);
1423 if (v
->state
== VARLINK_DISCONNECTED
)
1426 varlink_set_state(v
, VARLINK_DISCONNECTED
);
1428 /* Let's take a reference first, since varlink_detach_server() might drop the final (dangling) ref
1429 * which would destroy us before we can call varlink_clear() */
1431 varlink_detach_server(v
);
1438 Varlink
* varlink_close_unref(Varlink
*v
) {
1442 (void) varlink_close(v
);
1443 return varlink_unref(v
);
1446 Varlink
* varlink_flush_close_unref(Varlink
*v
) {
1450 (void) varlink_flush(v
);
1451 return varlink_close_unref(v
);
1454 static int varlink_format_json(Varlink
*v
, JsonVariant
*m
) {
1455 _cleanup_(erase_and_freep
) char *text
= NULL
;
1461 r
= json_variant_format(m
, 0, &text
);
1464 assert(text
[r
] == '\0');
1466 if (v
->output_buffer_size
+ r
+ 1 > VARLINK_BUFFER_MAX
)
1469 varlink_log(v
, "Sending message: %s", text
);
1471 if (v
->output_buffer_size
== 0) {
1473 free_and_replace(v
->output_buffer
, text
);
1475 v
->output_buffer_size
= r
+ 1;
1476 v
->output_buffer_index
= 0;
1478 } else if (v
->output_buffer_index
== 0) {
1480 if (!GREEDY_REALLOC(v
->output_buffer
, v
->output_buffer_size
+ r
+ 1))
1483 memcpy(v
->output_buffer
+ v
->output_buffer_size
, text
, r
+ 1);
1484 v
->output_buffer_size
+= r
+ 1;
1487 const size_t new_size
= v
->output_buffer_size
+ r
+ 1;
1489 n
= new(char, new_size
);
1493 memcpy(mempcpy(n
, v
->output_buffer
+ v
->output_buffer_index
, v
->output_buffer_size
), text
, r
+ 1);
1495 free_and_replace(v
->output_buffer
, n
);
1496 v
->output_buffer_size
= new_size
;
1497 v
->output_buffer_index
= 0;
1500 if (json_variant_is_sensitive(m
))
1501 v
->output_buffer_sensitive
= true; /* Propagate sensitive flag */
1503 text
= mfree(text
); /* No point in the erase_and_free() destructor declared above */
1508 static int varlink_enqueue_json(Varlink
*v
, JsonVariant
*m
) {
1509 VarlinkJsonQueueItem
*q
;
1514 /* If there are no file descriptors to be queued and no queue entries yet we can shortcut things and
1515 * append this entry directly to the output buffer */
1516 if (v
->n_pushed_fds
== 0 && !v
->output_queue
)
1517 return varlink_format_json(v
, m
);
1519 /* Otherwise add a queue entry for this */
1520 q
= varlink_json_queue_item_new(m
, v
->pushed_fds
, v
->n_pushed_fds
);
1524 v
->n_pushed_fds
= 0; /* fds now belong to the queue entry */
1526 LIST_INSERT_AFTER(queue
, v
->output_queue
, v
->output_queue_tail
, q
);
1527 v
->output_queue_tail
= q
;
1531 static int varlink_format_queue(Varlink
*v
) {
1536 /* Takes entries out of the output queue and formats them into the output buffer. But only if this
1537 * would not corrupt our fd message boundaries */
1539 while (v
->output_queue
) {
1540 _cleanup_free_
int *array
= NULL
;
1541 VarlinkJsonQueueItem
*q
= v
->output_queue
;
1543 if (v
->n_output_fds
> 0) /* unwritten fds? if we'd add more we'd corrupt the fd message boundaries, hence wait */
1547 array
= newdup(int, q
->fds
, q
->n_fds
);
1552 r
= varlink_format_json(v
, q
->data
);
1556 /* Take possession of the queue element's fds */
1557 free(v
->output_fds
);
1558 v
->output_fds
= TAKE_PTR(array
);
1559 v
->n_output_fds
= q
->n_fds
;
1562 LIST_REMOVE(queue
, v
->output_queue
, q
);
1563 if (!v
->output_queue
)
1564 v
->output_queue_tail
= NULL
;
1566 varlink_json_queue_item_free(q
);
1572 int varlink_send(Varlink
*v
, const char *method
, JsonVariant
*parameters
) {
1573 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1576 assert_return(v
, -EINVAL
);
1577 assert_return(method
, -EINVAL
);
1579 if (v
->state
== VARLINK_DISCONNECTED
)
1580 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1582 /* We allow enqueuing multiple method calls at once! */
1583 if (!IN_SET(v
->state
, VARLINK_IDLE_CLIENT
, VARLINK_AWAITING_REPLY
))
1584 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "Connection busy.");
1586 r
= varlink_sanitize_parameters(¶meters
);
1588 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
1590 r
= json_build(&m
, JSON_BUILD_OBJECT(
1591 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
)),
1592 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
)),
1593 JSON_BUILD_PAIR("oneway", JSON_BUILD_BOOLEAN(true))));
1595 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1597 r
= varlink_enqueue_json(v
, m
);
1599 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
1601 /* No state change here, this is one-way only after all */
1602 v
->timestamp
= now(CLOCK_MONOTONIC
);
1606 int varlink_sendb(Varlink
*v
, const char *method
, ...) {
1607 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1611 assert_return(v
, -EINVAL
);
1613 va_start(ap
, method
);
1614 r
= json_buildv(¶meters
, ap
);
1618 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1620 return varlink_send(v
, method
, parameters
);
1623 int varlink_invoke(Varlink
*v
, const char *method
, JsonVariant
*parameters
) {
1624 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1627 assert_return(v
, -EINVAL
);
1628 assert_return(method
, -EINVAL
);
1630 if (v
->state
== VARLINK_DISCONNECTED
)
1631 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1633 /* We allow enqueuing multiple method calls at once! */
1634 if (!IN_SET(v
->state
, VARLINK_IDLE_CLIENT
, VARLINK_AWAITING_REPLY
))
1635 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "Connection busy.");
1637 r
= varlink_sanitize_parameters(¶meters
);
1639 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
1641 r
= json_build(&m
, JSON_BUILD_OBJECT(
1642 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
)),
1643 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
))));
1645 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1647 r
= varlink_enqueue_json(v
, m
);
1649 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
1651 varlink_set_state(v
, VARLINK_AWAITING_REPLY
);
1653 v
->timestamp
= now(CLOCK_MONOTONIC
);
1658 int varlink_invokeb(Varlink
*v
, const char *method
, ...) {
1659 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1663 assert_return(v
, -EINVAL
);
1665 va_start(ap
, method
);
1666 r
= json_buildv(¶meters
, ap
);
1670 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1672 return varlink_invoke(v
, method
, parameters
);
1675 int varlink_observe(Varlink
*v
, const char *method
, JsonVariant
*parameters
) {
1676 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1679 assert_return(v
, -EINVAL
);
1680 assert_return(method
, -EINVAL
);
1682 if (v
->state
== VARLINK_DISCONNECTED
)
1683 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1685 /* Note that we don't allow enqueuing multiple method calls when we are in more/continues mode! We
1686 * thus insist on an idle client here. */
1687 if (v
->state
!= VARLINK_IDLE_CLIENT
)
1688 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "Connection busy.");
1690 r
= varlink_sanitize_parameters(¶meters
);
1692 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
1694 r
= json_build(&m
, JSON_BUILD_OBJECT(
1695 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
)),
1696 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
)),
1697 JSON_BUILD_PAIR("more", JSON_BUILD_BOOLEAN(true))));
1699 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1701 r
= varlink_enqueue_json(v
, m
);
1703 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
1705 varlink_set_state(v
, VARLINK_AWAITING_REPLY_MORE
);
1707 v
->timestamp
= now(CLOCK_MONOTONIC
);
1712 int varlink_observeb(Varlink
*v
, const char *method
, ...) {
1713 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1717 assert_return(v
, -EINVAL
);
1719 va_start(ap
, method
);
1720 r
= json_buildv(¶meters
, ap
);
1724 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1726 return varlink_observe(v
, method
, parameters
);
1732 JsonVariant
*parameters
,
1733 JsonVariant
**ret_parameters
,
1734 const char **ret_error_id
,
1735 VarlinkReplyFlags
*ret_flags
) {
1737 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1740 assert_return(v
, -EINVAL
);
1741 assert_return(method
, -EINVAL
);
1743 if (v
->state
== VARLINK_DISCONNECTED
)
1744 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1745 if (v
->state
!= VARLINK_IDLE_CLIENT
)
1746 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "Connection busy.");
1748 assert(v
->n_pending
== 0); /* n_pending can't be > 0 if we are in VARLINK_IDLE_CLIENT state */
1750 /* If there was still a reply pinned from a previous call, now it's the time to get rid of it, so
1751 * that we can assign a new reply shortly. */
1752 varlink_clear_current(v
);
1754 r
= varlink_sanitize_parameters(¶meters
);
1756 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
1758 r
= json_build(&m
, JSON_BUILD_OBJECT(
1759 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
)),
1760 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
))));
1762 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1764 r
= varlink_enqueue_json(v
, m
);
1766 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
1768 varlink_set_state(v
, VARLINK_CALLING
);
1770 v
->timestamp
= now(CLOCK_MONOTONIC
);
1772 while (v
->state
== VARLINK_CALLING
) {
1774 r
= varlink_process(v
);
1780 r
= varlink_wait(v
, USEC_INFINITY
);
1787 case VARLINK_CALLED
:
1790 varlink_set_state(v
, VARLINK_IDLE_CLIENT
);
1791 assert(v
->n_pending
== 1);
1795 *ret_parameters
= json_variant_by_key(v
->current
, "parameters");
1797 *ret_error_id
= json_variant_string(json_variant_by_key(v
->current
, "error"));
1803 case VARLINK_PENDING_DISCONNECT
:
1804 case VARLINK_DISCONNECTED
:
1805 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ECONNRESET
), "Connection was closed.");
1807 case VARLINK_PENDING_TIMEOUT
:
1808 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ETIME
), "Connection timed out.");
1811 assert_not_reached();
1818 JsonVariant
**ret_parameters
,
1819 const char **ret_error_id
,
1820 VarlinkReplyFlags
*ret_flags
, ...) {
1822 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1826 assert_return(v
, -EINVAL
);
1828 va_start(ap
, ret_flags
);
1829 r
= json_buildv(¶meters
, ap
);
1833 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1835 return varlink_call(v
, method
, parameters
, ret_parameters
, ret_error_id
, ret_flags
);
1838 int varlink_reply(Varlink
*v
, JsonVariant
*parameters
) {
1839 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1842 assert_return(v
, -EINVAL
);
1844 if (v
->state
== VARLINK_DISCONNECTED
)
1846 if (!IN_SET(v
->state
,
1847 VARLINK_PROCESSING_METHOD
, VARLINK_PROCESSING_METHOD_MORE
,
1848 VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
))
1851 r
= varlink_sanitize_parameters(¶meters
);
1853 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
1855 r
= json_build(&m
, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
))));
1857 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1859 r
= varlink_enqueue_json(v
, m
);
1861 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
1863 if (IN_SET(v
->state
, VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
)) {
1864 /* We just replied to a method call that was let hanging for a while (i.e. we were outside of
1865 * the varlink_dispatch_method() stack frame), which means with this reply we are ready to
1866 * process further messages. */
1867 varlink_clear_current(v
);
1868 varlink_set_state(v
, VARLINK_IDLE_SERVER
);
1870 /* We replied to a method call from within the varlink_dispatch_method() stack frame, which
1871 * means we should let it handle the rest of the state engine. */
1872 varlink_set_state(v
, VARLINK_PROCESSED_METHOD
);
1877 int varlink_replyb(Varlink
*v
, ...) {
1878 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1882 assert_return(v
, -EINVAL
);
1885 r
= json_buildv(¶meters
, ap
);
1891 return varlink_reply(v
, parameters
);
1894 int varlink_error(Varlink
*v
, const char *error_id
, JsonVariant
*parameters
) {
1895 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1898 assert_return(v
, -EINVAL
);
1899 assert_return(error_id
, -EINVAL
);
1901 if (v
->state
== VARLINK_DISCONNECTED
)
1902 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
1903 if (!IN_SET(v
->state
,
1904 VARLINK_PROCESSING_METHOD
, VARLINK_PROCESSING_METHOD_MORE
,
1905 VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
))
1906 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "Connection busy.");
1908 /* Reset the list of pushed file descriptors before sending an error reply. We do this here to
1909 * simplify code that puts together a complex reply message with fds, and half-way something
1910 * fails. In that case the pushed fds need to be flushed out again. Under the assumption that it
1911 * never makes sense to send fds along with errors we simply flush them out here beforehand, so that
1912 * the callers don't need to do this explicitly. */
1913 varlink_reset_fds(v
);
1915 r
= varlink_sanitize_parameters(¶meters
);
1917 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
1919 r
= json_build(&m
, JSON_BUILD_OBJECT(
1920 JSON_BUILD_PAIR("error", JSON_BUILD_STRING(error_id
)),
1921 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
))));
1923 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1925 r
= varlink_enqueue_json(v
, m
);
1927 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
1929 if (IN_SET(v
->state
, VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
)) {
1930 varlink_clear_current(v
);
1931 varlink_set_state(v
, VARLINK_IDLE_SERVER
);
1933 varlink_set_state(v
, VARLINK_PROCESSED_METHOD
);
1938 int varlink_errorb(Varlink
*v
, const char *error_id
, ...) {
1939 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1943 assert_return(v
, -EINVAL
);
1944 assert_return(error_id
, -EINVAL
);
1946 va_start(ap
, error_id
);
1947 r
= json_buildv(¶meters
, ap
);
1951 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
1953 return varlink_error(v
, error_id
, parameters
);
1956 int varlink_error_invalid_parameter(Varlink
*v
, JsonVariant
*parameters
) {
1959 assert_return(v
, -EINVAL
);
1960 assert_return(parameters
, -EINVAL
);
1962 /* We expect to be called in one of two ways: the 'parameters' argument is a string variant in which
1963 * case it is the parameter key name that is invalid. Or the 'parameters' argument is an object
1964 * variant in which case we'll pull out the first key. The latter mode is useful in functions that
1965 * don't expect any arguments. */
1967 /* varlink_error(...) expects a json object as the third parameter. Passing a string variant causes
1968 * parameter sanitization to fail, and it returns -EINVAL. */
1970 if (json_variant_is_string(parameters
)) {
1971 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters_obj
= NULL
;
1973 r
= json_build(¶meters_obj
,
1975 JSON_BUILD_PAIR("parameter", JSON_BUILD_VARIANT(parameters
))));
1979 return varlink_error(v
, VARLINK_ERROR_INVALID_PARAMETER
, parameters_obj
);
1982 if (json_variant_is_object(parameters
) &&
1983 json_variant_elements(parameters
) > 0) {
1984 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters_obj
= NULL
;
1986 r
= json_build(¶meters_obj
,
1988 JSON_BUILD_PAIR("parameter", JSON_BUILD_VARIANT(json_variant_by_index(parameters
, 0)))));
1992 return varlink_error(v
, VARLINK_ERROR_INVALID_PARAMETER
, parameters_obj
);
1998 int varlink_error_errno(Varlink
*v
, int error
) {
1999 return varlink_errorb(
2001 VARLINK_ERROR_SYSTEM
,
2002 JSON_BUILD_OBJECT(JSON_BUILD_PAIR("errno", JSON_BUILD_INTEGER(abs(error
)))));
2005 int varlink_notify(Varlink
*v
, JsonVariant
*parameters
) {
2006 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
2009 assert_return(v
, -EINVAL
);
2011 if (v
->state
== VARLINK_DISCONNECTED
)
2012 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENOTCONN
), "Not connected.");
2013 if (!IN_SET(v
->state
, VARLINK_PROCESSING_METHOD_MORE
, VARLINK_PENDING_METHOD_MORE
))
2014 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "Connection busy.");
2016 r
= varlink_sanitize_parameters(¶meters
);
2018 return varlink_log_errno(v
, r
, "Failed to sanitize parameters: %m");
2020 r
= json_build(&m
, JSON_BUILD_OBJECT(
2021 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
)),
2022 JSON_BUILD_PAIR("continues", JSON_BUILD_BOOLEAN(true))));
2024 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
2026 r
= varlink_enqueue_json(v
, m
);
2028 return varlink_log_errno(v
, r
, "Failed to enqueue json message: %m");
2030 /* No state change, as more is coming */
2034 int varlink_notifyb(Varlink
*v
, ...) {
2035 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
2039 assert_return(v
, -EINVAL
);
2042 r
= json_buildv(¶meters
, ap
);
2046 return varlink_log_errno(v
, r
, "Failed to build json message: %m");
2048 return varlink_notify(v
, parameters
);
2051 int varlink_bind_reply(Varlink
*v
, VarlinkReply callback
) {
2052 assert_return(v
, -EINVAL
);
2054 if (callback
&& v
->reply_callback
&& callback
!= v
->reply_callback
)
2055 return varlink_log_errno(v
, SYNTHETIC_ERRNO(EBUSY
), "A different callback was already set.");
2057 v
->reply_callback
= callback
;
2062 void* varlink_set_userdata(Varlink
*v
, void *userdata
) {
2065 assert_return(v
, NULL
);
2068 v
->userdata
= userdata
;
2073 void* varlink_get_userdata(Varlink
*v
) {
2074 assert_return(v
, NULL
);
2079 static int varlink_acquire_ucred(Varlink
*v
) {
2084 if (v
->ucred_acquired
)
2087 r
= getpeercred(v
->fd
, &v
->ucred
);
2091 v
->ucred_acquired
= true;
2095 int varlink_get_peer_uid(Varlink
*v
, uid_t
*ret
) {
2098 assert_return(v
, -EINVAL
);
2099 assert_return(ret
, -EINVAL
);
2101 r
= varlink_acquire_ucred(v
);
2103 return varlink_log_errno(v
, r
, "Failed to acquire credentials: %m");
2105 if (!uid_is_valid(v
->ucred
.uid
))
2106 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENODATA
), "Peer uid is invalid.");
2108 *ret
= v
->ucred
.uid
;
2112 int varlink_get_peer_pid(Varlink
*v
, pid_t
*ret
) {
2115 assert_return(v
, -EINVAL
);
2116 assert_return(ret
, -EINVAL
);
2118 r
= varlink_acquire_ucred(v
);
2120 return varlink_log_errno(v
, r
, "Failed to acquire credentials: %m");
2122 if (!pid_is_valid(v
->ucred
.pid
))
2123 return varlink_log_errno(v
, SYNTHETIC_ERRNO(ENODATA
), "Peer uid is invalid.");
2125 *ret
= v
->ucred
.pid
;
2129 int varlink_set_relative_timeout(Varlink
*v
, usec_t timeout
) {
2130 assert_return(v
, -EINVAL
);
2131 assert_return(timeout
> 0, -EINVAL
);
2133 v
->timeout
= timeout
;
2137 VarlinkServer
*varlink_get_server(Varlink
*v
) {
2138 assert_return(v
, NULL
);
2143 int varlink_set_description(Varlink
*v
, const char *description
) {
2144 assert_return(v
, -EINVAL
);
2146 return free_and_strdup(&v
->description
, description
);
2149 static int io_callback(sd_event_source
*s
, int fd
, uint32_t revents
, void *userdata
) {
2150 Varlink
*v
= ASSERT_PTR(userdata
);
2154 handle_revents(v
, revents
);
2155 (void) varlink_process(v
);
2160 static int time_callback(sd_event_source
*s
, uint64_t usec
, void *userdata
) {
2161 Varlink
*v
= ASSERT_PTR(userdata
);
2165 (void) varlink_process(v
);
2169 static int defer_callback(sd_event_source
*s
, void *userdata
) {
2170 Varlink
*v
= ASSERT_PTR(userdata
);
2174 (void) varlink_process(v
);
2178 static int prepare_callback(sd_event_source
*s
, void *userdata
) {
2179 Varlink
*v
= ASSERT_PTR(userdata
);
2186 e
= varlink_get_events(v
);
2190 r
= sd_event_source_set_io_events(v
->io_event_source
, e
);
2192 return varlink_log_errno(v
, r
, "Failed to set source events: %m");
2194 r
= varlink_get_timeout(v
, &until
);
2197 have_timeout
= r
> 0;
2200 r
= sd_event_source_set_time(v
->time_event_source
, until
);
2202 return varlink_log_errno(v
, r
, "Failed to set source time: %m");
2205 r
= sd_event_source_set_enabled(v
->time_event_source
, have_timeout
? SD_EVENT_ON
: SD_EVENT_OFF
);
2207 return varlink_log_errno(v
, r
, "Failed to enable event source: %m");
2212 static int quit_callback(sd_event_source
*event
, void *userdata
) {
2213 Varlink
*v
= ASSERT_PTR(userdata
);
2223 int varlink_attach_event(Varlink
*v
, sd_event
*e
, int64_t priority
) {
2226 assert_return(v
, -EINVAL
);
2227 assert_return(!v
->event
, -EBUSY
);
2230 v
->event
= sd_event_ref(e
);
2232 r
= sd_event_default(&v
->event
);
2234 return varlink_log_errno(v
, r
, "Failed to create event source: %m");
2237 r
= sd_event_add_time(v
->event
, &v
->time_event_source
, CLOCK_MONOTONIC
, 0, 0, time_callback
, v
);
2241 r
= sd_event_source_set_priority(v
->time_event_source
, priority
);
2245 (void) sd_event_source_set_description(v
->time_event_source
, "varlink-time");
2247 r
= sd_event_add_exit(v
->event
, &v
->quit_event_source
, quit_callback
, v
);
2251 r
= sd_event_source_set_priority(v
->quit_event_source
, priority
);
2255 (void) sd_event_source_set_description(v
->quit_event_source
, "varlink-quit");
2257 r
= sd_event_add_io(v
->event
, &v
->io_event_source
, v
->fd
, 0, io_callback
, v
);
2261 r
= sd_event_source_set_prepare(v
->io_event_source
, prepare_callback
);
2265 r
= sd_event_source_set_priority(v
->io_event_source
, priority
);
2269 (void) sd_event_source_set_description(v
->io_event_source
, "varlink-io");
2271 r
= sd_event_add_defer(v
->event
, &v
->defer_event_source
, defer_callback
, v
);
2275 r
= sd_event_source_set_priority(v
->defer_event_source
, priority
);
2279 (void) sd_event_source_set_description(v
->defer_event_source
, "varlink-defer");
2284 varlink_log_errno(v
, r
, "Failed to setup event source: %m");
2285 varlink_detach_event(v
);
2289 void varlink_detach_event(Varlink
*v
) {
2293 varlink_detach_event_sources(v
);
2295 v
->event
= sd_event_unref(v
->event
);
2298 sd_event
*varlink_get_event(Varlink
*v
) {
2299 assert_return(v
, NULL
);
2304 int varlink_push_fd(Varlink
*v
, int fd
) {
2307 assert_return(v
, -EINVAL
);
2308 assert_return(fd
>= 0, -EBADF
);
2310 /* Takes an fd to send along with the *next* varlink message sent via this varlink connection. This
2311 * takes ownership of the specified fd. Use varlink_dup_fd() below to duplicate the fd first. */
2313 if (!v
->allow_fd_passing_output
)
2316 if (v
->n_pushed_fds
>= INT_MAX
)
2319 if (!GREEDY_REALLOC(v
->pushed_fds
, v
->n_pushed_fds
+ 1))
2322 i
= (int) v
->n_pushed_fds
;
2323 v
->pushed_fds
[v
->n_pushed_fds
++] = fd
;
2327 int varlink_dup_fd(Varlink
*v
, int fd
) {
2328 _cleanup_close_
int dp
= -1;
2331 assert_return(v
, -EINVAL
);
2332 assert_return(fd
>= 0, -EBADF
);
2334 /* Like varlink_push_fd() but duplicates the specified fd instead of taking possession of it */
2336 dp
= fcntl(fd
, F_DUPFD_CLOEXEC
, 3);
2340 r
= varlink_push_fd(v
, dp
);
2348 int varlink_reset_fds(Varlink
*v
) {
2349 assert_return(v
, -EINVAL
);
2351 /* Closes all currently pending fds to send. This may be used whenever the caller is in the process
2352 * of putting together a message with fds, and then eventually something fails and they need to
2353 * rollback the fds. Note that this is implicitly called whenever an error reply is sent, see above. */
2355 close_many(v
->output_fds
, v
->n_output_fds
);
2356 v
->n_output_fds
= 0;
2360 int varlink_peek_fd(Varlink
*v
, size_t i
) {
2361 assert_return(v
, -EINVAL
);
2363 /* Returns one of the file descriptors that were received along with the current message. This does
2364 * not duplicate the fd nor invalidate it, it hence remains in our possession. */
2366 if (!v
->allow_fd_passing_input
)
2369 if (i
>= v
->n_input_fds
)
2372 return v
->input_fds
[i
];
2375 int varlink_take_fd(Varlink
*v
, size_t i
) {
2376 assert_return(v
, -EINVAL
);
2378 /* Similar to varlink_peek_fd() but the file descriptor's ownership is passed to the caller, and
2379 * we'll invalidate the reference to it under our possession. If called twice in a row will return
2382 if (!v
->allow_fd_passing_input
)
2385 if (i
>= v
->n_input_fds
)
2388 return TAKE_FD(v
->input_fds
[i
]);
2391 static int verify_unix_socket(Varlink
*v
) {
2397 if (fstat(v
->fd
, &st
) < 0)
2399 if (!S_ISSOCK(st
.st_mode
)) {
2404 v
->af
= socket_get_family(v
->fd
);
2409 return v
->af
== AF_UNIX
? 0 : -ENOMEDIUM
;
2412 int varlink_set_allow_fd_passing_input(Varlink
*v
, bool b
) {
2415 assert_return(v
, -EINVAL
);
2417 if (v
->allow_fd_passing_input
== b
)
2421 v
->allow_fd_passing_input
= false;
2425 r
= verify_unix_socket(v
);
2429 v
->allow_fd_passing_input
= true;
2433 int varlink_set_allow_fd_passing_output(Varlink
*v
, bool b
) {
2436 assert_return(v
, -EINVAL
);
2438 if (v
->allow_fd_passing_output
== b
)
2442 v
->allow_fd_passing_output
= false;
2446 r
= verify_unix_socket(v
);
2450 v
->allow_fd_passing_output
= true;
2454 int varlink_server_new(VarlinkServer
**ret
, VarlinkServerFlags flags
) {
2457 assert_return(ret
, -EINVAL
);
2458 assert_return((flags
& ~_VARLINK_SERVER_FLAGS_ALL
) == 0, -EINVAL
);
2460 s
= new(VarlinkServer
, 1);
2462 return log_oom_debug();
2464 *s
= (VarlinkServer
) {
2467 .connections_max
= varlink_server_connections_max(NULL
),
2468 .connections_per_uid_max
= varlink_server_connections_per_uid_max(NULL
),
2475 static VarlinkServer
* varlink_server_destroy(VarlinkServer
*s
) {
2481 varlink_server_shutdown(s
);
2483 while ((m
= hashmap_steal_first_key(s
->methods
)))
2486 hashmap_free(s
->methods
);
2487 hashmap_free(s
->by_uid
);
2489 sd_event_unref(s
->event
);
2491 free(s
->description
);
2496 DEFINE_TRIVIAL_REF_UNREF_FUNC(VarlinkServer
, varlink_server
, varlink_server_destroy
);
2498 static int validate_connection(VarlinkServer
*server
, const struct ucred
*ucred
) {
2504 if (FLAGS_SET(server
->flags
, VARLINK_SERVER_ROOT_ONLY
))
2505 allowed
= ucred
->uid
== 0;
2507 if (FLAGS_SET(server
->flags
, VARLINK_SERVER_MYSELF_ONLY
))
2508 allowed
= allowed
> 0 || ucred
->uid
== getuid();
2510 if (allowed
== 0) { /* Allow access when it is explicitly allowed or when neither
2511 * VARLINK_SERVER_ROOT_ONLY nor VARLINK_SERVER_MYSELF_ONLY are specified. */
2512 varlink_server_log(server
, "Unprivileged client attempted connection, refusing.");
2516 if (server
->n_connections
>= server
->connections_max
) {
2517 varlink_server_log(server
, "Connection limit of %u reached, refusing.", server
->connections_max
);
2521 if (FLAGS_SET(server
->flags
, VARLINK_SERVER_ACCOUNT_UID
)) {
2524 if (!uid_is_valid(ucred
->uid
)) {
2525 varlink_server_log(server
, "Client with invalid UID attempted connection, refusing.");
2529 c
= PTR_TO_UINT(hashmap_get(server
->by_uid
, UID_TO_PTR(ucred
->uid
)));
2530 if (c
>= server
->connections_per_uid_max
) {
2531 varlink_server_log(server
, "Per-UID connection limit of %u reached, refusing.",
2532 server
->connections_per_uid_max
);
2540 static int count_connection(VarlinkServer
*server
, const struct ucred
*ucred
) {
2547 server
->n_connections
++;
2549 if (FLAGS_SET(server
->flags
, VARLINK_SERVER_ACCOUNT_UID
)) {
2550 r
= hashmap_ensure_allocated(&server
->by_uid
, NULL
);
2552 return log_debug_errno(r
, "Failed to allocate UID hash table: %m");
2554 c
= PTR_TO_UINT(hashmap_get(server
->by_uid
, UID_TO_PTR(ucred
->uid
)));
2556 varlink_server_log(server
, "Connections of user " UID_FMT
": %u (of %u max)",
2557 ucred
->uid
, c
, server
->connections_per_uid_max
);
2559 r
= hashmap_replace(server
->by_uid
, UID_TO_PTR(ucred
->uid
), UINT_TO_PTR(c
+ 1));
2561 return log_debug_errno(r
, "Failed to increment counter in UID hash table: %m");
/* Adds a connection fd to the server: optionally acquires and validates the peer's credentials,
 * allocates a Varlink connection object, accounts for the connection via count_connection(), links
 * the object to the server and, if the server has an event loop, attaches the connection to it.
 *
 * Fails with -EINVAL if server is NULL and -EBADF if fd is negative (assert_return).
 *
 * NOTE(review): the declaration of r, the checks of r after each call, the hand-over of fd/ucred
 * into v and the final return (including how *ret is filled in) are not shown here — confirm
 * against the complete function before relying on this description. */
2567 int varlink_server_add_connection(VarlinkServer
*server
, int fd
, Varlink
**ret
) {
2568 _cleanup_(varlink_unrefp
) Varlink
*v
= NULL
;
2569 struct ucred ucred
= UCRED_INVALID
;
2570 bool ucred_acquired
;
2573 assert_return(server
, -EINVAL
);
2574 assert_return(fd
>= 0, -EBADF
);
/* Peer credentials are only queried when access is restricted to root or when connections are
 * accounted per UID. */
2576 if ((server
->flags
& (VARLINK_SERVER_ROOT_ONLY
|VARLINK_SERVER_ACCOUNT_UID
)) != 0) {
2577 r
= getpeercred(fd
, &ucred
);
2579 return varlink_server_log_errno(server
, r
, "Failed to acquire peer credentials of incoming socket, refusing: %m");
2581 ucred_acquired
= true;
/* NOTE(review): how the result of validate_connection() is turned into a return value is not shown. */
2583 r
= validate_connection(server
, &ucred
);
/* Presumably the branch taken when neither flag is set: no credentials were acquired. */
2589 ucred_acquired
= false;
2591 r
= varlink_new(&v
);
2593 return varlink_server_log_errno(server
, r
, "Failed to allocate connection object: %m");
2595 r
= count_connection(server
, &ucred
);
2600 if (server
->flags
& VARLINK_SERVER_INHERIT_USERDATA
)
2601 v
->userdata
= server
->userdata
;
2603 if (ucred_acquired
) {
2605 v
->ucred_acquired
= true;
/* Best effort: the description is only set if asprintf() succeeded; on failure the connection
 * simply stays without one. Falls back to the prefix "varlink" if the server has no description. */
2608 _cleanup_free_
char *desc
= NULL
;
2609 if (asprintf(&desc
, "%s-%i", server
->description
?: "varlink", v
->fd
) >= 0)
2610 v
->description
= TAKE_PTR(desc
);
2612 /* Link up the server and the connection, and take reference in both directions. Note that the
2613 * reference on the connection is left dangling. It will be dropped when the connection is closed,
2614 * which happens in varlink_close(), including in the event loop quit callback. */
2615 v
->server
= varlink_server_ref(server
);
2618 varlink_set_state(v
, VARLINK_IDLE_SERVER
);
/* Only attach to an event loop if the server itself has one. */
2620 if (server
->event
) {
2621 r
= varlink_attach_event(v
, server
->event
, server
->event_priority
);
/* Failure path: the fd is detached from v again, presumably so that tearing down v does not
 * close an fd the caller still owns — TODO confirm against the statements not shown here. */
2623 varlink_log_errno(v
, r
, "Failed to attach new connection: %m");
2624 v
->fd
= -EBADF
; /* take the fd out of the connection again */
/* Destructor for a VarlinkServerSocket; also the function wrapped by the
 * varlink_server_socket_freep cleanup helper.
 * NOTE(review): only the signature is shown here; which members are released and what is returned
 * (presumably NULL, as is conventional for such destructors) must be confirmed against the body. */
2636 static VarlinkServerSocket
*varlink_server_socket_free(VarlinkServerSocket
*ss
) {
/* Generates the cleanup helper for VarlinkServerSocket pointers that is used with
 * _cleanup_(varlink_server_socket_freep) in this file; it wraps varlink_server_socket_free(). */
2644 DEFINE_TRIVIAL_CLEANUP_FUNC(VarlinkServerSocket
*, varlink_server_socket_free
);
/* I/O event handler for a listening socket. It is registered through sd_event_add_io() with the
 * VarlinkServerSocket as userdata. It accepts one incoming connection (non-blocking, close-on-exec),
 * registers it with the server via varlink_server_add_connection() and, if the server has a connect
 * callback, invokes that callback with the new connection and the server's userdata.
 *
 * NOTE(review): the declarations of v and r, the check of the accept4() result, the handling of a
 * failed varlink_server_add_connection(), the hand-over of cfd ownership and the return values are
 * not shown here — confirm against the complete function. */
2646 static int connect_callback(sd_event_source
*source
, int fd
, uint32_t revents
, void *userdata
) {
2647 VarlinkServerSocket
*ss
= ASSERT_PTR(userdata
);
/* cfd is closed automatically on scope exit unless ownership is passed on. */
2648 _cleanup_close_
int cfd
= -EBADF
;
2654 varlink_server_log(ss
->server
, "New incoming connection.");
2656 cfd
= accept4(fd
, NULL
, NULL
, SOCK_NONBLOCK
|SOCK_CLOEXEC
);
/* Transient accept errors are treated differently from real failures, which are logged. */
2658 if (ERRNO_IS_ACCEPT_AGAIN(errno
))
2661 return varlink_server_log_errno(ss
->server
, errno
, "Failed to accept incoming socket: %m");
2664 r
= varlink_server_add_connection(ss
->server
, cfd
, &v
);
2670 if (ss
->server
->connect_callback
) {
2671 r
= ss
->server
->connect_callback(ss
->server
, v
, ss
->server
->userdata
);
/* Per the log message, a failing connect callback leads to the client being disconnected. */
2673 varlink_log_errno(v
, r
, "Connection callback returned error, disconnecting client: %m");
/* Allocates a VarlinkServerSocket for the listening fd: switches the fd to non-blocking mode,
 * registers an EPOLLIN I/O event source that dispatches to connect_callback() with the new object
 * as userdata, sets the source's priority to s->event_priority, and hands the object to the caller
 * through *ret_ss. On error paths the partially set up object is released by the cleanup attribute.
 *
 * NOTE(review): the fields of the initializer, the checks of r, whether the event source is only
 * registered when s->event is set, and the final return value are not shown here — confirm. */
2682 static int varlink_server_create_listen_fd_socket(VarlinkServer
*s
, int fd
, VarlinkServerSocket
**ret_ss
) {
2683 _cleanup_(varlink_server_socket_freep
) VarlinkServerSocket
*ss
= NULL
;
2690 r
= fd_nonblock(fd
, true);
2694 ss
= new(VarlinkServerSocket
, 1);
2696 return log_oom_debug();
2698 *ss
= (VarlinkServerSocket
) {
2704 r
= sd_event_add_io(s
->event
, &ss
->event_source
, fd
, EPOLLIN
, connect_callback
, ss
);
2708 r
= sd_event_source_set_priority(ss
->event_source
, s
->event_priority
);
/* Success: pass ownership to the caller so the cleanup handler no longer frees the object. */
2713 *ret_ss
= TAKE_PTR(ss
);
2717 int varlink_server_listen_fd(VarlinkServer
*s
, int fd
) {
2718 _cleanup_(varlink_server_socket_freep
) VarlinkServerSocket
*ss
= NULL
;
2721 assert_return(s
, -EINVAL
);
2722 assert_return(fd
>= 0, -EBADF
);
2724 r
= varlink_server_create_listen_fd_socket(s
, fd
, &ss
);
2728 LIST_PREPEND(sockets
, s
->sockets
, TAKE_PTR(ss
));
/* Creates an AF_UNIX stream socket (non-blocking, close-on-exec), binds it to the path given in
 * address, starts listening on it and adds it to the server's list of sockets, remembering a copy
 * of address in the socket object.
 *
 * m must only contain permission bits (0777), otherwise -EINVAL is returned; it is turned into the
 * umask (~m & 0777) that is in effect while the socket is bound.
 *
 * NOTE(review): the declaration of r, the assignment of sockaddr_len, the checks after each call,
 * the final hand-over of fd ownership and the return value are not shown here — confirm. */
2732 int varlink_server_listen_address(VarlinkServer
*s
, const char *address
, mode_t m
) {
2733 _cleanup_(varlink_server_socket_freep
) VarlinkServerSocket
*ss
= NULL
;
2734 union sockaddr_union sockaddr
;
2735 socklen_t sockaddr_len
;
2736 _cleanup_close_
int fd
= -EBADF
;
2739 assert_return(s
, -EINVAL
);
2740 assert_return(address
, -EINVAL
);
2741 assert_return((m
& ~0777) == 0, -EINVAL
);
2743 r
= sockaddr_un_set_path(&sockaddr
.un
, address
);
2748 fd
= socket(AF_UNIX
, SOCK_STREAM
|SOCK_CLOEXEC
|SOCK_NONBLOCK
, 0);
2752 fd
= fd_move_above_stdio(fd
);
/* Presumably removes a stale socket node at the target path; the result is deliberately ignored. */
2754 (void) sockaddr_un_unlink(&sockaddr
.un
);
/* Bind under a temporary umask so that the socket node is created with access mode m. */
2756 WITH_UMASK(~m
& 0777) {
2757 r
= mac_selinux_bind(fd
, &sockaddr
.sa
, sockaddr_len
);
2762 if (listen(fd
, SOMAXCONN_DELUXE
) < 0)
2765 r
= varlink_server_create_listen_fd_socket(s
, fd
, &ss
);
2769 r
= free_and_strdup(&ss
->address
, address
);
2773 LIST_PREPEND(sockets
, s
->sockets
, TAKE_PTR(ss
));
/* Stores userdata in the server object. Returns NULL if s is NULL (assert_return).
 * NOTE(review): the return statement is not shown here; presumably the previously stored userdata
 * pointer is returned — confirm. */
2778 void* varlink_server_set_userdata(VarlinkServer
*s
, void *userdata
) {
2781 assert_return(s
, NULL
);
2784 s
->userdata
= userdata
;
/* Getter for the server's userdata pointer. Returns NULL if s is NULL (assert_return).
 * NOTE(review): the return statement is not shown here; presumably s->userdata — confirm. */
2789 void* varlink_server_get_userdata(VarlinkServer
*s
) {
2790 assert_return(s
, NULL
);
/* Tears down a listening socket object: removes ss from its server's list of sockets and disables
 * and unreferences its event source.
 * NOTE(review): a NULL check on ss/ss->server, the release of the remaining members (address, fd)
 * and the return value are not shown here — confirm against the complete function. */
2795 static VarlinkServerSocket
* varlink_server_socket_destroy(VarlinkServerSocket
*ss
) {
2800 LIST_REMOVE(sockets
, ss
->server
->sockets
, ss
);
2802 sd_event_source_disable_unref(ss
->event_source
);
/* Shuts down the server's listening sockets by destroying them with
 * varlink_server_socket_destroy(). Returns -EINVAL if s is NULL (assert_return).
 * NOTE(review): only one call on the list head is shown; presumably it is repeated until the list
 * is empty, and the success return value is not shown either — confirm. */
2810 int varlink_server_shutdown(VarlinkServer
*s
) {
2811 assert_return(s
, -EINVAL
);
2814 varlink_server_socket_destroy(s
->sockets
);
2819 static int varlink_server_add_socket_event_source(VarlinkServer
*s
, VarlinkServerSocket
*ss
, int64_t priority
) {
2820 _cleanup_(sd_event_source_unrefp
) sd_event_source
*es
= NULL
;
2827 assert(ss
->fd
>= 0);
2828 assert(!ss
->event_source
);
2830 r
= sd_event_add_io(s
->event
, &es
, ss
->fd
, EPOLLIN
, connect_callback
, ss
);
2834 r
= sd_event_source_set_priority(es
, priority
);
2838 ss
->event_source
= TAKE_PTR(es
);
/* Attaches the server to an event loop and registers an event source for each of its listening
 * sockets with the given priority, which is then remembered in s->event_priority.
 *
 * Returns -EINVAL if s is NULL and -EBUSY if the server is already attached to an event loop.
 *
 * NOTE(review): the branch structure is not shown here. Presumably e is referenced when non-NULL
 * and the default event loop is acquired otherwise, and the varlink_server_detach_event() call at
 * the end is the rollback for a failed socket registration — confirm against the complete function. */
2842 int varlink_server_attach_event(VarlinkServer
*s
, sd_event
*e
, int64_t priority
) {
2845 assert_return(s
, -EINVAL
);
2846 assert_return(!s
->event
, -EBUSY
);
2849 s
->event
= sd_event_ref(e
);
2851 r
= sd_event_default(&s
->event
);
2856 LIST_FOREACH(sockets
, ss
, s
->sockets
) {
2857 r
= varlink_server_add_socket_event_source(s
, ss
, priority
);
2862 s
->event_priority
= priority
;
2866 varlink_server_detach_event(s
);
2870 int varlink_server_detach_event(VarlinkServer
*s
) {
2871 assert_return(s
, -EINVAL
);
2873 LIST_FOREACH(sockets
, ss
, s
->sockets
)
2874 ss
->event_source
= sd_event_source_disable_unref(ss
->event_source
);
2876 sd_event_unref(s
->event
);
/* Getter for the event loop the server is attached to. Returns NULL if s is NULL (assert_return).
 * NOTE(review): the return statement is not shown here; presumably s->event — confirm. */
2880 sd_event
*varlink_server_get_event(VarlinkServer
*s
) {
2881 assert_return(s
, NULL
);
/* Registers callback as the handler for the named method in the server's method table
 * (s->methods, keyed by string).
 *
 * Returns -EINVAL if any argument is NULL. Method names starting with "org.varlink.service." are
 * refused with EEXIST. Allocation failures are reported through log_oom_debug(), other
 * registration failures are logged and returned.
 *
 * NOTE(review): the assignment of m (presumably a copy of method that serves as hash key), the
 * conditions guarding the error returns, the ownership hand-over of m and the success return value
 * are not shown here — confirm against the complete function. */
2886 int varlink_server_bind_method(VarlinkServer
*s
, const char *method
, VarlinkMethod callback
) {
2887 _cleanup_free_
char *m
= NULL
;
2890 assert_return(s
, -EINVAL
);
2891 assert_return(method
, -EINVAL
);
2892 assert_return(callback
, -EINVAL
);
2894 if (startswith(method
, "org.varlink.service."))
2895 return log_debug_errno(SYNTHETIC_ERRNO(EEXIST
), "Cannot bind server to '%s'.", method
);
2899 return log_oom_debug();
2901 r
= hashmap_ensure_put(&s
->methods
, &string_hash_ops
, m
, callback
);
2903 return log_oom_debug();
2905 return log_debug_errno(r
, "Failed to register callback: %m");
/* Variadic helper that binds several methods at once: reads (const char *method,
 * VarlinkMethod callback) pairs from the argument list and registers each of them through
 * varlink_server_bind_method(). Returns -EINVAL if s is NULL.
 *
 * NOTE(review): the va_start()/va_end() calls, the loop and its termination condition (presumably
 * a NULL method name ends the list) and the return value are not shown here — confirm. */
2912 int varlink_server_bind_method_many_internal(VarlinkServer
*s
, ...) {
2916 assert_return(s
, -EINVAL
);
2920 VarlinkMethod callback
;
2923 method
= va_arg(ap
, const char *);
2927 callback
= va_arg(ap
, VarlinkMethod
);
2929 r
= varlink_server_bind_method(s
, method
, callback
);
2938 int varlink_server_bind_connect(VarlinkServer
*s
, VarlinkConnect callback
) {
2939 assert_return(s
, -EINVAL
);
2941 if (callback
&& s
->connect_callback
&& callback
!= s
->connect_callback
)
2942 return log_debug_errno(SYNTHETIC_ERRNO(EBUSY
), "A different callback was already set.");
2944 s
->connect_callback
= callback
;
2948 int varlink_server_bind_disconnect(VarlinkServer
*s
, VarlinkDisconnect callback
) {
2949 assert_return(s
, -EINVAL
);
2951 if (callback
&& s
->disconnect_callback
&& callback
!= s
->disconnect_callback
)
2952 return log_debug_errno(SYNTHETIC_ERRNO(EBUSY
), "A different callback was already set.");
2954 s
->disconnect_callback
= callback
;
/* Returns the maximum number of simultaneous connections: the per-server setting if a server is
 * given, otherwise a default derived from VARLINK_DEFAULT_CONNECTIONS_MAX and the size of the file
 * descriptor table (getdtablesize()).
 *
 * NOTE(review): the declaration of dts, the condition guarding the first return and the value
 * returned when the default exceeds ¾ of the descriptor table (presumably that ¾ value) are not
 * shown here — confirm against the complete function. */
2958 unsigned varlink_server_connections_max(VarlinkServer
*s
) {
2961 /* If a server is specified, return the setting for that server, otherwise the default value */
2963 return s
->connections_max
;
2965 dts
= getdtablesize();
2968 /* Make sure we never use up more than ¾th of RLIMIT_NOFILE for IPC */
2969 if (VARLINK_DEFAULT_CONNECTIONS_MAX
> (unsigned) dts
/ 4 * 3)
2972 return VARLINK_DEFAULT_CONNECTIONS_MAX
;
/* Returns the maximum number of simultaneous connections per UID: the per-server setting, or a
 * default derived from VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX and the default overall connection
 * limit (varlink_server_connections_max(NULL)).
 *
 * NOTE(review): the declaration of m, the condition guarding the first return (presumably "a
 * server was given") and the value returned when the per-UID default exceeds m (presumably ¾ of m,
 * going by the comment below) are not shown here — confirm against the complete function. */
2975 unsigned varlink_server_connections_per_uid_max(VarlinkServer
*s
) {
2979 return s
->connections_per_uid_max
;
2981 /* Make sure to never use up more than ¾th of available connections for a single user */
2982 m
= varlink_server_connections_max(NULL
);
2983 if (VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX
> m
)
2986 return VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX
;
2989 int varlink_server_set_connections_per_uid_max(VarlinkServer
*s
, unsigned m
) {
2990 assert_return(s
, -EINVAL
);
2991 assert_return(m
> 0, -EINVAL
);
2993 s
->connections_per_uid_max
= m
;
2997 int varlink_server_set_connections_max(VarlinkServer
*s
, unsigned m
) {
2998 assert_return(s
, -EINVAL
);
2999 assert_return(m
> 0, -EINVAL
);
3001 s
->connections_max
= m
;
3005 unsigned varlink_server_current_connections(VarlinkServer
*s
) {
3006 assert_return(s
, UINT_MAX
);
3008 return s
->n_connections
;
3011 int varlink_server_set_description(VarlinkServer
*s
, const char *description
) {
3012 assert_return(s
, -EINVAL
);
3014 return free_and_strdup(&s
->description
, description
);
/* Serializes the server's listening sockets to the stream f. For each socket it writes
 * "varlink-server-socket-address=<address>" followed by " varlink-server-socket-fd=<n>", where <n>
 * is a duplicate of the socket fd that has been placed into the fd set fds. Every socket is
 * expected to have an address and a valid fd (asserted).
 *
 * NOTE(review): the declaration of copy, the handling of a failed fdset_put_dup(), the termination
 * of each record (presumably a newline), any NULL handling for s and the return value are not
 * shown here — confirm against the complete function. */
3017 int varlink_server_serialize(VarlinkServer
*s
, FILE *f
, FDSet
*fds
) {
3024 LIST_FOREACH(sockets
, ss
, s
->sockets
) {
3027 assert(ss
->address
);
3028 assert(ss
->fd
>= 0);
3030 fprintf(f
, "varlink-server-socket-address=%s", ss
->address
);
3032 /* If we fail to serialize the fd, it will be considered an error during deserialization */
3033 copy
= fdset_put_dup(fds
, ss
->fd
);
3037 fprintf(f
, " varlink-server-socket-fd=%i", copy
);
3045 int varlink_server_deserialize_one(VarlinkServer
*s
, const char *value
, FDSet
*fds
) {
3046 _cleanup_(varlink_server_socket_freep
) VarlinkServerSocket
*ss
= NULL
;
3047 _cleanup_free_
char *address
= NULL
;
3048 const char *v
= ASSERT_PTR(value
);
3056 n
= strcspn(v
, " ");
3057 address
= strndup(v
, n
);
3059 return log_oom_debug();
3062 return log_debug_errno(SYNTHETIC_ERRNO(EINVAL
),
3063 "Failed to deserialize VarlinkServerSocket: %s: %m", value
);
3064 v
= startswith(v
+ n
+ 1, "varlink-server-socket-fd=");
3066 return log_debug_errno(SYNTHETIC_ERRNO(EINVAL
),
3067 "Failed to deserialize VarlinkServerSocket fd %s: %m", value
);
3069 n
= strcspn(v
, " ");
3070 buf
= strndupa_safe(v
, n
);
3074 return log_debug_errno(fd
, "Unable to parse VarlinkServerSocket varlink-server-socket-fd=%s: %m", buf
);
3075 if (!fdset_contains(fds
, fd
))
3076 return log_debug_errno(SYNTHETIC_ERRNO(EBADF
),
3077 "VarlinkServerSocket varlink-server-socket-fd= has unknown fd %d: %m", fd
);
3079 ss
= new(VarlinkServerSocket
, 1);
3081 return log_oom_debug();
3083 *ss
= (VarlinkServerSocket
) {
3085 .address
= TAKE_PTR(address
),
3086 .fd
= fdset_remove(fds
, fd
),
3089 r
= varlink_server_add_socket_event_source(s
, ss
, SD_EVENT_PRIORITY_NORMAL
);
3091 return log_debug_errno(r
, "Failed to add VarlinkServerSocket event source to the event loop: %m");
3093 LIST_PREPEND(sockets
, s
->sockets
, TAKE_PTR(ss
));