1 /* SPDX-License-Identifier: LGPL-2.1+ */
5 #include "alloc-util.h"
6 #include "errno-util.h"
10 #include "process-util.h"
12 #include "socket-util.h"
13 #include "string-table.h"
14 #include "string-util.h"
16 #include "time-util.h"
17 #include "umask-util.h"
18 #include "user-util.h"
21 #define VARLINK_DEFAULT_CONNECTIONS_MAX 4096U
22 #define VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX 1024U
24 #define VARLINK_DEFAULT_TIMEOUT_USEC (45U*USEC_PER_SEC)
25 #define VARLINK_BUFFER_MAX (16U*1024U*1024U)
26 #define VARLINK_READ_SIZE (64U*1024U)
28 typedef enum VarlinkState
{
29 /* Client side states */
31 VARLINK_AWAITING_REPLY
,
32 VARLINK_AWAITING_REPLY_MORE
,
35 VARLINK_PROCESSING_REPLY
,
37 /* Server side states */
39 VARLINK_PROCESSING_METHOD
,
40 VARLINK_PROCESSING_METHOD_MORE
,
41 VARLINK_PROCESSING_METHOD_ONEWAY
,
42 VARLINK_PROCESSED_METHOD
,
43 VARLINK_PENDING_METHOD
,
44 VARLINK_PENDING_METHOD_MORE
,
46 /* Common states (only during shutdown) */
47 VARLINK_PENDING_DISCONNECT
,
48 VARLINK_PENDING_TIMEOUT
,
49 VARLINK_PROCESSING_DISCONNECT
,
50 VARLINK_PROCESSING_TIMEOUT
,
51 VARLINK_PROCESSING_FAILURE
,
55 _VARLINK_STATE_INVALID
= -1
58 /* Tests whether we are not yet disconnected. Note that this is true during all states where the connection
59 * is still good for something, and false only when it's dead for good. This means: when we are
60 * asynchronously connecting to a peer and the connect() is still pending, then this will return 'true', as
61 * the connection is still good, and we are likely to be able to properly operate on it soon. */
62 #define VARLINK_STATE_IS_ALIVE(state) \
64 VARLINK_IDLE_CLIENT, \
65 VARLINK_AWAITING_REPLY, \
66 VARLINK_AWAITING_REPLY_MORE, \
69 VARLINK_PROCESSING_REPLY, \
70 VARLINK_IDLE_SERVER, \
71 VARLINK_PROCESSING_METHOD, \
72 VARLINK_PROCESSING_METHOD_MORE, \
73 VARLINK_PROCESSING_METHOD_ONEWAY, \
74 VARLINK_PROCESSED_METHOD, \
75 VARLINK_PENDING_METHOD, \
76 VARLINK_PENDING_METHOD_MORE)
81 VarlinkServer
*server
;
84 bool connecting
; /* This boolean indicates whether the socket fd we are operating on is currently
85 * processing an asynchronous connect(). In that state we watch the socket for
86 * EPOLLOUT, but we refrain from calling read() or write() on the socket as that
87 * will trigger ENOTCONN. Note that this boolean is kept separate from the
88 * VarlinkState above on purpose: while the connect() is still not complete we
89 * already want to allow queuing of messages and similar. Thus it's nice to keep
90 * these two state concepts separate: the VarlinkState encodes what our own view of
91 * the connection is, i.e. whether we think it's a server, a client, and has
92 * something queued already, while 'connecting' tells us a detail about the
93 * transport used below, that should have no effect on how we otherwise accept and
94 * process operations from the user.
96 * Or to say this differently: VARLINK_STATE_IS_ALIVE(state) tells you whether the
97 * connection is good to use, even if it might not be fully connected
98 * yet. connecting=true then informs you that actually we are still connecting, and
99 * the connection is actually not established yet and thus any requests you enqueue
100 * now will still work fine but will be queued only, not sent yet, but that
101 * shouldn't stop you from using the connection, since eventually whatever you queue
104 * Or to say this even differently: 'state' is a high-level ("application layer"
105 * high, if you so will) state, while 'conecting' is a low-level ("transport layer"
106 * low, if you so will) state, and while they are not entirely unrelated and
107 * sometimes propagate effects to each other they are only asynchronously connected
113 char *input_buffer
; /* valid data starts at input_buffer_index, ends at input_buffer_index+input_buffer_size */
114 size_t input_buffer_allocated
;
115 size_t input_buffer_index
;
116 size_t input_buffer_size
;
117 size_t input_buffer_unscanned
;
119 char *output_buffer
; /* valid data starts at output_buffer_index, ends at output_buffer_index+output_buffer_size */
120 size_t output_buffer_allocated
;
121 size_t output_buffer_index
;
122 size_t output_buffer_size
;
124 VarlinkReply reply_callback
;
126 JsonVariant
*current
;
130 bool ucred_acquired
:1;
132 bool write_disconnected
:1;
133 bool read_disconnected
:1;
134 bool prefer_read_write
:1;
144 sd_event_source
*io_event_source
;
145 sd_event_source
*time_event_source
;
146 sd_event_source
*quit_event_source
;
147 sd_event_source
*defer_event_source
;
150 typedef struct VarlinkServerSocket VarlinkServerSocket
;
152 struct VarlinkServerSocket
{
153 VarlinkServer
*server
;
158 sd_event_source
*event_source
;
160 LIST_FIELDS(VarlinkServerSocket
, sockets
);
163 struct VarlinkServer
{
165 VarlinkServerFlags flags
;
167 LIST_HEAD(VarlinkServerSocket
, sockets
);
170 VarlinkConnect connect_callback
;
173 int64_t event_priority
;
175 unsigned n_connections
;
181 unsigned connections_max
;
182 unsigned connections_per_uid_max
;
185 static const char* const varlink_state_table
[_VARLINK_STATE_MAX
] = {
186 [VARLINK_IDLE_CLIENT
] = "idle-client",
187 [VARLINK_AWAITING_REPLY
] = "awaiting-reply",
188 [VARLINK_AWAITING_REPLY_MORE
] = "awaiting-reply-more",
189 [VARLINK_CALLING
] = "calling",
190 [VARLINK_CALLED
] = "called",
191 [VARLINK_PROCESSING_REPLY
] = "processing-reply",
192 [VARLINK_IDLE_SERVER
] = "idle-server",
193 [VARLINK_PROCESSING_METHOD
] = "processing-method",
194 [VARLINK_PROCESSING_METHOD_MORE
] = "processing-method-more",
195 [VARLINK_PROCESSING_METHOD_ONEWAY
] = "processing-method-oneway",
196 [VARLINK_PROCESSED_METHOD
] = "processed-method",
197 [VARLINK_PENDING_METHOD
] = "pending-method",
198 [VARLINK_PENDING_METHOD_MORE
] = "pending-method-more",
199 [VARLINK_PENDING_DISCONNECT
] = "pending-disconnect",
200 [VARLINK_PENDING_TIMEOUT
] = "pending-timeout",
201 [VARLINK_PROCESSING_DISCONNECT
] = "processing-disconnect",
202 [VARLINK_PROCESSING_TIMEOUT
] = "processing-timeout",
203 [VARLINK_PROCESSING_FAILURE
] = "processing-failure",
204 [VARLINK_DISCONNECTED
] = "disconnected",
207 DEFINE_PRIVATE_STRING_TABLE_LOOKUP_TO_STRING(varlink_state
, VarlinkState
);
209 #define varlink_log_errno(v, error, fmt, ...) \
210 log_debug_errno(error, "%s: " fmt, varlink_description(v), ##__VA_ARGS__)
212 #define varlink_log(v, fmt, ...) \
213 log_debug("%s: " fmt, varlink_description(v), ##__VA_ARGS__)
215 #define varlink_server_log_errno(s, error, fmt, ...) \
216 log_debug_errno(error, "%s: " fmt, varlink_server_description(s), ##__VA_ARGS__)
218 #define varlink_server_log(s, fmt, ...) \
219 log_debug("%s: " fmt, varlink_server_description(s), ##__VA_ARGS__)
221 static inline const char *varlink_description(Varlink
*v
) {
222 return strna(v
? v
->description
: NULL
);
225 static inline const char *varlink_server_description(VarlinkServer
*s
) {
226 return strna(s
? s
->description
: NULL
);
229 static void varlink_set_state(Varlink
*v
, VarlinkState state
) {
231 assert(state
>= 0 && state
< _VARLINK_STATE_MAX
);
234 varlink_log(v
, "varlink: setting state %s",
235 varlink_state_to_string(state
));
237 varlink_log(v
, "varlink: changing state %s → %s",
238 varlink_state_to_string(v
->state
),
239 varlink_state_to_string(state
));
244 static int varlink_new(Varlink
**ret
) {
257 .state
= _VARLINK_STATE_INVALID
,
259 .ucred
.uid
= UID_INVALID
,
260 .ucred
.gid
= GID_INVALID
,
262 .timestamp
= USEC_INFINITY
,
263 .timeout
= VARLINK_DEFAULT_TIMEOUT_USEC
270 int varlink_connect_address(Varlink
**ret
, const char *address
) {
271 _cleanup_(varlink_unrefp
) Varlink
*v
= NULL
;
272 union sockaddr_union sockaddr
;
275 assert_return(ret
, -EINVAL
);
276 assert_return(address
, -EINVAL
);
278 r
= sockaddr_un_set_path(&sockaddr
.un
, address
);
286 v
->fd
= socket(AF_UNIX
, SOCK_STREAM
|SOCK_CLOEXEC
|SOCK_NONBLOCK
, 0);
290 v
->fd
= fd_move_above_stdio(v
->fd
);
292 if (connect(v
->fd
, &sockaddr
.sa
, SOCKADDR_UN_LEN(sockaddr
.un
)) < 0) {
293 if (!IN_SET(errno
, EAGAIN
, EINPROGRESS
))
296 v
->connecting
= true; /* We are asynchronously connecting, i.e. the connect() is being
297 * processed in the background. As long as that's the case the socket
298 * is in a special state: it's there, we can poll it for EPOLLOUT, but
299 * if we attempt to write() to it before we see EPOLLOUT we'll get
300 * ENOTCONN (and not EAGAIN, like we would for a normal connected
301 * socket that isn't writable at the moment). Since ENOTCONN on write()
302 * hence can mean two different things (i.e. connection not complete
303 * yet vs. already disconnected again), we store as a boolean whether
304 * we are still in connect(). */
307 varlink_set_state(v
, VARLINK_IDLE_CLIENT
);
313 int varlink_connect_fd(Varlink
**ret
, int fd
) {
317 assert_return(ret
, -EINVAL
);
318 assert_return(fd
>= 0, -EBADF
);
320 r
= fd_nonblock(fd
, true);
329 varlink_set_state(v
, VARLINK_IDLE_CLIENT
);
331 /* Note that if this function is called we assume the passed socket (if it is one) is already
332 * properly connected, i.e. any asynchronous connect() done on it already completed. Because of that
333 * we'll not set the 'connecting' boolean here, i.e. we don't need to avoid write()ing to the socket
334 * until the connection is fully set up. Behaviour here is hence a bit different from
335 * varlink_connect_address() above, as there we do handle asynchronous connections ourselves and
336 * avoid doing write() on it before we saw EPOLLOUT for the first time. */
342 static void varlink_detach_event_sources(Varlink
*v
) {
345 v
->io_event_source
= sd_event_source_disable_unref(v
->io_event_source
);
347 v
->time_event_source
= sd_event_source_disable_unref(v
->time_event_source
);
349 v
->quit_event_source
= sd_event_source_disable_unref(v
->quit_event_source
);
351 v
->defer_event_source
= sd_event_source_disable_unref(v
->defer_event_source
);
354 static void varlink_clear(Varlink
*v
) {
357 varlink_detach_event_sources(v
);
359 v
->fd
= safe_close(v
->fd
);
361 v
->input_buffer
= mfree(v
->input_buffer
);
362 v
->output_buffer
= mfree(v
->output_buffer
);
364 v
->current
= json_variant_unref(v
->current
);
365 v
->reply
= json_variant_unref(v
->reply
);
367 v
->event
= sd_event_unref(v
->event
);
370 static Varlink
* varlink_destroy(Varlink
*v
) {
374 /* If this is called the server object must already been unreffed here. Why that? because when we
375 * linked up the varlink connection with the server object we took one ref in each direction */
380 free(v
->description
);
384 DEFINE_TRIVIAL_REF_UNREF_FUNC(Varlink
, varlink
, varlink_destroy
);
386 static int varlink_test_disconnect(Varlink
*v
) {
389 /* Tests whether we the the connection has been terminated. We are careful to not stop processing it
390 * prematurely, since we want to handle half-open connections as well as possible and want to flush
391 * out and read data before we close down if we can. */
393 /* Already disconnected? */
394 if (!VARLINK_STATE_IS_ALIVE(v
->state
))
397 /* Wait until connection setup is complete, i.e. until asynchronous connect() completes */
401 /* Still something to write and we can write? Stay around */
402 if (v
->output_buffer_size
> 0 && !v
->write_disconnected
)
405 /* Both sides gone already? Then there's no need to stick around */
406 if (v
->read_disconnected
&& v
->write_disconnected
)
409 /* If we are waiting for incoming data but the read side is shut down, disconnect. */
410 if (IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
, VARLINK_IDLE_SERVER
) && v
->read_disconnected
)
413 /* Similar, if are a client that hasn't written anything yet but the write side is dead, also
414 * disconnect. We also explicitly check for POLLHUP here since we likely won't notice the write side
415 * being down if we never wrote anything. */
416 if (IN_SET(v
->state
, VARLINK_IDLE_CLIENT
) && (v
->write_disconnected
|| v
->got_pollhup
))
422 varlink_set_state(v
, VARLINK_PENDING_DISCONNECT
);
426 static int varlink_write(Varlink
*v
) {
431 if (!VARLINK_STATE_IS_ALIVE(v
->state
))
433 if (v
->connecting
) /* Writing while we are still wait for a non-blocking connect() to complete will
434 * result in ENOTCONN, hence exit early here */
436 if (v
->output_buffer_size
== 0)
438 if (v
->write_disconnected
)
443 /* We generally prefer recv()/send() (mostly because of MSG_NOSIGNAL) but also want to be compatible
444 * with non-socket IO, hence fall back automatically */
445 if (!v
->prefer_read_write
) {
446 n
= send(v
->fd
, v
->output_buffer
+ v
->output_buffer_index
, v
->output_buffer_size
, MSG_DONTWAIT
|MSG_NOSIGNAL
);
447 if (n
< 0 && errno
== ENOTSOCK
)
448 v
->prefer_read_write
= true;
450 if (v
->prefer_read_write
)
451 n
= write(v
->fd
, v
->output_buffer
+ v
->output_buffer_index
, v
->output_buffer_size
);
456 if (ERRNO_IS_DISCONNECT(errno
)) {
457 /* If we get informed about a disconnect on write, then let's remember that, but not
458 * act on it just yet. Let's wait for read() to report the issue first. */
459 v
->write_disconnected
= true;
466 v
->output_buffer_size
-= n
;
468 if (v
->output_buffer_size
== 0)
469 v
->output_buffer_index
= 0;
471 v
->output_buffer_index
+= n
;
473 v
->timestamp
= now(CLOCK_MONOTONIC
);
477 static int varlink_read(Varlink
*v
) {
483 if (!IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
, VARLINK_IDLE_SERVER
))
485 if (v
->connecting
) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */
489 if (v
->input_buffer_unscanned
> 0)
491 if (v
->read_disconnected
)
494 if (v
->input_buffer_size
>= VARLINK_BUFFER_MAX
)
499 if (v
->input_buffer_allocated
<= v
->input_buffer_index
+ v
->input_buffer_size
) {
502 add
= MIN(VARLINK_BUFFER_MAX
- v
->input_buffer_size
, VARLINK_READ_SIZE
);
504 if (v
->input_buffer_index
== 0) {
506 if (!GREEDY_REALLOC(v
->input_buffer
, v
->input_buffer_allocated
, v
->input_buffer_size
+ add
))
512 b
= new(char, v
->input_buffer_size
+ add
);
516 memcpy(b
, v
->input_buffer
+ v
->input_buffer_index
, v
->input_buffer_size
);
518 free_and_replace(v
->input_buffer
, b
);
520 v
->input_buffer_allocated
= v
->input_buffer_size
+ add
;
521 v
->input_buffer_index
= 0;
525 rs
= v
->input_buffer_allocated
- (v
->input_buffer_index
+ v
->input_buffer_size
);
527 if (!v
->prefer_read_write
) {
528 n
= recv(v
->fd
, v
->input_buffer
+ v
->input_buffer_index
+ v
->input_buffer_size
, rs
, MSG_DONTWAIT
);
529 if (n
< 0 && errno
== ENOTSOCK
)
530 v
->prefer_read_write
= true;
532 if (v
->prefer_read_write
)
533 n
= read(v
->fd
, v
->input_buffer
+ v
->input_buffer_index
+ v
->input_buffer_size
, rs
);
538 if (ERRNO_IS_DISCONNECT(errno
)) {
539 v
->read_disconnected
= true;
545 if (n
== 0) { /* EOF */
546 v
->read_disconnected
= true;
550 v
->input_buffer_size
+= n
;
551 v
->input_buffer_unscanned
+= n
;
556 static int varlink_parse_message(Varlink
*v
) {
557 const char *e
, *begin
;
565 if (v
->input_buffer_unscanned
<= 0)
568 assert(v
->input_buffer_unscanned
<= v
->input_buffer_size
);
569 assert(v
->input_buffer_index
+ v
->input_buffer_size
<= v
->input_buffer_allocated
);
571 begin
= v
->input_buffer
+ v
->input_buffer_index
;
573 e
= memchr(begin
+ v
->input_buffer_size
- v
->input_buffer_unscanned
, 0, v
->input_buffer_unscanned
);
575 v
->input_buffer_unscanned
= 0;
581 varlink_log(v
, "New incoming message: %s", begin
);
583 r
= json_parse(begin
, &v
->current
, NULL
, NULL
);
587 v
->input_buffer_size
-= sz
;
589 if (v
->input_buffer_size
== 0)
590 v
->input_buffer_index
= 0;
592 v
->input_buffer_index
+= sz
;
594 v
->input_buffer_unscanned
= v
->input_buffer_size
;
598 static int varlink_test_timeout(Varlink
*v
) {
601 if (!IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
))
603 if (v
->timeout
== USEC_INFINITY
)
606 if (now(CLOCK_MONOTONIC
) < usec_add(v
->timestamp
, v
->timeout
))
609 varlink_set_state(v
, VARLINK_PENDING_TIMEOUT
);
614 static int varlink_dispatch_local_error(Varlink
*v
, const char *error
) {
620 if (!v
->reply_callback
)
623 r
= v
->reply_callback(v
, NULL
, error
, VARLINK_REPLY_ERROR
|VARLINK_REPLY_LOCAL
, v
->userdata
);
625 log_debug_errno(r
, "Reply callback returned error, ignoring: %m");
630 static int varlink_dispatch_timeout(Varlink
*v
) {
633 if (v
->state
!= VARLINK_PENDING_TIMEOUT
)
636 varlink_set_state(v
, VARLINK_PROCESSING_TIMEOUT
);
637 varlink_dispatch_local_error(v
, VARLINK_ERROR_TIMEOUT
);
643 static int varlink_dispatch_disconnect(Varlink
*v
) {
646 if (v
->state
!= VARLINK_PENDING_DISCONNECT
)
649 varlink_set_state(v
, VARLINK_PROCESSING_DISCONNECT
);
650 varlink_dispatch_local_error(v
, VARLINK_ERROR_DISCONNECTED
);
656 static int varlink_sanitize_parameters(JsonVariant
**v
) {
659 /* Varlink always wants a parameters list, hence make one if the caller doesn't want any */
661 return json_variant_new_object(v
, NULL
, 0);
662 else if (!json_variant_is_object(*v
))
668 static int varlink_dispatch_reply(Varlink
*v
) {
669 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
670 VarlinkReplyFlags flags
= 0;
671 const char *error
= NULL
;
678 if (!IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
))
683 assert(v
->n_pending
> 0);
685 if (!json_variant_is_object(v
->current
))
688 JSON_VARIANT_OBJECT_FOREACH(k
, e
, v
->current
) {
690 if (streq(k
, "error")) {
693 if (!json_variant_is_string(e
))
696 error
= json_variant_string(e
);
697 flags
|= VARLINK_REPLY_ERROR
;
699 } else if (streq(k
, "parameters")) {
702 if (!json_variant_is_object(e
))
705 parameters
= json_variant_ref(e
);
707 } else if (streq(k
, "continues")) {
708 if (FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
))
711 if (!json_variant_is_boolean(e
))
714 if (json_variant_boolean(e
))
715 flags
|= VARLINK_REPLY_CONTINUES
;
720 /* Replies with 'continue' set are only OK if we set 'more' when the method call was initiated */
721 if (v
->state
!= VARLINK_AWAITING_REPLY_MORE
&& FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
))
724 /* An error is final */
725 if (error
&& FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
))
728 r
= varlink_sanitize_parameters(¶meters
);
732 if (IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
)) {
733 varlink_set_state(v
, VARLINK_PROCESSING_REPLY
);
735 if (v
->reply_callback
) {
736 r
= v
->reply_callback(v
, parameters
, error
, flags
, v
->userdata
);
738 log_debug_errno(r
, "Reply callback returned error, ignoring: %m");
741 v
->current
= json_variant_unref(v
->current
);
743 if (v
->state
== VARLINK_PROCESSING_REPLY
) {
745 assert(v
->n_pending
> 0);
747 if (!FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
))
751 FLAGS_SET(flags
, VARLINK_REPLY_CONTINUES
) ? VARLINK_AWAITING_REPLY_MORE
:
752 v
->n_pending
== 0 ? VARLINK_IDLE_CLIENT
: VARLINK_AWAITING_REPLY
);
755 assert(v
->state
== VARLINK_CALLING
);
756 varlink_set_state(v
, VARLINK_CALLED
);
762 varlink_set_state(v
, VARLINK_PROCESSING_FAILURE
);
763 varlink_dispatch_local_error(v
, VARLINK_ERROR_PROTOCOL
);
769 static int varlink_dispatch_method(Varlink
*v
) {
770 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
771 VarlinkMethodFlags flags
= 0;
772 const char *method
= NULL
, *error
;
774 VarlinkMethod callback
;
780 if (v
->state
!= VARLINK_IDLE_SERVER
)
785 if (!json_variant_is_object(v
->current
))
788 JSON_VARIANT_OBJECT_FOREACH(k
, e
, v
->current
) {
790 if (streq(k
, "method")) {
793 if (!json_variant_is_string(e
))
796 method
= json_variant_string(e
);
798 } else if (streq(k
, "parameters")) {
801 if (!json_variant_is_object(e
))
804 parameters
= json_variant_ref(e
);
806 } else if (streq(k
, "oneway")) {
808 if ((flags
& (VARLINK_METHOD_ONEWAY
|VARLINK_METHOD_MORE
)) != 0)
811 if (!json_variant_is_boolean(e
))
814 if (json_variant_boolean(e
))
815 flags
|= VARLINK_METHOD_ONEWAY
;
817 } else if (streq(k
, "more")) {
819 if ((flags
& (VARLINK_METHOD_ONEWAY
|VARLINK_METHOD_MORE
)) != 0)
822 if (!json_variant_is_boolean(e
))
825 if (json_variant_boolean(e
))
826 flags
|= VARLINK_METHOD_MORE
;
835 r
= varlink_sanitize_parameters(¶meters
);
839 varlink_set_state(v
, (flags
& VARLINK_METHOD_MORE
) ? VARLINK_PROCESSING_METHOD_MORE
:
840 (flags
& VARLINK_METHOD_ONEWAY
) ? VARLINK_PROCESSING_METHOD_ONEWAY
:
841 VARLINK_PROCESSING_METHOD
);
845 if (STR_IN_SET(method
, "org.varlink.service.GetInfo", "org.varlink.service.GetInterface")) {
846 /* For now, we don't implement a single of varlink's own methods */
848 error
= VARLINK_ERROR_METHOD_NOT_IMPLEMENTED
;
849 } else if (startswith(method
, "org.varlink.service.")) {
851 error
= VARLINK_ERROR_METHOD_NOT_FOUND
;
853 callback
= hashmap_get(v
->server
->methods
, method
);
854 error
= VARLINK_ERROR_METHOD_NOT_FOUND
;
858 r
= callback(v
, parameters
, flags
, v
->userdata
);
860 log_debug_errno(r
, "Callback for %s returned error: %m", method
);
862 /* We got an error back from the callback. Propagate it to the client if the method call remains unanswered. */
863 if (!FLAGS_SET(flags
, VARLINK_METHOD_ONEWAY
)) {
864 r
= varlink_errorb(v
, VARLINK_ERROR_SYSTEM
, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("errno", JSON_BUILD_INTEGER(-r
))));
869 } else if (!FLAGS_SET(flags
, VARLINK_METHOD_ONEWAY
)) {
872 r
= varlink_errorb(v
, error
, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
))));
879 case VARLINK_PROCESSED_METHOD
: /* Method call is fully processed */
880 case VARLINK_PROCESSING_METHOD_ONEWAY
: /* ditto */
881 v
->current
= json_variant_unref(v
->current
);
882 varlink_set_state(v
, VARLINK_IDLE_SERVER
);
885 case VARLINK_PROCESSING_METHOD
: /* Method call wasn't replied to, will be replied to later */
886 varlink_set_state(v
, VARLINK_PENDING_METHOD
);
889 case VARLINK_PROCESSING_METHOD_MORE
: /* No reply for a "more" message was sent, more to come */
890 varlink_set_state(v
, VARLINK_PENDING_METHOD_MORE
);
894 assert_not_reached("Unexpected state");
904 varlink_set_state(v
, VARLINK_PROCESSING_FAILURE
);
905 varlink_dispatch_local_error(v
, VARLINK_ERROR_PROTOCOL
);
911 int varlink_process(Varlink
*v
) {
914 assert_return(v
, -EINVAL
);
916 if (v
->state
== VARLINK_DISCONNECTED
)
921 r
= varlink_write(v
);
925 r
= varlink_dispatch_reply(v
);
929 r
= varlink_dispatch_method(v
);
933 r
= varlink_parse_message(v
);
941 r
= varlink_test_disconnect(v
);
945 r
= varlink_dispatch_disconnect(v
);
949 r
= varlink_test_timeout(v
);
953 r
= varlink_dispatch_timeout(v
);
958 if (r
>= 0 && v
->defer_event_source
) {
961 /* If we did some processing, make sure we are called again soon */
962 q
= sd_event_source_set_enabled(v
->defer_event_source
, r
> 0 ? SD_EVENT_ON
: SD_EVENT_OFF
);
968 if (VARLINK_STATE_IS_ALIVE(v
->state
))
969 /* Initiate disconnection */
970 varlink_set_state(v
, VARLINK_PENDING_DISCONNECT
);
972 /* We failed while disconnecting, in that case close right away */
980 static void handle_revents(Varlink
*v
, int revents
) {
984 /* If we have seen POLLOUT or POLLHUP on a socket we are asynchronously waiting a connect()
985 * to complete on, we know we are ready. We don't read the connection error here though,
986 * we'll get the error on the next read() or write(). */
987 if ((revents
& (POLLOUT
|POLLHUP
)) == 0)
990 varlink_log(v
, "Anynchronous connection completed.");
991 v
->connecting
= false;
993 /* Note that we don't care much about POLLIN/POLLOUT here, we'll just try reading and writing
994 * what we can. However, we do care about POLLHUP to detect connection termination even if we
995 * momentarily don't want to read nor write anything. */
997 if (!FLAGS_SET(revents
, POLLHUP
))
1000 varlink_log(v
, "Got POLLHUP from socket.");
1001 v
->got_pollhup
= true;
1005 int varlink_wait(Varlink
*v
, usec_t timeout
) {
1011 assert_return(v
, -EINVAL
);
1013 if (v
->state
== VARLINK_DISCONNECTED
)
1016 r
= varlink_get_timeout(v
, &t
);
1019 if (t
!= USEC_INFINITY
) {
1022 n
= now(CLOCK_MONOTONIC
);
1026 t
= usec_sub_unsigned(t
, n
);
1029 if (timeout
!= USEC_INFINITY
&&
1030 (t
== USEC_INFINITY
|| timeout
< t
))
1033 fd
= varlink_get_fd(v
);
1037 events
= varlink_get_events(v
);
1041 pfd
= (struct pollfd
) {
1047 t
== USEC_INFINITY
? NULL
: timespec_store(&ts
, t
),
1052 handle_revents(v
, pfd
.revents
);
1054 return r
> 0 ? 1 : 0;
1057 int varlink_get_fd(Varlink
*v
) {
1059 assert_return(v
, -EINVAL
);
1061 if (v
->state
== VARLINK_DISCONNECTED
)
1069 int varlink_get_events(Varlink
*v
) {
1072 assert_return(v
, -EINVAL
);
1074 if (v
->state
== VARLINK_DISCONNECTED
)
1077 if (v
->connecting
) /* When processing an asynchronous connect(), we only wait for EPOLLOUT, which
1078 * tells us that the connection is now complete. Before that we should neither
1079 * write() or read() from the fd. */
1082 if (!v
->read_disconnected
&&
1083 IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
, VARLINK_IDLE_SERVER
) &&
1085 v
->input_buffer_unscanned
<= 0)
1088 if (!v
->write_disconnected
&&
1089 v
->output_buffer_size
> 0)
1095 int varlink_get_timeout(Varlink
*v
, usec_t
*ret
) {
1096 assert_return(v
, -EINVAL
);
1098 if (v
->state
== VARLINK_DISCONNECTED
)
1101 if (IN_SET(v
->state
, VARLINK_AWAITING_REPLY
, VARLINK_AWAITING_REPLY_MORE
, VARLINK_CALLING
) &&
1102 v
->timeout
!= USEC_INFINITY
) {
1104 *ret
= usec_add(v
->timestamp
, v
->timeout
);
1108 *ret
= USEC_INFINITY
;
1113 int varlink_flush(Varlink
*v
) {
1116 assert_return(v
, -EINVAL
);
1118 if (v
->state
== VARLINK_DISCONNECTED
)
1124 if (v
->output_buffer_size
== 0)
1126 if (v
->write_disconnected
)
1129 r
= varlink_write(v
);
1137 pfd
= (struct pollfd
) {
1142 if (poll(&pfd
, 1, -1) < 0)
1145 handle_revents(v
, pfd
.revents
);
1151 static void varlink_detach_server(Varlink
*v
) {
1157 if (v
->server
->by_uid
&&
1158 v
->ucred_acquired
&&
1159 uid_is_valid(v
->ucred
.uid
)) {
1162 c
= PTR_TO_UINT(hashmap_get(v
->server
->by_uid
, UID_TO_PTR(v
->ucred
.uid
)));
1166 (void) hashmap_remove(v
->server
->by_uid
, UID_TO_PTR(v
->ucred
.uid
));
1168 (void) hashmap_replace(v
->server
->by_uid
, UID_TO_PTR(v
->ucred
.uid
), UINT_TO_PTR(c
- 1));
1171 assert(v
->server
->n_connections
> 0);
1172 v
->server
->n_connections
--;
1174 /* If this is a connection associated to a server, then let's disconnect the server and the
1175 * connection from each other. This drops the dangling reference that connect_callback() set up. */
1176 v
->server
= varlink_server_unref(v
->server
);
1180 int varlink_close(Varlink
*v
) {
1182 assert_return(v
, -EINVAL
);
1184 if (v
->state
== VARLINK_DISCONNECTED
)
1187 varlink_set_state(v
, VARLINK_DISCONNECTED
);
1189 /* Let's take a reference first, since varlink_detach_server() might drop the final (dangling) ref
1190 * which would destroy us before we can call varlink_clear() */
1192 varlink_detach_server(v
);
1199 Varlink
* varlink_flush_close_unref(Varlink
*v
) {
1204 (void) varlink_flush(v
);
1205 (void) varlink_close(v
);
1207 return varlink_unref(v
);
1210 static int varlink_enqueue_json(Varlink
*v
, JsonVariant
*m
) {
1211 _cleanup_free_
char *text
= NULL
;
1217 r
= json_variant_format(m
, 0, &text
);
1220 assert(text
[r
] == '\0');
1222 if (v
->output_buffer_size
+ r
+ 1 > VARLINK_BUFFER_MAX
)
1225 varlink_log(v
, "Sending message: %s", text
);
1227 if (v
->output_buffer_size
== 0) {
1229 free_and_replace(v
->output_buffer
, text
);
1231 v
->output_buffer_size
= v
->output_buffer_allocated
= r
+ 1;
1232 v
->output_buffer_index
= 0;
1234 } else if (v
->output_buffer_index
== 0) {
1236 if (!GREEDY_REALLOC(v
->output_buffer
, v
->output_buffer_allocated
, v
->output_buffer_size
+ r
+ 1))
1239 memcpy(v
->output_buffer
+ v
->output_buffer_size
, text
, r
+ 1);
1240 v
->output_buffer_size
+= r
+ 1;
1244 const size_t new_size
= v
->output_buffer_size
+ r
+ 1;
1246 n
= new(char, new_size
);
1250 memcpy(mempcpy(n
, v
->output_buffer
+ v
->output_buffer_index
, v
->output_buffer_size
), text
, r
+ 1);
1252 free_and_replace(v
->output_buffer
, n
);
1253 v
->output_buffer_allocated
= v
->output_buffer_size
= new_size
;
1254 v
->output_buffer_index
= 0;
1260 int varlink_send(Varlink
*v
, const char *method
, JsonVariant
*parameters
) {
1261 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1264 assert_return(v
, -EINVAL
);
1265 assert_return(method
, -EINVAL
);
1267 if (v
->state
== VARLINK_DISCONNECTED
)
1270 /* We allow enqueuing multiple method calls at once! */
1271 if (!IN_SET(v
->state
, VARLINK_IDLE_CLIENT
, VARLINK_AWAITING_REPLY
))
1274 r
= varlink_sanitize_parameters(¶meters
);
1278 r
= json_build(&m
, JSON_BUILD_OBJECT(
1279 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
)),
1280 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
)),
1281 JSON_BUILD_PAIR("oneway", JSON_BUILD_BOOLEAN(true))));
1285 r
= varlink_enqueue_json(v
, m
);
1289 /* No state change here, this is one-way only after all */
1290 v
->timestamp
= now(CLOCK_MONOTONIC
);
1294 int varlink_sendb(Varlink
*v
, const char *method
, ...) {
1295 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1299 assert_return(v
, -EINVAL
);
1301 va_start(ap
, method
);
1302 r
= json_buildv(¶meters
, ap
);
1308 return varlink_send(v
, method
, parameters
);
1311 int varlink_invoke(Varlink
*v
, const char *method
, JsonVariant
*parameters
) {
1312 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1315 assert_return(v
, -EINVAL
);
1316 assert_return(method
, -EINVAL
);
1318 if (v
->state
== VARLINK_DISCONNECTED
)
1321 /* We allow enqueing multiple method calls at once! */
1322 if (!IN_SET(v
->state
, VARLINK_IDLE_CLIENT
, VARLINK_AWAITING_REPLY
))
1325 r
= varlink_sanitize_parameters(¶meters
);
1329 r
= json_build(&m
, JSON_BUILD_OBJECT(
1330 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
)),
1331 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
))));
1335 r
= varlink_enqueue_json(v
, m
);
1339 varlink_set_state(v
, VARLINK_AWAITING_REPLY
);
1341 v
->timestamp
= now(CLOCK_MONOTONIC
);
1346 int varlink_invokeb(Varlink
*v
, const char *method
, ...) {
1347 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1351 assert_return(v
, -EINVAL
);
1353 va_start(ap
, method
);
1354 r
= json_buildv(¶meters
, ap
);
1360 return varlink_invoke(v
, method
, parameters
);
1363 int varlink_observe(Varlink
*v
, const char *method
, JsonVariant
*parameters
) {
1364 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1367 assert_return(v
, -EINVAL
);
1368 assert_return(method
, -EINVAL
);
1370 if (v
->state
== VARLINK_DISCONNECTED
)
1372 /* Note that we don't allow enqueuing multiple method calls when we are in more/continues mode! We
1373 * thus insist on an idle client here. */
1374 if (v
->state
!= VARLINK_IDLE_CLIENT
)
1377 r
= varlink_sanitize_parameters(¶meters
);
1381 r
= json_build(&m
, JSON_BUILD_OBJECT(
1382 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
)),
1383 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
)),
1384 JSON_BUILD_PAIR("more", JSON_BUILD_BOOLEAN(true))));
1388 r
= varlink_enqueue_json(v
, m
);
1393 varlink_set_state(v
, VARLINK_AWAITING_REPLY_MORE
);
1395 v
->timestamp
= now(CLOCK_MONOTONIC
);
1400 int varlink_observeb(Varlink
*v
, const char *method
, ...) {
1401 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1405 assert_return(v
, -EINVAL
);
1407 va_start(ap
, method
);
1408 r
= json_buildv(¶meters
, ap
);
1414 return varlink_observe(v
, method
, parameters
);
1420 JsonVariant
*parameters
,
1421 JsonVariant
**ret_parameters
,
1422 const char **ret_error_id
,
1423 VarlinkReplyFlags
*ret_flags
) {
1425 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1428 assert_return(v
, -EINVAL
);
1429 assert_return(method
, -EINVAL
);
1431 if (v
->state
== VARLINK_DISCONNECTED
)
1433 if (!IN_SET(v
->state
, VARLINK_IDLE_CLIENT
))
1436 assert(v
->n_pending
== 0); /* n_pending can't be > 0 if we are in VARLINK_IDLE_CLIENT state */
1438 r
= varlink_sanitize_parameters(¶meters
);
1442 r
= json_build(&m
, JSON_BUILD_OBJECT(
1443 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method
)),
1444 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
))));
1448 r
= varlink_enqueue_json(v
, m
);
1452 varlink_set_state(v
, VARLINK_CALLING
);
1454 v
->timestamp
= now(CLOCK_MONOTONIC
);
1456 while (v
->state
== VARLINK_CALLING
) {
1458 r
= varlink_process(v
);
1464 r
= varlink_wait(v
, USEC_INFINITY
);
1471 case VARLINK_CALLED
:
1474 json_variant_unref(v
->reply
);
1475 v
->reply
= TAKE_PTR(v
->current
);
1477 varlink_set_state(v
, VARLINK_IDLE_CLIENT
);
1478 assert(v
->n_pending
== 1);
1482 *ret_parameters
= json_variant_by_key(v
->reply
, "parameters");
1484 *ret_error_id
= json_variant_string(json_variant_by_key(v
->reply
, "error"));
1490 case VARLINK_PENDING_DISCONNECT
:
1491 case VARLINK_DISCONNECTED
:
1494 case VARLINK_PENDING_TIMEOUT
:
1498 assert_not_reached("Unexpected state after method call.");
1505 JsonVariant
**ret_parameters
,
1506 const char **ret_error_id
,
1507 VarlinkReplyFlags
*ret_flags
, ...) {
1509 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1513 assert_return(v
, -EINVAL
);
1515 va_start(ap
, ret_flags
);
1516 r
= json_buildv(¶meters
, ap
);
1522 return varlink_call(v
, method
, parameters
, ret_parameters
, ret_error_id
, ret_flags
);
1525 int varlink_reply(Varlink
*v
, JsonVariant
*parameters
) {
1526 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1529 assert_return(v
, -EINVAL
);
1531 if (v
->state
== VARLINK_DISCONNECTED
)
1533 if (!IN_SET(v
->state
,
1534 VARLINK_PROCESSING_METHOD
, VARLINK_PROCESSING_METHOD_MORE
,
1535 VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
))
1538 r
= varlink_sanitize_parameters(¶meters
);
1542 r
= json_build(&m
, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
))));
1546 r
= varlink_enqueue_json(v
, m
);
1550 if (IN_SET(v
->state
, VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
)) {
1551 /* We just replied to a method call that was let hanging for a while (i.e. we were outside of
1552 * the varlink_dispatch_method() stack frame), which means with this reply we are ready to
1553 * process further messages. */
1554 v
->current
= json_variant_unref(v
->current
);
1555 varlink_set_state(v
, VARLINK_IDLE_SERVER
);
1557 /* We replied to a method call from within the varlink_dispatch_method() stack frame), which
1558 * means we should it handle the rest of the state engine. */
1559 varlink_set_state(v
, VARLINK_PROCESSED_METHOD
);
1564 int varlink_replyb(Varlink
*v
, ...) {
1565 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1569 assert_return(v
, -EINVAL
);
1572 r
= json_buildv(¶meters
, ap
);
1578 return varlink_reply(v
, parameters
);
1581 int varlink_error(Varlink
*v
, const char *error_id
, JsonVariant
*parameters
) {
1582 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1585 assert_return(v
, -EINVAL
);
1586 assert_return(error_id
, -EINVAL
);
1588 if (v
->state
== VARLINK_DISCONNECTED
)
1590 if (!IN_SET(v
->state
,
1591 VARLINK_PROCESSING_METHOD
, VARLINK_PROCESSING_METHOD_MORE
,
1592 VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
))
1595 r
= varlink_sanitize_parameters(¶meters
);
1599 r
= json_build(&m
, JSON_BUILD_OBJECT(
1600 JSON_BUILD_PAIR("error", JSON_BUILD_STRING(error_id
)),
1601 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
))));
1605 r
= varlink_enqueue_json(v
, m
);
1609 if (IN_SET(v
->state
, VARLINK_PENDING_METHOD
, VARLINK_PENDING_METHOD_MORE
)) {
1610 v
->current
= json_variant_unref(v
->current
);
1611 varlink_set_state(v
, VARLINK_IDLE_SERVER
);
1613 varlink_set_state(v
, VARLINK_PROCESSED_METHOD
);
1618 int varlink_errorb(Varlink
*v
, const char *error_id
, ...) {
1619 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1623 assert_return(v
, -EINVAL
);
1624 assert_return(error_id
, -EINVAL
);
1626 va_start(ap
, error_id
);
1627 r
= json_buildv(¶meters
, ap
);
1633 return varlink_error(v
, error_id
, parameters
);
1636 int varlink_error_invalid_parameter(Varlink
*v
, JsonVariant
*parameters
) {
1638 assert_return(v
, -EINVAL
);
1639 assert_return(parameters
, -EINVAL
);
1641 /* We expect to be called in one of two ways: the 'parameters' argument is a string variant in which
1642 * case it is the parameter key name that is invalid. Or the 'parameters' argument is an object
1643 * variant in which case we'll pull out the first key. The latter mode is useful in functions that
1644 * don't expect any arguments. */
1646 if (json_variant_is_string(parameters
))
1647 return varlink_error(v
, VARLINK_ERROR_INVALID_PARAMETER
, parameters
);
1649 if (json_variant_is_object(parameters
) &&
1650 json_variant_elements(parameters
) > 0)
1651 return varlink_error(v
, VARLINK_ERROR_INVALID_PARAMETER
,
1652 json_variant_by_index(parameters
, 0));
1657 int varlink_notify(Varlink
*v
, JsonVariant
*parameters
) {
1658 _cleanup_(json_variant_unrefp
) JsonVariant
*m
= NULL
;
1661 assert_return(v
, -EINVAL
);
1663 if (v
->state
== VARLINK_DISCONNECTED
)
1665 if (!IN_SET(v
->state
, VARLINK_PROCESSING_METHOD_MORE
, VARLINK_PENDING_METHOD_MORE
))
1668 r
= varlink_sanitize_parameters(¶meters
);
1672 r
= json_build(&m
, JSON_BUILD_OBJECT(
1673 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters
)),
1674 JSON_BUILD_PAIR("continues", JSON_BUILD_BOOLEAN(true))));
1678 r
= varlink_enqueue_json(v
, m
);
1682 /* No state change, as more is coming */
1686 int varlink_notifyb(Varlink
*v
, ...) {
1687 _cleanup_(json_variant_unrefp
) JsonVariant
*parameters
= NULL
;
1691 assert_return(v
, -EINVAL
);
1694 r
= json_buildv(¶meters
, ap
);
1700 return varlink_notify(v
, parameters
);
1703 int varlink_bind_reply(Varlink
*v
, VarlinkReply callback
) {
1704 assert_return(v
, -EINVAL
);
1706 if (callback
&& v
->reply_callback
&& callback
!= v
->reply_callback
)
1709 v
->reply_callback
= callback
;
1714 void* varlink_set_userdata(Varlink
*v
, void *userdata
) {
1717 assert_return(v
, NULL
);
1720 v
->userdata
= userdata
;
1725 void* varlink_get_userdata(Varlink
*v
) {
1726 assert_return(v
, NULL
);
1731 static int varlink_acquire_ucred(Varlink
*v
) {
1736 if (v
->ucred_acquired
)
1739 r
= getpeercred(v
->fd
, &v
->ucred
);
1743 v
->ucred_acquired
= true;
1747 int varlink_get_peer_uid(Varlink
*v
, uid_t
*ret
) {
1750 assert_return(v
, -EINVAL
);
1751 assert_return(ret
, -EINVAL
);
1753 r
= varlink_acquire_ucred(v
);
1757 if (!uid_is_valid(v
->ucred
.uid
))
1760 *ret
= v
->ucred
.uid
;
1764 int varlink_get_peer_pid(Varlink
*v
, pid_t
*ret
) {
1767 assert_return(v
, -EINVAL
);
1768 assert_return(ret
, -EINVAL
);
1770 r
= varlink_acquire_ucred(v
);
1774 if (!pid_is_valid(v
->ucred
.pid
))
1777 *ret
= v
->ucred
.pid
;
1781 int varlink_set_relative_timeout(Varlink
*v
, usec_t timeout
) {
1782 assert_return(v
, -EINVAL
);
1783 assert_return(timeout
> 0, -EINVAL
);
1785 v
->timeout
= timeout
;
1789 VarlinkServer
*varlink_get_server(Varlink
*v
) {
1790 assert_return(v
, NULL
);
1795 int varlink_set_description(Varlink
*v
, const char *description
) {
1796 assert_return(v
, -EINVAL
);
1798 return free_and_strdup(&v
->description
, description
);
1801 static int io_callback(sd_event_source
*s
, int fd
, uint32_t revents
, void *userdata
) {
1802 Varlink
*v
= userdata
;
1807 handle_revents(v
, revents
);
1808 (void) varlink_process(v
);
1813 static int time_callback(sd_event_source
*s
, uint64_t usec
, void *userdata
) {
1814 Varlink
*v
= userdata
;
1819 (void) varlink_process(v
);
1823 static int defer_callback(sd_event_source
*s
, void *userdata
) {
1824 Varlink
*v
= userdata
;
1829 (void) varlink_process(v
);
1833 static int prepare_callback(sd_event_source
*s
, void *userdata
) {
1834 Varlink
*v
= userdata
;
1841 e
= varlink_get_events(v
);
1845 r
= sd_event_source_set_io_events(v
->io_event_source
, e
);
1849 r
= varlink_get_timeout(v
, &until
);
1853 r
= sd_event_source_set_time(v
->time_event_source
, until
);
1858 r
= sd_event_source_set_enabled(v
->time_event_source
, r
> 0 ? SD_EVENT_ON
: SD_EVENT_OFF
);
1865 static int quit_callback(sd_event_source
*event
, void *userdata
) {
1866 Varlink
*v
= userdata
;
1877 int varlink_attach_event(Varlink
*v
, sd_event
*e
, int64_t priority
) {
1880 assert_return(v
, -EINVAL
);
1881 assert_return(!v
->event
, -EBUSY
);
1884 v
->event
= sd_event_ref(e
);
1886 r
= sd_event_default(&v
->event
);
1891 r
= sd_event_add_time(v
->event
, &v
->time_event_source
, CLOCK_MONOTONIC
, 0, 0, time_callback
, v
);
1895 r
= sd_event_source_set_priority(v
->time_event_source
, priority
);
1899 (void) sd_event_source_set_description(v
->time_event_source
, "varlink-time");
1901 r
= sd_event_add_exit(v
->event
, &v
->quit_event_source
, quit_callback
, v
);
1905 r
= sd_event_source_set_priority(v
->quit_event_source
, priority
);
1909 (void) sd_event_source_set_description(v
->quit_event_source
, "varlink-quit");
1911 r
= sd_event_add_io(v
->event
, &v
->io_event_source
, v
->fd
, 0, io_callback
, v
);
1915 r
= sd_event_source_set_prepare(v
->io_event_source
, prepare_callback
);
1919 r
= sd_event_source_set_priority(v
->io_event_source
, priority
);
1923 (void) sd_event_source_set_description(v
->io_event_source
, "varlink-io");
1925 r
= sd_event_add_defer(v
->event
, &v
->defer_event_source
, defer_callback
, v
);
1929 r
= sd_event_source_set_priority(v
->defer_event_source
, priority
);
1933 (void) sd_event_source_set_description(v
->defer_event_source
, "varlink-defer");
1938 varlink_detach_event(v
);
1942 void varlink_detach_event(Varlink
*v
) {
1946 varlink_detach_event_sources(v
);
1948 v
->event
= sd_event_unref(v
->event
);
1951 sd_event
*varlink_get_event(Varlink
*v
) {
1952 assert_return(v
, NULL
);
1957 int varlink_server_new(VarlinkServer
**ret
, VarlinkServerFlags flags
) {
1960 assert_return(ret
, -EINVAL
);
1961 assert_return((flags
& ~_VARLINK_SERVER_FLAGS_ALL
) == 0, -EINVAL
);
1963 s
= new(VarlinkServer
, 1);
1967 *s
= (VarlinkServer
) {
1970 .connections_max
= varlink_server_connections_max(NULL
),
1971 .connections_per_uid_max
= varlink_server_connections_per_uid_max(NULL
),
1978 static VarlinkServer
* varlink_server_destroy(VarlinkServer
*s
) {
1984 varlink_server_shutdown(s
);
1986 while ((m
= hashmap_steal_first_key(s
->methods
)))
1989 hashmap_free(s
->methods
);
1990 hashmap_free(s
->by_uid
);
1992 sd_event_unref(s
->event
);
1994 free(s
->description
);
1999 DEFINE_TRIVIAL_REF_UNREF_FUNC(VarlinkServer
, varlink_server
, varlink_server_destroy
);
2001 static int validate_connection(VarlinkServer
*server
, const struct ucred
*ucred
) {
2007 if (FLAGS_SET(server
->flags
, VARLINK_SERVER_ROOT_ONLY
))
2008 allowed
= ucred
->uid
== 0;
2010 if (FLAGS_SET(server
->flags
, VARLINK_SERVER_MYSELF_ONLY
))
2011 allowed
= allowed
> 0 || ucred
->uid
== getuid();
2013 if (allowed
== 0) { /* Allow access when it is explicitly allowed or when neither
2014 * VARLINK_SERVER_ROOT_ONLY nor VARLINK_SERVER_MYSELF_ONLY are specified. */
2015 varlink_server_log(server
, "Unprivileged client attempted connection, refusing.");
2019 if (server
->n_connections
>= server
->connections_max
) {
2020 varlink_server_log(server
, "Connection limit of %u reached, refusing.", server
->connections_max
);
2024 if (FLAGS_SET(server
->flags
, VARLINK_SERVER_ACCOUNT_UID
)) {
2027 if (!uid_is_valid(ucred
->uid
)) {
2028 varlink_server_log(server
, "Client with invalid UID attempted connection, refusing.");
2032 c
= PTR_TO_UINT(hashmap_get(server
->by_uid
, UID_TO_PTR(ucred
->uid
)));
2033 if (c
>= server
->connections_per_uid_max
) {
2034 varlink_server_log(server
, "Per-UID connection limit of %u reached, refusing.",
2035 server
->connections_per_uid_max
);
2043 static int count_connection(VarlinkServer
*server
, struct ucred
*ucred
) {
2050 server
->n_connections
++;
2052 if (FLAGS_SET(server
->flags
, VARLINK_SERVER_ACCOUNT_UID
)) {
2053 r
= hashmap_ensure_allocated(&server
->by_uid
, NULL
);
2055 return log_debug_errno(r
, "Failed to allocate UID hash table: %m");
2057 c
= PTR_TO_UINT(hashmap_get(server
->by_uid
, UID_TO_PTR(ucred
->uid
)));
2059 varlink_server_log(server
, "Connections of user " UID_FMT
": %u (of %u max)",
2060 ucred
->uid
, c
, server
->connections_per_uid_max
);
2062 r
= hashmap_replace(server
->by_uid
, UID_TO_PTR(ucred
->uid
), UINT_TO_PTR(c
+ 1));
2064 return log_debug_errno(r
, "Failed to increment counter in UID hash table: %m");
2070 int varlink_server_add_connection(VarlinkServer
*server
, int fd
, Varlink
**ret
) {
2071 _cleanup_(varlink_unrefp
) Varlink
*v
= NULL
;
2072 bool ucred_acquired
;
2076 assert_return(server
, -EINVAL
);
2077 assert_return(fd
>= 0, -EBADF
);
2079 if ((server
->flags
& (VARLINK_SERVER_ROOT_ONLY
|VARLINK_SERVER_ACCOUNT_UID
)) != 0) {
2080 r
= getpeercred(fd
, &ucred
);
2082 return varlink_server_log_errno(server
, r
, "Failed to acquire peer credentials of incoming socket, refusing: %m");
2084 ucred_acquired
= true;
2086 r
= validate_connection(server
, &ucred
);
2092 ucred_acquired
= false;
2094 r
= varlink_new(&v
);
2096 return varlink_server_log_errno(server
, r
, "Failed to allocate connection object: %m");
2098 r
= count_connection(server
, &ucred
);
2103 v
->userdata
= server
->userdata
;
2104 if (ucred_acquired
) {
2106 v
->ucred_acquired
= true;
2109 (void) asprintf(&v
->description
, "%s-%i", server
->description
?: "varlink", v
->fd
);
2111 /* Link up the server and the connection, and take reference in both directions. Note that the
2112 * reference on the connection is left dangling. It will be dropped when the connection is closed,
2113 * which happens in varlink_close(), including in the event loop quit callback. */
2114 v
->server
= varlink_server_ref(server
);
2117 varlink_set_state(v
, VARLINK_IDLE_SERVER
);
2119 if (server
->event
) {
2120 r
= varlink_attach_event(v
, server
->event
, server
->event_priority
);
2122 varlink_log_errno(v
, r
, "Failed to attach new connection: %m");
2123 v
->fd
= -1; /* take the fd out of the connection again */
2135 static int connect_callback(sd_event_source
*source
, int fd
, uint32_t revents
, void *userdata
) {
2136 VarlinkServerSocket
*ss
= userdata
;
2137 _cleanup_close_
int cfd
= -1;
2144 varlink_server_log(ss
->server
, "New incoming connection.");
2146 cfd
= accept4(fd
, NULL
, NULL
, SOCK_NONBLOCK
|SOCK_CLOEXEC
);
2148 if (ERRNO_IS_ACCEPT_AGAIN(errno
))
2151 return varlink_server_log_errno(ss
->server
, errno
, "Failed to accept incoming socket: %m");
2154 r
= varlink_server_add_connection(ss
->server
, cfd
, &v
);
2160 if (ss
->server
->connect_callback
) {
2161 r
= ss
->server
->connect_callback(ss
->server
, v
, ss
->server
->userdata
);
2163 varlink_log_errno(v
, r
, "Connection callback returned error, disconnecting client: %m");
2172 int varlink_server_listen_fd(VarlinkServer
*s
, int fd
) {
2173 _cleanup_free_ VarlinkServerSocket
*ss
= NULL
;
2176 assert_return(s
, -EINVAL
);
2177 assert_return(fd
>= 0, -EBADF
);
2179 r
= fd_nonblock(fd
, true);
2183 ss
= new(VarlinkServerSocket
, 1);
2187 *ss
= (VarlinkServerSocket
) {
2193 _cleanup_(sd_event_source_unrefp
) sd_event_source
*es
= NULL
;
2195 r
= sd_event_add_io(s
->event
, &es
, fd
, EPOLLIN
, connect_callback
, ss
);
2199 r
= sd_event_source_set_priority(ss
->event_source
, s
->event_priority
);
2204 LIST_PREPEND(sockets
, s
->sockets
, TAKE_PTR(ss
));
2208 int varlink_server_listen_address(VarlinkServer
*s
, const char *address
, mode_t m
) {
2209 union sockaddr_union sockaddr
;
2210 _cleanup_close_
int fd
= -1;
2213 assert_return(s
, -EINVAL
);
2214 assert_return(address
, -EINVAL
);
2215 assert_return((m
& ~0777) == 0, -EINVAL
);
2217 r
= sockaddr_un_set_path(&sockaddr
.un
, address
);
2221 fd
= socket(AF_UNIX
, SOCK_STREAM
|SOCK_CLOEXEC
|SOCK_NONBLOCK
, 0);
2225 fd
= fd_move_above_stdio(fd
);
2227 (void) sockaddr_un_unlink(&sockaddr
.un
);
2229 RUN_WITH_UMASK(~m
& 0777)
2230 if (bind(fd
, &sockaddr
.sa
, SOCKADDR_UN_LEN(sockaddr
.un
)) < 0)
2233 if (listen(fd
, SOMAXCONN
) < 0)
2236 r
= varlink_server_listen_fd(s
, fd
);
2244 void* varlink_server_set_userdata(VarlinkServer
*s
, void *userdata
) {
2247 assert_return(s
, NULL
);
2250 s
->userdata
= userdata
;
2255 void* varlink_server_get_userdata(VarlinkServer
*s
) {
2256 assert_return(s
, NULL
);
2261 static VarlinkServerSocket
* varlink_server_socket_destroy(VarlinkServerSocket
*ss
) {
2266 LIST_REMOVE(sockets
, ss
->server
->sockets
, ss
);
2268 sd_event_source_disable_unref(ss
->event_source
);
2276 int varlink_server_shutdown(VarlinkServer
*s
) {
2277 assert_return(s
, -EINVAL
);
2280 varlink_server_socket_destroy(s
->sockets
);
2285 int varlink_server_attach_event(VarlinkServer
*s
, sd_event
*e
, int64_t priority
) {
2286 VarlinkServerSocket
*ss
;
2289 assert_return(s
, -EINVAL
);
2290 assert_return(!s
->event
, -EBUSY
);
2293 s
->event
= sd_event_ref(e
);
2295 r
= sd_event_default(&s
->event
);
2300 LIST_FOREACH(sockets
, ss
, s
->sockets
) {
2301 assert(!ss
->event_source
);
2303 r
= sd_event_add_io(s
->event
, &ss
->event_source
, ss
->fd
, EPOLLIN
, connect_callback
, ss
);
2307 r
= sd_event_source_set_priority(ss
->event_source
, priority
);
2312 s
->event_priority
= priority
;
2316 varlink_server_detach_event(s
);
2320 int varlink_server_detach_event(VarlinkServer
*s
) {
2321 VarlinkServerSocket
*ss
;
2323 assert_return(s
, -EINVAL
);
2325 LIST_FOREACH(sockets
, ss
, s
->sockets
) {
2327 if (!ss
->event_source
)
2330 (void) sd_event_source_set_enabled(ss
->event_source
, SD_EVENT_OFF
);
2331 ss
->event_source
= sd_event_source_unref(ss
->event_source
);
2334 sd_event_unref(s
->event
);
2338 sd_event
*varlink_server_get_event(VarlinkServer
*s
) {
2339 assert_return(s
, NULL
);
2344 int varlink_server_bind_method(VarlinkServer
*s
, const char *method
, VarlinkMethod callback
) {
2348 assert_return(s
, -EINVAL
);
2349 assert_return(method
, -EINVAL
);
2350 assert_return(callback
, -EINVAL
);
2352 if (startswith(method
, "org.varlink.service."))
2355 r
= hashmap_ensure_allocated(&s
->methods
, &string_hash_ops
);
2363 r
= hashmap_put(s
->methods
, m
, callback
);
2372 int varlink_server_bind_method_many_internal(VarlinkServer
*s
, ...) {
2376 assert_return(s
, -EINVAL
);
2380 VarlinkMethod callback
;
2383 method
= va_arg(ap
, const char *);
2387 callback
= va_arg(ap
, VarlinkMethod
);
2389 r
= varlink_server_bind_method(s
, method
, callback
);
2398 int varlink_server_bind_connect(VarlinkServer
*s
, VarlinkConnect callback
) {
2399 assert_return(s
, -EINVAL
);
2401 if (callback
&& s
->connect_callback
&& callback
!= s
->connect_callback
)
2404 s
->connect_callback
= callback
;
2408 unsigned varlink_server_connections_max(VarlinkServer
*s
) {
2411 /* If a server is specified, return the setting for that server, otherwise the default value */
2413 return s
->connections_max
;
2415 assert_se(getrlimit(RLIMIT_NOFILE
, &rl
) >= 0);
2417 /* Make sure we never use up more than ¾th of RLIMIT_NOFILE for IPC */
2418 if (VARLINK_DEFAULT_CONNECTIONS_MAX
> rl
.rlim_cur
/ 4 * 3)
2419 return rl
.rlim_cur
/ 4 * 3;
2421 return VARLINK_DEFAULT_CONNECTIONS_MAX
;
2424 unsigned varlink_server_connections_per_uid_max(VarlinkServer
*s
) {
2428 return s
->connections_per_uid_max
;
2430 /* Make sure to never use up more than ¾th of available connections for a single user */
2431 m
= varlink_server_connections_max(NULL
);
2432 if (VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX
> m
)
2435 return VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX
;
2438 int varlink_server_set_connections_per_uid_max(VarlinkServer
*s
, unsigned m
) {
2439 assert_return(s
, -EINVAL
);
2440 assert_return(m
> 0, -EINVAL
);
2442 s
->connections_per_uid_max
= m
;
2446 int varlink_server_set_connections_max(VarlinkServer
*s
, unsigned m
) {
2447 assert_return(s
, -EINVAL
);
2448 assert_return(m
> 0, -EINVAL
);
2450 s
->connections_max
= m
;
2454 int varlink_server_set_description(VarlinkServer
*s
, const char *description
) {
2455 assert_return(s
, -EINVAL
);
2457 return free_and_strdup(&s
->description
, description
);