]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/shared/varlink.c
alloc-util: add realloc0() helper than is like realloc() but zero-initializes appende...
[thirdparty/systemd.git] / src / shared / varlink.c
CommitLineData
db9ecf05 1/* SPDX-License-Identifier: LGPL-2.1-or-later */
d41bd96f 2
319a4f4b 3#include <malloc.h>
2b6c0bb2 4#include <poll.h>
d41bd96f
LP
5
6#include "alloc-util.h"
7#include "errno-util.h"
8#include "fd-util.h"
e2341b6b 9#include "glyph-util.h"
d41bd96f 10#include "hashmap.h"
0f2d351f 11#include "io-util.h"
d41bd96f
LP
12#include "list.h"
13#include "process-util.h"
63e00ccd 14#include "selinux-util.h"
008798e9 15#include "serialize.h"
d41bd96f
LP
16#include "set.h"
17#include "socket-util.h"
18#include "string-table.h"
19#include "string-util.h"
20#include "strv.h"
21#include "time-util.h"
22#include "umask-util.h"
23#include "user-util.h"
24#include "varlink.h"
008798e9 25#include "varlink-internal.h"
d41bd96f
LP
26
/* Default connection limits (overall and per UID).
 * NOTE(review): presumably the initial values of VarlinkServer.connections_max and
 * VarlinkServer.connections_per_uid_max — confirm against the server constructor. */
#define VARLINK_DEFAULT_CONNECTIONS_MAX 4096U
#define VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX 1024U

/* Timeout that varlink_new() assigns to every freshly allocated connection object */
#define VARLINK_DEFAULT_TIMEOUT_USEC (45U*USEC_PER_SEC)
/* Once the input buffer holds this many bytes (16 MiB) varlink_read() fails with -ENOBUFS */
#define VARLINK_BUFFER_MAX (16U*1024U*1024U)
/* Upper bound (64 KiB) for the amount by which varlink_read() enlarges the input buffer in one go */
#define VARLINK_READ_SIZE (64U*1024U)
33
/* High-level state of a connection object. The valid states are numbered consecutively from zero so
 * that they can be used as indexes into varlink_state_table[]; _VARLINK_STATE_INVALID is negative and is
 * what varlink_new() initializes a fresh object to. */
typedef enum VarlinkState {
        /* Client side states */
        VARLINK_IDLE_CLIENT,
        VARLINK_AWAITING_REPLY,
        VARLINK_AWAITING_REPLY_MORE,
        VARLINK_CALLING,
        VARLINK_CALLED,
        VARLINK_PROCESSING_REPLY,

        /* Server side states */
        VARLINK_IDLE_SERVER,
        VARLINK_PROCESSING_METHOD,
        VARLINK_PROCESSING_METHOD_MORE,
        VARLINK_PROCESSING_METHOD_ONEWAY,
        VARLINK_PROCESSED_METHOD,
        VARLINK_PENDING_METHOD,
        VARLINK_PENDING_METHOD_MORE,

        /* Common states (only during shutdown) */
        VARLINK_PENDING_DISCONNECT,
        VARLINK_PENDING_TIMEOUT,
        VARLINK_PROCESSING_DISCONNECT,
        VARLINK_PROCESSING_TIMEOUT,
        VARLINK_PROCESSING_FAILURE,
        VARLINK_DISCONNECTED,

        _VARLINK_STATE_MAX,
        _VARLINK_STATE_INVALID = -EINVAL,
} VarlinkState;
63
/* Tests whether we are not yet disconnected. Note that this is true during all states where the connection
 * is still good for something, and false only when it's dead for good. This means: when we are
 * asynchronously connecting to a peer and the connect() is still pending, then this will return 'true', as
 * the connection is still good, and we are likely to be able to properly operate on it soon.
 *
 * The list below consists of all client side and all server side states; the shutdown states and the
 * invalid state are deliberately absent. */
#define VARLINK_STATE_IS_ALIVE(state)                   \
        IN_SET(state,                                   \
               VARLINK_IDLE_CLIENT,                     \
               VARLINK_AWAITING_REPLY,                  \
               VARLINK_AWAITING_REPLY_MORE,             \
               VARLINK_CALLING,                         \
               VARLINK_CALLED,                          \
               VARLINK_PROCESSING_REPLY,                \
               VARLINK_IDLE_SERVER,                     \
               VARLINK_PROCESSING_METHOD,               \
               VARLINK_PROCESSING_METHOD_MORE,          \
               VARLINK_PROCESSING_METHOD_ONEWAY,        \
               VARLINK_PROCESSED_METHOD,                \
               VARLINK_PENDING_METHOD,                  \
               VARLINK_PENDING_METHOD_MORE)
83
d37cdac6
LP
84typedef struct VarlinkJsonQueueItem VarlinkJsonQueueItem;
85
86/* A queued message we shall write into the socket, along with the file descriptors to send at the same
87 * time. This queue item binds them together so that message/fd boundaries are maintained throughout the
88 * whole pipeline. */
89struct VarlinkJsonQueueItem {
90 LIST_FIELDS(VarlinkJsonQueueItem, queue);
91 JsonVariant *data;
92 size_t n_fds;
93 int fds[];
94};
95
d41bd96f
LP
96struct Varlink {
97 unsigned n_ref;
98
99 VarlinkServer *server;
100
101 VarlinkState state;
102 bool connecting; /* This boolean indicates whether the socket fd we are operating on is currently
103 * processing an asynchronous connect(). In that state we watch the socket for
104 * EPOLLOUT, but we refrain from calling read() or write() on the socket as that
105 * will trigger ENOTCONN. Note that this boolean is kept separate from the
106 * VarlinkState above on purpose: while the connect() is still not complete we
107 * already want to allow queuing of messages and similar. Thus it's nice to keep
108 * these two state concepts separate: the VarlinkState encodes what our own view of
109 * the connection is, i.e. whether we think it's a server, a client, and has
110 * something queued already, while 'connecting' tells us a detail about the
111 * transport used below, that should have no effect on how we otherwise accept and
112 * process operations from the user.
113 *
114 * Or to say this differently: VARLINK_STATE_IS_ALIVE(state) tells you whether the
115 * connection is good to use, even if it might not be fully connected
116 * yet. connecting=true then informs you that actually we are still connecting, and
117 * the connection is actually not established yet and thus any requests you enqueue
118 * now will still work fine but will be queued only, not sent yet, but that
119 * shouldn't stop you from using the connection, since eventually whatever you queue
120 * *will* be sent.
121 *
122 * Or to say this even differently: 'state' is a high-level ("application layer"
123 * high, if you so will) state, while 'conecting' is a low-level ("transport layer"
124 * low, if you so will) state, and while they are not entirely unrelated and
125 * sometimes propagate effects to each other they are only asynchronously connected
126 * at most. */
127 unsigned n_pending;
128
129 int fd;
130
131 char *input_buffer; /* valid data starts at input_buffer_index, ends at input_buffer_index+input_buffer_size */
d41bd96f
LP
132 size_t input_buffer_index;
133 size_t input_buffer_size;
134 size_t input_buffer_unscanned;
135
3c868058
LP
136 void *input_control_buffer;
137 size_t input_control_buffer_size;
138
d41bd96f 139 char *output_buffer; /* valid data starts at output_buffer_index, ends at output_buffer_index+output_buffer_size */
d41bd96f
LP
140 size_t output_buffer_index;
141 size_t output_buffer_size;
142
d37cdac6
LP
143 int *input_fds; /* file descriptors associated with the data in input_buffer (for fd passing) */
144 size_t n_input_fds;
145
146 int *output_fds; /* file descriptors associated with the data in output_buffer (for fd passing) */
147 size_t n_output_fds;
148
149 /* Further messages to output not yet formatted into text, and thus not included in output_buffer
150 * yet. We keep them separate from output_buffer, to not violate fd message boundaries: we want that
151 * each fd that is sent is associated with its fds, and that fds cannot be accidentally associated
94d82b59 152 * with preceding or following messages. */
d37cdac6
LP
153 LIST_HEAD(VarlinkJsonQueueItem, output_queue);
154 VarlinkJsonQueueItem *output_queue_tail;
155
156 /* The fds to associate with the next message that is about to be enqueued. The user first pushes the
157 * fds it intends to send via varlink_push_fd() into this queue, and then once the message data is
158 * submitted we'll combine the fds and the message data into one. */
159 int *pushed_fds;
160 size_t n_pushed_fds;
161
d41bd96f
LP
162 VarlinkReply reply_callback;
163
164 JsonVariant *current;
d41bd96f
LP
165
166 struct ucred ucred;
167 bool ucred_acquired:1;
168
169 bool write_disconnected:1;
170 bool read_disconnected:1;
171 bool prefer_read_write:1;
172 bool got_pollhup:1;
173
d37cdac6
LP
174 bool allow_fd_passing_input:1;
175 bool allow_fd_passing_output:1;
176
db1f7c84
LP
177 bool output_buffer_sensitive:1; /* whether to erase the output buffer after writing it to the socket */
178
d37cdac6
LP
179 int af; /* address family if socket; AF_UNSPEC if not socket; negative if not known */
180
d41bd96f
LP
181 usec_t timestamp;
182 usec_t timeout;
183
184 void *userdata;
185 char *description;
186
187 sd_event *event;
188 sd_event_source *io_event_source;
189 sd_event_source *time_event_source;
190 sd_event_source *quit_event_source;
191 sd_event_source *defer_event_source;
192};
193
194typedef struct VarlinkServerSocket VarlinkServerSocket;
195
196struct VarlinkServerSocket {
197 VarlinkServer *server;
198
199 int fd;
200 char *address;
201
202 sd_event_source *event_source;
203
204 LIST_FIELDS(VarlinkServerSocket, sockets);
205};
206
207struct VarlinkServer {
208 unsigned n_ref;
209 VarlinkServerFlags flags;
210
211 LIST_HEAD(VarlinkServerSocket, sockets);
212
213 Hashmap *methods;
214 VarlinkConnect connect_callback;
6d4d6002 215 VarlinkDisconnect disconnect_callback;
d41bd96f
LP
216
217 sd_event *event;
218 int64_t event_priority;
219
220 unsigned n_connections;
221 Hashmap *by_uid;
222
223 void *userdata;
224 char *description;
225
226 unsigned connections_max;
227 unsigned connections_per_uid_max;
228};
229
230static const char* const varlink_state_table[_VARLINK_STATE_MAX] = {
231 [VARLINK_IDLE_CLIENT] = "idle-client",
232 [VARLINK_AWAITING_REPLY] = "awaiting-reply",
45a6c965 233 [VARLINK_AWAITING_REPLY_MORE] = "awaiting-reply-more",
d41bd96f
LP
234 [VARLINK_CALLING] = "calling",
235 [VARLINK_CALLED] = "called",
236 [VARLINK_PROCESSING_REPLY] = "processing-reply",
237 [VARLINK_IDLE_SERVER] = "idle-server",
238 [VARLINK_PROCESSING_METHOD] = "processing-method",
239 [VARLINK_PROCESSING_METHOD_MORE] = "processing-method-more",
240 [VARLINK_PROCESSING_METHOD_ONEWAY] = "processing-method-oneway",
241 [VARLINK_PROCESSED_METHOD] = "processed-method",
d41bd96f
LP
242 [VARLINK_PENDING_METHOD] = "pending-method",
243 [VARLINK_PENDING_METHOD_MORE] = "pending-method-more",
244 [VARLINK_PENDING_DISCONNECT] = "pending-disconnect",
245 [VARLINK_PENDING_TIMEOUT] = "pending-timeout",
246 [VARLINK_PROCESSING_DISCONNECT] = "processing-disconnect",
247 [VARLINK_PROCESSING_TIMEOUT] = "processing-timeout",
248 [VARLINK_PROCESSING_FAILURE] = "processing-failure",
249 [VARLINK_DISCONNECTED] = "disconnected",
250};
251
252DEFINE_PRIVATE_STRING_TABLE_LOOKUP_TO_STRING(varlink_state, VarlinkState);
253
254#define varlink_log_errno(v, error, fmt, ...) \
255 log_debug_errno(error, "%s: " fmt, varlink_description(v), ##__VA_ARGS__)
256
257#define varlink_log(v, fmt, ...) \
258 log_debug("%s: " fmt, varlink_description(v), ##__VA_ARGS__)
259
260#define varlink_server_log_errno(s, error, fmt, ...) \
261 log_debug_errno(error, "%s: " fmt, varlink_server_description(s), ##__VA_ARGS__)
262
263#define varlink_server_log(s, fmt, ...) \
264 log_debug("%s: " fmt, varlink_server_description(s), ##__VA_ARGS__)
265
d37cdac6
LP
266static int varlink_format_queue(Varlink *v);
267
cf1ab844 268static const char *varlink_description(Varlink *v) {
f35e9b10 269 return (v ? v->description : NULL) ?: "varlink";
d41bd96f
LP
270}
271
cf1ab844 272static const char *varlink_server_description(VarlinkServer *s) {
f35e9b10 273 return (s ? s->description : NULL) ?: "varlink";
d41bd96f
LP
274}
275
d37cdac6
LP
276static VarlinkJsonQueueItem *varlink_json_queue_item_free(VarlinkJsonQueueItem *q) {
277 if (!q)
278 return NULL;
279
280 json_variant_unref(q->data);
281 close_many(q->fds, q->n_fds);
282
283 return mfree(q);
284}
285
286static VarlinkJsonQueueItem *varlink_json_queue_item_new(JsonVariant *m, const int fds[], size_t n_fds) {
287 VarlinkJsonQueueItem *q;
288
289 assert(m);
290 assert(fds || n_fds == 0);
291
292 q = malloc(offsetof(VarlinkJsonQueueItem, fds) + sizeof(int) * n_fds);
293 if (!q)
294 return NULL;
295
296 *q = (VarlinkJsonQueueItem) {
297 .data = json_variant_ref(m),
298 .n_fds = n_fds,
299 };
300
301 memcpy_safe(q->fds, fds, n_fds * sizeof(int));
302
303 return TAKE_PTR(q);
304}
305
d41bd96f
LP
306static void varlink_set_state(Varlink *v, VarlinkState state) {
307 assert(v);
77740b59
ZJS
308 assert(state >= 0 && state < _VARLINK_STATE_MAX);
309
310 if (v->state < 0)
953394e3 311 varlink_log(v, "Setting state %s",
77740b59
ZJS
312 varlink_state_to_string(state));
313 else
e2341b6b 314 varlink_log(v, "Changing state %s %s %s",
77740b59 315 varlink_state_to_string(v->state),
e2341b6b 316 special_glyph(SPECIAL_GLYPH_ARROW_RIGHT),
77740b59 317 varlink_state_to_string(state));
d41bd96f
LP
318
319 v->state = state;
320}
321
322static int varlink_new(Varlink **ret) {
323 Varlink *v;
324
325 assert(ret);
326
a48481dc 327 v = new(Varlink, 1);
d41bd96f
LP
328 if (!v)
329 return -ENOMEM;
330
331 *v = (Varlink) {
332 .n_ref = 1,
254d1313 333 .fd = -EBADF,
d41bd96f
LP
334
335 .state = _VARLINK_STATE_INVALID,
336
a995ce47 337 .ucred = UCRED_INVALID,
d41bd96f
LP
338
339 .timestamp = USEC_INFINITY,
d37cdac6
LP
340 .timeout = VARLINK_DEFAULT_TIMEOUT_USEC,
341
342 .af = -1,
d41bd96f
LP
343 };
344
345 *ret = v;
346 return 0;
347}
348
349int varlink_connect_address(Varlink **ret, const char *address) {
350 _cleanup_(varlink_unrefp) Varlink *v = NULL;
351 union sockaddr_union sockaddr;
352 int r;
353
354 assert_return(ret, -EINVAL);
355 assert_return(address, -EINVAL);
356
d41bd96f
LP
357 r = varlink_new(&v);
358 if (r < 0)
db3d4222 359 return log_debug_errno(r, "Failed to create varlink object: %m");
d41bd96f
LP
360
361 v->fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
362 if (v->fd < 0)
db3d4222 363 return log_debug_errno(errno, "Failed to create AF_UNIX socket: %m");
d41bd96f 364
a0c41de2 365 v->fd = fd_move_above_stdio(v->fd);
d37cdac6 366 v->af = AF_UNIX;
a0c41de2 367
1861986a
LP
368 r = sockaddr_un_set_path(&sockaddr.un, address);
369 if (r < 0) {
370 if (r != -ENAMETOOLONG)
371 return log_debug_errno(r, "Failed to set socket address '%s': %m", address);
372
373 /* This is a file system path, and too long to fit into sockaddr_un. Let's connect via O_PATH
374 * to this socket. */
375
376 r = connect_unix_path(v->fd, AT_FDCWD, address);
377 } else
378 r = RET_NERRNO(connect(v->fd, &sockaddr.sa, r));
379
380 if (r < 0) {
381 if (!IN_SET(r, -EAGAIN, -EINPROGRESS))
382 return log_debug_errno(r, "Failed to connect to %s: %m", address);
d41bd96f
LP
383
384 v->connecting = true; /* We are asynchronously connecting, i.e. the connect() is being
385 * processed in the background. As long as that's the case the socket
386 * is in a special state: it's there, we can poll it for EPOLLOUT, but
387 * if we attempt to write() to it before we see EPOLLOUT we'll get
388 * ENOTCONN (and not EAGAIN, like we would for a normal connected
389 * socket that isn't writable at the moment). Since ENOTCONN on write()
390 * hence can mean two different things (i.e. connection not complete
391 * yet vs. already disconnected again), we store as a boolean whether
392 * we are still in connect(). */
393 }
394
395 varlink_set_state(v, VARLINK_IDLE_CLIENT);
396
397 *ret = TAKE_PTR(v);
db3d4222 398 return 0;
d41bd96f
LP
399}
400
401int varlink_connect_fd(Varlink **ret, int fd) {
402 Varlink *v;
403 int r;
404
405 assert_return(ret, -EINVAL);
406 assert_return(fd >= 0, -EBADF);
407
408 r = fd_nonblock(fd, true);
409 if (r < 0)
db3d4222 410 return log_debug_errno(r, "Failed to make fd %d nonblocking: %m", fd);
d41bd96f
LP
411
412 r = varlink_new(&v);
413 if (r < 0)
db3d4222 414 return log_debug_errno(r, "Failed to create varlink object: %m");
d41bd96f
LP
415
416 v->fd = fd;
d37cdac6 417 v->af = -1,
d41bd96f
LP
418 varlink_set_state(v, VARLINK_IDLE_CLIENT);
419
420 /* Note that if this function is called we assume the passed socket (if it is one) is already
421 * properly connected, i.e. any asynchronous connect() done on it already completed. Because of that
422 * we'll not set the 'connecting' boolean here, i.e. we don't need to avoid write()ing to the socket
423 * until the connection is fully set up. Behaviour here is hence a bit different from
424 * varlink_connect_address() above, as there we do handle asynchronous connections ourselves and
425 * avoid doing write() on it before we saw EPOLLOUT for the first time. */
426
427 *ret = v;
428 return 0;
429}
430
431static void varlink_detach_event_sources(Varlink *v) {
432 assert(v);
433
1d3fe304 434 v->io_event_source = sd_event_source_disable_unref(v->io_event_source);
1d3fe304 435 v->time_event_source = sd_event_source_disable_unref(v->time_event_source);
1d3fe304 436 v->quit_event_source = sd_event_source_disable_unref(v->quit_event_source);
1d3fe304 437 v->defer_event_source = sd_event_source_disable_unref(v->defer_event_source);
d41bd96f
LP
438}
439
790446bd
LP
440static void varlink_clear_current(Varlink *v) {
441 assert(v);
442
443 /* Clears the currently processed incoming message */
444 v->current = json_variant_unref(v->current);
d37cdac6
LP
445
446 close_many(v->input_fds, v->n_input_fds);
447 v->input_fds = mfree(v->input_fds);
448 v->n_input_fds = 0;
790446bd
LP
449}
450
d41bd96f
LP
451static void varlink_clear(Varlink *v) {
452 assert(v);
453
454 varlink_detach_event_sources(v);
455
456 v->fd = safe_close(v->fd);
457
d37cdac6
LP
458 varlink_clear_current(v);
459
d41bd96f 460 v->input_buffer = mfree(v->input_buffer);
db1f7c84 461 v->output_buffer = v->output_buffer_sensitive ? erase_and_free(v->output_buffer) : mfree(v->output_buffer);
d41bd96f 462
3c868058
LP
463 v->input_control_buffer = mfree(v->input_control_buffer);
464 v->input_control_buffer_size = 0;
465
d37cdac6
LP
466 close_many(v->output_fds, v->n_output_fds);
467 v->output_fds = mfree(v->output_fds);
468 v->n_output_fds = 0;
469
470 close_many(v->pushed_fds, v->n_pushed_fds);
471 v->pushed_fds = mfree(v->pushed_fds);
472 v->n_pushed_fds = 0;
473
9aad490e 474 LIST_CLEAR(queue, v->output_queue, varlink_json_queue_item_free);
d37cdac6
LP
475 v->output_queue_tail = NULL;
476
d41bd96f
LP
477 v->event = sd_event_unref(v->event);
478}
479
480static Varlink* varlink_destroy(Varlink *v) {
481 if (!v)
482 return NULL;
483
484 /* If this is called the server object must already been unreffed here. Why that? because when we
485 * linked up the varlink connection with the server object we took one ref in each direction */
486 assert(!v->server);
487
488 varlink_clear(v);
489
490 free(v->description);
491 return mfree(v);
492}
493
494DEFINE_TRIVIAL_REF_UNREF_FUNC(Varlink, varlink, varlink_destroy);
495
496static int varlink_test_disconnect(Varlink *v) {
497 assert(v);
498
37b22b3b 499 /* Tests whether we the connection has been terminated. We are careful to not stop processing it
d41bd96f
LP
500 * prematurely, since we want to handle half-open connections as well as possible and want to flush
501 * out and read data before we close down if we can. */
502
503 /* Already disconnected? */
504 if (!VARLINK_STATE_IS_ALIVE(v->state))
505 return 0;
506
507 /* Wait until connection setup is complete, i.e. until asynchronous connect() completes */
508 if (v->connecting)
509 return 0;
510
511 /* Still something to write and we can write? Stay around */
512 if (v->output_buffer_size > 0 && !v->write_disconnected)
513 return 0;
514
515 /* Both sides gone already? Then there's no need to stick around */
516 if (v->read_disconnected && v->write_disconnected)
517 goto disconnect;
518
519 /* If we are waiting for incoming data but the read side is shut down, disconnect. */
45a6c965 520 if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected)
d41bd96f
LP
521 goto disconnect;
522
523 /* Similar, if are a client that hasn't written anything yet but the write side is dead, also
524 * disconnect. We also explicitly check for POLLHUP here since we likely won't notice the write side
525 * being down if we never wrote anything. */
4f06325c 526 if (v->state == VARLINK_IDLE_CLIENT && (v->write_disconnected || v->got_pollhup))
d41bd96f
LP
527 goto disconnect;
528
7c26a631
LP
529 /* We are on the server side and still want to send out more replies, but we saw POLLHUP already, and
530 * either got no buffered bytes to write anymore or already saw a write error. In that case we should
531 * shut down the varlink link. */
532 if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE) && (v->write_disconnected || v->output_buffer_size == 0) && v->got_pollhup)
e8e9227f
AZ
533 goto disconnect;
534
d41bd96f
LP
535 return 0;
536
537disconnect:
538 varlink_set_state(v, VARLINK_PENDING_DISCONNECT);
539 return 1;
540}
541
542static int varlink_write(Varlink *v) {
543 ssize_t n;
d37cdac6 544 int r;
d41bd96f
LP
545
546 assert(v);
547
548 if (!VARLINK_STATE_IS_ALIVE(v->state))
549 return 0;
550 if (v->connecting) /* Writing while we are still wait for a non-blocking connect() to complete will
551 * result in ENOTCONN, hence exit early here */
552 return 0;
d41bd96f
LP
553 if (v->write_disconnected)
554 return 0;
555
d37cdac6
LP
556 /* If needed let's convert some output queue json variants into text form */
557 r = varlink_format_queue(v);
558 if (r < 0)
559 return r;
560
561 if (v->output_buffer_size == 0)
562 return 0;
563
d41bd96f
LP
564 assert(v->fd >= 0);
565
d37cdac6
LP
566 if (v->n_output_fds > 0) { /* If we shall send fds along, we must use sendmsg() */
567 struct iovec iov = {
568 .iov_base = v->output_buffer + v->output_buffer_index,
569 .iov_len = v->output_buffer_size,
570 };
571 struct msghdr mh = {
572 .msg_iov = &iov,
573 .msg_iovlen = 1,
574 .msg_controllen = CMSG_SPACE(sizeof(int) * v->n_output_fds),
575 };
576
577 mh.msg_control = alloca0(mh.msg_controllen);
578
579 struct cmsghdr *control = CMSG_FIRSTHDR(&mh);
580 control->cmsg_len = CMSG_LEN(sizeof(int) * v->n_output_fds);
581 control->cmsg_level = SOL_SOCKET;
582 control->cmsg_type = SCM_RIGHTS;
583 memcpy(CMSG_DATA(control), v->output_fds, sizeof(int) * v->n_output_fds);
584
585 n = sendmsg(v->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
586 } else {
587 /* We generally prefer recv()/send() (mostly because of MSG_NOSIGNAL) but also want to be compatible
588 * with non-socket IO, hence fall back automatically.
589 *
590 * Use a local variable to help gcc figure out that we set 'n' in all cases. */
591 bool prefer_write = v->prefer_read_write;
592 if (!prefer_write) {
593 n = send(v->fd, v->output_buffer + v->output_buffer_index, v->output_buffer_size, MSG_DONTWAIT|MSG_NOSIGNAL);
594 if (n < 0 && errno == ENOTSOCK)
595 prefer_write = v->prefer_read_write = true;
596 }
597 if (prefer_write)
598 n = write(v->fd, v->output_buffer + v->output_buffer_index, v->output_buffer_size);
d41bd96f 599 }
d41bd96f
LP
600 if (n < 0) {
601 if (errno == EAGAIN)
602 return 0;
603
604 if (ERRNO_IS_DISCONNECT(errno)) {
605 /* If we get informed about a disconnect on write, then let's remember that, but not
606 * act on it just yet. Let's wait for read() to report the issue first. */
607 v->write_disconnected = true;
608 return 1;
609 }
610
611 return -errno;
612 }
613
db1f7c84
LP
614 if (v->output_buffer_sensitive)
615 explicit_bzero_safe(v->output_buffer + v->output_buffer_index, n);
616
d41bd96f
LP
617 v->output_buffer_size -= n;
618
db1f7c84 619 if (v->output_buffer_size == 0) {
d41bd96f 620 v->output_buffer_index = 0;
db1f7c84
LP
621 v->output_buffer_sensitive = false; /* We can reset the sensitive flag once the buffer is empty */
622 } else
d41bd96f
LP
623 v->output_buffer_index += n;
624
d37cdac6
LP
625 close_many(v->output_fds, v->n_output_fds);
626 v->n_output_fds = 0;
627
d41bd96f
LP
628 v->timestamp = now(CLOCK_MONOTONIC);
629 return 1;
630}
631
d37cdac6
LP
632#define VARLINK_FDS_MAX (16U*1024U)
633
d41bd96f 634static int varlink_read(Varlink *v) {
d37cdac6
LP
635 struct iovec iov;
636 struct msghdr mh;
d41bd96f
LP
637 size_t rs;
638 ssize_t n;
d37cdac6 639 void *p;
d41bd96f
LP
640
641 assert(v);
642
45a6c965 643 if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER))
d41bd96f
LP
644 return 0;
645 if (v->connecting) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */
646 return 0;
647 if (v->current)
648 return 0;
649 if (v->input_buffer_unscanned > 0)
650 return 0;
651 if (v->read_disconnected)
652 return 0;
653
654 if (v->input_buffer_size >= VARLINK_BUFFER_MAX)
655 return -ENOBUFS;
656
657 assert(v->fd >= 0);
658
319a4f4b 659 if (MALLOC_SIZEOF_SAFE(v->input_buffer) <= v->input_buffer_index + v->input_buffer_size) {
d41bd96f
LP
660 size_t add;
661
662 add = MIN(VARLINK_BUFFER_MAX - v->input_buffer_size, VARLINK_READ_SIZE);
663
664 if (v->input_buffer_index == 0) {
665
319a4f4b 666 if (!GREEDY_REALLOC(v->input_buffer, v->input_buffer_size + add))
d41bd96f
LP
667 return -ENOMEM;
668
669 } else {
670 char *b;
671
672 b = new(char, v->input_buffer_size + add);
673 if (!b)
674 return -ENOMEM;
675
676 memcpy(b, v->input_buffer + v->input_buffer_index, v->input_buffer_size);
677
678 free_and_replace(v->input_buffer, b);
d41bd96f
LP
679 v->input_buffer_index = 0;
680 }
681 }
682
d37cdac6 683 p = v->input_buffer + v->input_buffer_index + v->input_buffer_size;
319a4f4b 684 rs = MALLOC_SIZEOF_SAFE(v->input_buffer) - (v->input_buffer_index + v->input_buffer_size);
d41bd96f 685
d37cdac6 686 if (v->allow_fd_passing_input) {
0347b9fd 687 iov = IOVEC_MAKE(p, rs);
3c868058
LP
688
689 /* Allocate the fd buffer on the heap, since we need a lot of space potentially */
690 if (!v->input_control_buffer) {
691 v->input_control_buffer_size = CMSG_SPACE(sizeof(int) * VARLINK_FDS_MAX);
692 v->input_control_buffer = malloc(v->input_control_buffer_size);
693 if (!v->input_control_buffer)
694 return -ENOMEM;
695 }
696
d37cdac6
LP
697 mh = (struct msghdr) {
698 .msg_iov = &iov,
699 .msg_iovlen = 1,
3c868058
LP
700 .msg_control = v->input_control_buffer,
701 .msg_controllen = v->input_control_buffer_size,
d37cdac6 702 };
b456f226 703
d37cdac6
LP
704 n = recvmsg_safe(v->fd, &mh, MSG_DONTWAIT|MSG_CMSG_CLOEXEC);
705 } else {
706 bool prefer_read = v->prefer_read_write;
707 if (!prefer_read) {
708 n = recv(v->fd, p, rs, MSG_DONTWAIT);
709 if (n < 0 && errno == ENOTSOCK)
710 prefer_read = v->prefer_read_write = true;
711 }
712 if (prefer_read)
713 n = read(v->fd, p, rs);
d41bd96f 714 }
d41bd96f
LP
715 if (n < 0) {
716 if (errno == EAGAIN)
717 return 0;
718
719 if (ERRNO_IS_DISCONNECT(errno)) {
720 v->read_disconnected = true;
721 return 1;
722 }
723
724 return -errno;
725 }
726 if (n == 0) { /* EOF */
d37cdac6
LP
727
728 if (v->allow_fd_passing_input)
729 cmsg_close_all(&mh);
730
d41bd96f
LP
731 v->read_disconnected = true;
732 return 1;
733 }
734
d37cdac6
LP
735 if (v->allow_fd_passing_input) {
736 struct cmsghdr* cmsg;
737
738 cmsg = cmsg_find(&mh, SOL_SOCKET, SCM_RIGHTS, (socklen_t) -1);
739 if (cmsg) {
740 size_t add;
741
742 /* We only allow file descriptors to be passed along with the first byte of a
743 * message. If they are passed with any other byte this is a protocol violation. */
744 if (v->input_buffer_size != 0) {
745 cmsg_close_all(&mh);
746 return -EPROTO;
747 }
748
749 add = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int);
750 if (add > INT_MAX - v->n_input_fds) {
751 cmsg_close_all(&mh);
752 return -EBADF;
753 }
754
755 if (!GREEDY_REALLOC(v->input_fds, v->n_input_fds + add)) {
756 cmsg_close_all(&mh);
757 return -ENOMEM;
758 }
759
760 memcpy_safe(v->input_fds + v->n_input_fds, CMSG_TYPED_DATA(cmsg, int), add * sizeof(int));
761 v->n_input_fds += add;
762 }
763 }
764
d41bd96f
LP
765 v->input_buffer_size += n;
766 v->input_buffer_unscanned += n;
767
768 return 1;
769}
770
771static int varlink_parse_message(Varlink *v) {
772 const char *e, *begin;
773 size_t sz;
774 int r;
775
776 assert(v);
777
778 if (v->current)
779 return 0;
780 if (v->input_buffer_unscanned <= 0)
781 return 0;
782
783 assert(v->input_buffer_unscanned <= v->input_buffer_size);
319a4f4b 784 assert(v->input_buffer_index + v->input_buffer_size <= MALLOC_SIZEOF_SAFE(v->input_buffer));
d41bd96f
LP
785
786 begin = v->input_buffer + v->input_buffer_index;
787
788 e = memchr(begin + v->input_buffer_size - v->input_buffer_unscanned, 0, v->input_buffer_unscanned);
789 if (!e) {
790 v->input_buffer_unscanned = 0;
791 return 0;
792 }
793
794 sz = e - begin + 1;
795
77472d06
ZJS
796 varlink_log(v, "New incoming message: %s", begin); /* FIXME: should we output the whole message here before validation?
797 * This may produce a non-printable journal entry if the message
798 * is invalid. We may also expose privileged information. */
d41bd96f 799
d642f640 800 r = json_parse(begin, 0, &v->current, NULL, NULL);
77472d06
ZJS
801 if (r < 0) {
802 /* If we encounter a parse failure flush all data. We cannot possibly recover from this,
803 * hence drop all buffered data now. */
804 v->input_buffer_index = v->input_buffer_size = v->input_buffer_unscanned = 0;
805 return varlink_log_errno(v, r, "Failed to parse JSON: %m");
806 }
d41bd96f
LP
807
808 v->input_buffer_size -= sz;
809
810 if (v->input_buffer_size == 0)
811 v->input_buffer_index = 0;
812 else
813 v->input_buffer_index += sz;
814
815 v->input_buffer_unscanned = v->input_buffer_size;
816 return 1;
817}
818
819static int varlink_test_timeout(Varlink *v) {
820 assert(v);
821
45a6c965 822 if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
d41bd96f
LP
823 return 0;
824 if (v->timeout == USEC_INFINITY)
825 return 0;
826
827 if (now(CLOCK_MONOTONIC) < usec_add(v->timestamp, v->timeout))
828 return 0;
829
830 varlink_set_state(v, VARLINK_PENDING_TIMEOUT);
831
832 return 1;
833}
834
835static int varlink_dispatch_local_error(Varlink *v, const char *error) {
836 int r;
837
838 assert(v);
839 assert(error);
840
841 if (!v->reply_callback)
842 return 0;
843
844 r = v->reply_callback(v, NULL, error, VARLINK_REPLY_ERROR|VARLINK_REPLY_LOCAL, v->userdata);
845 if (r < 0)
846 log_debug_errno(r, "Reply callback returned error, ignoring: %m");
847
848 return 1;
849}
850
851static int varlink_dispatch_timeout(Varlink *v) {
852 assert(v);
853
854 if (v->state != VARLINK_PENDING_TIMEOUT)
855 return 0;
856
857 varlink_set_state(v, VARLINK_PROCESSING_TIMEOUT);
858 varlink_dispatch_local_error(v, VARLINK_ERROR_TIMEOUT);
859 varlink_close(v);
860
861 return 1;
862}
863
864static int varlink_dispatch_disconnect(Varlink *v) {
865 assert(v);
866
867 if (v->state != VARLINK_PENDING_DISCONNECT)
868 return 0;
869
870 varlink_set_state(v, VARLINK_PROCESSING_DISCONNECT);
871 varlink_dispatch_local_error(v, VARLINK_ERROR_DISCONNECTED);
872 varlink_close(v);
873
874 return 1;
875}
876
877static int varlink_sanitize_parameters(JsonVariant **v) {
878 assert(v);
879
880 /* Varlink always wants a parameters list, hence make one if the caller doesn't want any */
881 if (!*v)
882 return json_variant_new_object(v, NULL, 0);
883 else if (!json_variant_is_object(*v))
884 return -EINVAL;
885
886 return 0;
887}
888
889static int varlink_dispatch_reply(Varlink *v) {
890 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
891 VarlinkReplyFlags flags = 0;
892 const char *error = NULL;
893 JsonVariant *e;
894 const char *k;
895 int r;
896
897 assert(v);
898
45a6c965 899 if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
d41bd96f
LP
900 return 0;
901 if (!v->current)
902 return 0;
903
904 assert(v->n_pending > 0);
905
906 if (!json_variant_is_object(v->current))
907 goto invalid;
908
909 JSON_VARIANT_OBJECT_FOREACH(k, e, v->current) {
910
911 if (streq(k, "error")) {
912 if (error)
913 goto invalid;
914 if (!json_variant_is_string(e))
915 goto invalid;
916
917 error = json_variant_string(e);
918 flags |= VARLINK_REPLY_ERROR;
919
920 } else if (streq(k, "parameters")) {
921 if (parameters)
922 goto invalid;
923 if (!json_variant_is_object(e))
924 goto invalid;
925
926 parameters = json_variant_ref(e);
927
928 } else if (streq(k, "continues")) {
929 if (FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
930 goto invalid;
931
932 if (!json_variant_is_boolean(e))
933 goto invalid;
934
935 if (json_variant_boolean(e))
936 flags |= VARLINK_REPLY_CONTINUES;
937 } else
938 goto invalid;
939 }
940
45a6c965
LP
941 /* Replies with 'continue' set are only OK if we set 'more' when the method call was initiated */
942 if (v->state != VARLINK_AWAITING_REPLY_MORE && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
943 goto invalid;
944
945 /* An error is final */
d41bd96f
LP
946 if (error && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
947 goto invalid;
948
949 r = varlink_sanitize_parameters(&parameters);
950 if (r < 0)
951 goto invalid;
952
45a6c965 953 if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE)) {
d41bd96f
LP
954 varlink_set_state(v, VARLINK_PROCESSING_REPLY);
955
956 if (v->reply_callback) {
957 r = v->reply_callback(v, parameters, error, flags, v->userdata);
958 if (r < 0)
959 log_debug_errno(r, "Reply callback returned error, ignoring: %m");
960 }
961
790446bd 962 varlink_clear_current(v);
d41bd96f
LP
963
964 if (v->state == VARLINK_PROCESSING_REPLY) {
45a6c965 965
d41bd96f 966 assert(v->n_pending > 0);
d41bd96f 967
45a6c965
LP
968 if (!FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
969 v->n_pending--;
970
971 varlink_set_state(v,
972 FLAGS_SET(flags, VARLINK_REPLY_CONTINUES) ? VARLINK_AWAITING_REPLY_MORE :
973 v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY);
d41bd96f
LP
974 }
975 } else {
976 assert(v->state == VARLINK_CALLING);
d41bd96f
LP
977 varlink_set_state(v, VARLINK_CALLED);
978 }
979
980 return 1;
981
982invalid:
983 varlink_set_state(v, VARLINK_PROCESSING_FAILURE);
984 varlink_dispatch_local_error(v, VARLINK_ERROR_PROTOCOL);
985 varlink_close(v);
986
987 return 1;
988}
989
990static int varlink_dispatch_method(Varlink *v) {
991 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
992 VarlinkMethodFlags flags = 0;
993 const char *method = NULL, *error;
994 JsonVariant *e;
995 VarlinkMethod callback;
996 const char *k;
997 int r;
998
999 assert(v);
1000
1001 if (v->state != VARLINK_IDLE_SERVER)
1002 return 0;
1003 if (!v->current)
1004 return 0;
1005
1006 if (!json_variant_is_object(v->current))
1007 goto invalid;
1008
1009 JSON_VARIANT_OBJECT_FOREACH(k, e, v->current) {
1010
1011 if (streq(k, "method")) {
1012 if (method)
1013 goto invalid;
1014 if (!json_variant_is_string(e))
1015 goto invalid;
1016
1017 method = json_variant_string(e);
1018
1019 } else if (streq(k, "parameters")) {
1020 if (parameters)
1021 goto invalid;
1022 if (!json_variant_is_object(e))
1023 goto invalid;
1024
1025 parameters = json_variant_ref(e);
1026
1027 } else if (streq(k, "oneway")) {
1028
1029 if ((flags & (VARLINK_METHOD_ONEWAY|VARLINK_METHOD_MORE)) != 0)
1030 goto invalid;
1031
1032 if (!json_variant_is_boolean(e))
1033 goto invalid;
1034
1035 if (json_variant_boolean(e))
1036 flags |= VARLINK_METHOD_ONEWAY;
1037
1038 } else if (streq(k, "more")) {
1039
1040 if ((flags & (VARLINK_METHOD_ONEWAY|VARLINK_METHOD_MORE)) != 0)
1041 goto invalid;
1042
1043 if (!json_variant_is_boolean(e))
1044 goto invalid;
1045
1046 if (json_variant_boolean(e))
1047 flags |= VARLINK_METHOD_MORE;
1048
1049 } else
1050 goto invalid;
1051 }
1052
1053 if (!method)
1054 goto invalid;
1055
1056 r = varlink_sanitize_parameters(&parameters);
1057 if (r < 0)
1058 goto fail;
1059
1060 varlink_set_state(v, (flags & VARLINK_METHOD_MORE) ? VARLINK_PROCESSING_METHOD_MORE :
1061 (flags & VARLINK_METHOD_ONEWAY) ? VARLINK_PROCESSING_METHOD_ONEWAY :
1062 VARLINK_PROCESSING_METHOD);
1063
1064 assert(v->server);
1065
1066 if (STR_IN_SET(method, "org.varlink.service.GetInfo", "org.varlink.service.GetInterface")) {
1067 /* For now, we don't implement a single of varlink's own methods */
1068 callback = NULL;
1069 error = VARLINK_ERROR_METHOD_NOT_IMPLEMENTED;
1070 } else if (startswith(method, "org.varlink.service.")) {
1071 callback = NULL;
1072 error = VARLINK_ERROR_METHOD_NOT_FOUND;
1073 } else {
1074 callback = hashmap_get(v->server->methods, method);
1075 error = VARLINK_ERROR_METHOD_NOT_FOUND;
1076 }
1077
1078 if (callback) {
1079 r = callback(v, parameters, flags, v->userdata);
1080 if (r < 0) {
1081 log_debug_errno(r, "Callback for %s returned error: %m", method);
1082
1083 /* We got an error back from the callback. Propagate it to the client if the method call remains unanswered. */
1084 if (!FLAGS_SET(flags, VARLINK_METHOD_ONEWAY)) {
7466e94f 1085 r = varlink_error_errno(v, r);
d41bd96f
LP
1086 if (r < 0)
1087 return r;
1088 }
1089 }
1090 } else if (!FLAGS_SET(flags, VARLINK_METHOD_ONEWAY)) {
1091 assert(error);
1092
1093 r = varlink_errorb(v, error, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method))));
1094 if (r < 0)
1095 return r;
1096 }
1097
1098 switch (v->state) {
1099
1100 case VARLINK_PROCESSED_METHOD: /* Method call is fully processed */
1101 case VARLINK_PROCESSING_METHOD_ONEWAY: /* ditto */
790446bd 1102 varlink_clear_current(v);
d41bd96f
LP
1103 varlink_set_state(v, VARLINK_IDLE_SERVER);
1104 break;
1105
1106 case VARLINK_PROCESSING_METHOD: /* Method call wasn't replied to, will be replied to later */
1107 varlink_set_state(v, VARLINK_PENDING_METHOD);
1108 break;
1109
d41bd96f
LP
1110 case VARLINK_PROCESSING_METHOD_MORE: /* No reply for a "more" message was sent, more to come */
1111 varlink_set_state(v, VARLINK_PENDING_METHOD_MORE);
1112 break;
1113
1114 default:
04499a70 1115 assert_not_reached();
d41bd96f
LP
1116
1117 }
1118
1119 return r;
1120
1121invalid:
1122 r = -EINVAL;
1123
1124fail:
1125 varlink_set_state(v, VARLINK_PROCESSING_FAILURE);
1126 varlink_dispatch_local_error(v, VARLINK_ERROR_PROTOCOL);
1127 varlink_close(v);
1128
1129 return r;
1130}
1131
1132int varlink_process(Varlink *v) {
1133 int r;
1134
1135 assert_return(v, -EINVAL);
1136
1137 if (v->state == VARLINK_DISCONNECTED)
db3d4222 1138 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
d41bd96f
LP
1139
1140 varlink_ref(v);
1141
1142 r = varlink_write(v);
db3d4222
ZJS
1143 if (r < 0)
1144 varlink_log_errno(v, r, "Write failed: %m");
d41bd96f
LP
1145 if (r != 0)
1146 goto finish;
1147
1148 r = varlink_dispatch_reply(v);
db3d4222
ZJS
1149 if (r < 0)
1150 varlink_log_errno(v, r, "Reply dispatch failed: %m");
d41bd96f
LP
1151 if (r != 0)
1152 goto finish;
1153
1154 r = varlink_dispatch_method(v);
db3d4222
ZJS
1155 if (r < 0)
1156 varlink_log_errno(v, r, "Method dispatch failed: %m");
d41bd96f
LP
1157 if (r != 0)
1158 goto finish;
1159
1160 r = varlink_parse_message(v);
db3d4222
ZJS
1161 if (r < 0)
1162 varlink_log_errno(v, r, "Message parsing failed: %m");
d41bd96f
LP
1163 if (r != 0)
1164 goto finish;
1165
1166 r = varlink_read(v);
db3d4222
ZJS
1167 if (r < 0)
1168 varlink_log_errno(v, r, "Read failed: %m");
d41bd96f
LP
1169 if (r != 0)
1170 goto finish;
1171
1172 r = varlink_test_disconnect(v);
db3d4222 1173 assert(r >= 0);
d41bd96f
LP
1174 if (r != 0)
1175 goto finish;
1176
1177 r = varlink_dispatch_disconnect(v);
db3d4222 1178 assert(r >= 0);
d41bd96f
LP
1179 if (r != 0)
1180 goto finish;
1181
1182 r = varlink_test_timeout(v);
db3d4222 1183 assert(r >= 0);
d41bd96f
LP
1184 if (r != 0)
1185 goto finish;
1186
1187 r = varlink_dispatch_timeout(v);
db3d4222 1188 assert(r >= 0);
d41bd96f
LP
1189 if (r != 0)
1190 goto finish;
1191
1192finish:
1193 if (r >= 0 && v->defer_event_source) {
1194 int q;
1195
1196 /* If we did some processing, make sure we are called again soon */
1197 q = sd_event_source_set_enabled(v->defer_event_source, r > 0 ? SD_EVENT_ON : SD_EVENT_OFF);
1198 if (q < 0)
db3d4222 1199 r = varlink_log_errno(v, q, "Failed to enable deferred event source: %m");
d41bd96f
LP
1200 }
1201
1202 if (r < 0) {
1203 if (VARLINK_STATE_IS_ALIVE(v->state))
1204 /* Initiate disconnection */
1205 varlink_set_state(v, VARLINK_PENDING_DISCONNECT);
1206 else
1207 /* We failed while disconnecting, in that case close right away */
1208 varlink_close(v);
1209 }
1210
1211 varlink_unref(v);
1212 return r;
1213}
1214
1215static void handle_revents(Varlink *v, int revents) {
1216 assert(v);
1217
1218 if (v->connecting) {
1219 /* If we have seen POLLOUT or POLLHUP on a socket we are asynchronously waiting a connect()
1220 * to complete on, we know we are ready. We don't read the connection error here though,
1221 * we'll get the error on the next read() or write(). */
1222 if ((revents & (POLLOUT|POLLHUP)) == 0)
1223 return;
1224
6976bf5c 1225 varlink_log(v, "Asynchronous connection completed.");
d41bd96f
LP
1226 v->connecting = false;
1227 } else {
1228 /* Note that we don't care much about POLLIN/POLLOUT here, we'll just try reading and writing
1229 * what we can. However, we do care about POLLHUP to detect connection termination even if we
1230 * momentarily don't want to read nor write anything. */
1231
1232 if (!FLAGS_SET(revents, POLLHUP))
1233 return;
1234
1235 varlink_log(v, "Got POLLHUP from socket.");
1236 v->got_pollhup = true;
1237 }
1238}
1239
1240int varlink_wait(Varlink *v, usec_t timeout) {
d41bd96f
LP
1241 int r, fd, events;
1242 usec_t t;
1243
1244 assert_return(v, -EINVAL);
d41bd96f
LP
1245
1246 if (v->state == VARLINK_DISCONNECTED)
db3d4222 1247 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
d41bd96f
LP
1248
1249 r = varlink_get_timeout(v, &t);
1250 if (r < 0)
1251 return r;
1252 if (t != USEC_INFINITY) {
1253 usec_t n;
1254
1255 n = now(CLOCK_MONOTONIC);
1256 if (t < n)
1257 t = 0;
1258 else
1259 t = usec_sub_unsigned(t, n);
1260 }
1261
1262 if (timeout != USEC_INFINITY &&
1263 (t == USEC_INFINITY || timeout < t))
1264 t = timeout;
1265
1266 fd = varlink_get_fd(v);
1267 if (fd < 0)
1268 return fd;
1269
1270 events = varlink_get_events(v);
1271 if (events < 0)
1272 return events;
1273
0f2d351f 1274 r = fd_wait_for_event(fd, events, t);
13d84288
ZJS
1275 if (ERRNO_IS_NEG_TRANSIENT(r)) /* Treat EINTR as not a timeout, but also nothing happened, and
1276 * the caller gets a chance to call back into us */
6976bf5c 1277 return 1;
05827831 1278 if (r <= 0)
0f2d351f 1279 return r;
d41bd96f 1280
0f2d351f 1281 handle_revents(v, r);
dad28bff 1282 return 1;
d41bd96f
LP
1283}
1284
1285int varlink_get_fd(Varlink *v) {
1286
1287 assert_return(v, -EINVAL);
1288
1289 if (v->state == VARLINK_DISCONNECTED)
db3d4222 1290 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
d41bd96f 1291 if (v->fd < 0)
db3d4222 1292 return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADF), "No valid fd.");
d41bd96f
LP
1293
1294 return v->fd;
1295}
1296
1297int varlink_get_events(Varlink *v) {
1298 int ret = 0;
1299
1300 assert_return(v, -EINVAL);
1301
1302 if (v->state == VARLINK_DISCONNECTED)
db3d4222 1303 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
d41bd96f
LP
1304
1305 if (v->connecting) /* When processing an asynchronous connect(), we only wait for EPOLLOUT, which
1306 * tells us that the connection is now complete. Before that we should neither
1307 * write() or read() from the fd. */
1308 return EPOLLOUT;
1309
1310 if (!v->read_disconnected &&
45a6c965 1311 IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) &&
d41bd96f
LP
1312 !v->current &&
1313 v->input_buffer_unscanned <= 0)
1314 ret |= EPOLLIN;
1315
1316 if (!v->write_disconnected &&
1317 v->output_buffer_size > 0)
1318 ret |= EPOLLOUT;
1319
1320 return ret;
1321}
1322
1323int varlink_get_timeout(Varlink *v, usec_t *ret) {
1324 assert_return(v, -EINVAL);
1325
1326 if (v->state == VARLINK_DISCONNECTED)
db3d4222 1327 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
d41bd96f 1328
45a6c965 1329 if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING) &&
d41bd96f
LP
1330 v->timeout != USEC_INFINITY) {
1331 if (ret)
1332 *ret = usec_add(v->timestamp, v->timeout);
1333 return 1;
1334 } else {
1335 if (ret)
1336 *ret = USEC_INFINITY;
1337 return 0;
1338 }
1339}
1340
1341int varlink_flush(Varlink *v) {
1342 int ret = 0, r;
1343
1344 assert_return(v, -EINVAL);
1345
1346 if (v->state == VARLINK_DISCONNECTED)
db3d4222 1347 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
d41bd96f
LP
1348
1349 for (;;) {
d41bd96f
LP
1350 if (v->output_buffer_size == 0)
1351 break;
1352 if (v->write_disconnected)
1353 return -ECONNRESET;
1354
1355 r = varlink_write(v);
1356 if (r < 0)
1357 return r;
1358 if (r > 0) {
1359 ret = 1;
1360 continue;
1361 }
1362
0f2d351f 1363 r = fd_wait_for_event(v->fd, POLLOUT, USEC_INFINITY);
bb44fd07
ZJS
1364 if (ERRNO_IS_NEG_TRANSIENT(r))
1365 continue;
1366 if (r < 0)
db3d4222 1367 return varlink_log_errno(v, r, "Poll failed on fd: %m");
bb44fd07 1368 assert(r > 0);
dad28bff 1369
0f2d351f 1370 handle_revents(v, r);
d41bd96f
LP
1371 }
1372
1373 return ret;
1374}
1375
1376static void varlink_detach_server(Varlink *v) {
6d4d6002 1377 VarlinkServer *saved_server;
d41bd96f
LP
1378 assert(v);
1379
1380 if (!v->server)
1381 return;
1382
1383 if (v->server->by_uid &&
1384 v->ucred_acquired &&
1385 uid_is_valid(v->ucred.uid)) {
1386 unsigned c;
1387
1388 c = PTR_TO_UINT(hashmap_get(v->server->by_uid, UID_TO_PTR(v->ucred.uid)));
1389 assert(c > 0);
1390
1391 if (c == 1)
1392 (void) hashmap_remove(v->server->by_uid, UID_TO_PTR(v->ucred.uid));
1393 else
1394 (void) hashmap_replace(v->server->by_uid, UID_TO_PTR(v->ucred.uid), UINT_TO_PTR(c - 1));
1395 }
1396
1397 assert(v->server->n_connections > 0);
1398 v->server->n_connections--;
1399
1400 /* If this is a connection associated to a server, then let's disconnect the server and the
6d4d6002
LP
1401 * connection from each other. This drops the dangling reference that connect_callback() set up. But
1402 * before we release the references, let's call the disconnection callback if it is defined. */
1403
1404 saved_server = TAKE_PTR(v->server);
1405
1406 if (saved_server->disconnect_callback)
1407 saved_server->disconnect_callback(saved_server, v, saved_server->userdata);
1408
1409 varlink_server_unref(saved_server);
d41bd96f
LP
1410 varlink_unref(v);
1411}
1412
1413int varlink_close(Varlink *v) {
d41bd96f
LP
1414 assert_return(v, -EINVAL);
1415
1416 if (v->state == VARLINK_DISCONNECTED)
1417 return 0;
1418
1419 varlink_set_state(v, VARLINK_DISCONNECTED);
1420
cc6b0a18
LP
1421 /* Let's take a reference first, since varlink_detach_server() might drop the final (dangling) ref
1422 * which would destroy us before we can call varlink_clear() */
d41bd96f
LP
1423 varlink_ref(v);
1424 varlink_detach_server(v);
1425 varlink_clear(v);
1426 varlink_unref(v);
1427
1428 return 1;
1429}
1430
9652d740 1431Varlink* varlink_close_unref(Varlink *v) {
9652d740
LP
1432 if (!v)
1433 return NULL;
1434
cc6b0a18 1435 (void) varlink_close(v);
9652d740
LP
1436 return varlink_unref(v);
1437}
1438
d41bd96f 1439Varlink* varlink_flush_close_unref(Varlink *v) {
cc6b0a18
LP
1440 if (!v)
1441 return NULL;
d41bd96f 1442
cc6b0a18 1443 (void) varlink_flush(v);
39ad3f1c 1444 return varlink_close_unref(v);
d41bd96f
LP
1445}
1446
d37cdac6 1447static int varlink_format_json(Varlink *v, JsonVariant *m) {
db1f7c84 1448 _cleanup_(erase_and_freep) char *text = NULL;
d41bd96f
LP
1449 int r;
1450
1451 assert(v);
1452 assert(m);
1453
1454 r = json_variant_format(m, 0, &text);
1455 if (r < 0)
1456 return r;
2a04712c 1457 assert(text[r] == '\0');
d41bd96f
LP
1458
1459 if (v->output_buffer_size + r + 1 > VARLINK_BUFFER_MAX)
1460 return -ENOBUFS;
1461
1462 varlink_log(v, "Sending message: %s", text);
1463
1464 if (v->output_buffer_size == 0) {
1465
1466 free_and_replace(v->output_buffer, text);
1467
319a4f4b 1468 v->output_buffer_size = r + 1;
d41bd96f
LP
1469 v->output_buffer_index = 0;
1470
1471 } else if (v->output_buffer_index == 0) {
1472
319a4f4b 1473 if (!GREEDY_REALLOC(v->output_buffer, v->output_buffer_size + r + 1))
d41bd96f
LP
1474 return -ENOMEM;
1475
1476 memcpy(v->output_buffer + v->output_buffer_size, text, r + 1);
1477 v->output_buffer_size += r + 1;
d41bd96f
LP
1478 } else {
1479 char *n;
be44e091 1480 const size_t new_size = v->output_buffer_size + r + 1;
d41bd96f 1481
be44e091 1482 n = new(char, new_size);
d41bd96f
LP
1483 if (!n)
1484 return -ENOMEM;
1485
1486 memcpy(mempcpy(n, v->output_buffer + v->output_buffer_index, v->output_buffer_size), text, r + 1);
1487
1488 free_and_replace(v->output_buffer, n);
319a4f4b 1489 v->output_buffer_size = new_size;
d41bd96f
LP
1490 v->output_buffer_index = 0;
1491 }
1492
db1f7c84
LP
1493 if (json_variant_is_sensitive(m))
1494 v->output_buffer_sensitive = true; /* Propagate sensitive flag */
1495 else
1496 text = mfree(text); /* No point in the erase_and_free() destructor declared above */
1497
d41bd96f
LP
1498 return 0;
1499}
1500
d37cdac6
LP
1501static int varlink_enqueue_json(Varlink *v, JsonVariant *m) {
1502 VarlinkJsonQueueItem *q;
1503
1504 assert(v);
1505 assert(m);
1506
94d82b59 1507 /* If there are no file descriptors to be queued and no queue entries yet we can shortcut things and
d37cdac6
LP
1508 * append this entry directly to the output buffer */
1509 if (v->n_pushed_fds == 0 && !v->output_queue)
1510 return varlink_format_json(v, m);
1511
1512 /* Otherwise add a queue entry for this */
1513 q = varlink_json_queue_item_new(m, v->pushed_fds, v->n_pushed_fds);
1514 if (!q)
1515 return -ENOMEM;
1516
1517 v->n_pushed_fds = 0; /* fds now belong to the queue entry */
1518
1519 LIST_INSERT_AFTER(queue, v->output_queue, v->output_queue_tail, q);
1520 v->output_queue_tail = q;
1521 return 0;
1522}
1523
1524static int varlink_format_queue(Varlink *v) {
1525 int r;
1526
1527 assert(v);
1528
1529 /* Takes entries out of the output queue and formats them into the output buffer. But only if this
1530 * would not corrupt our fd message boundaries */
1531
1532 while (v->output_queue) {
1533 _cleanup_free_ int *array = NULL;
1534 VarlinkJsonQueueItem *q = v->output_queue;
1535
1536 if (v->n_output_fds > 0) /* unwritten fds? if we'd add more we'd corrupt the fd message boundaries, hence wait */
1537 return 0;
1538
1539 if (q->n_fds > 0) {
1540 array = newdup(int, q->fds, q->n_fds);
1541 if (!array)
1542 return -ENOMEM;
1543 }
1544
1545 r = varlink_format_json(v, q->data);
1546 if (r < 0)
1547 return r;
1548
1549 /* Take possession of the queue element's fds */
1550 free(v->output_fds);
1551 v->output_fds = TAKE_PTR(array);
1552 v->n_output_fds = q->n_fds;
1553 q->n_fds = 0;
1554
1555 LIST_REMOVE(queue, v->output_queue, q);
1556 if (!v->output_queue)
1557 v->output_queue_tail = NULL;
1558
1559 varlink_json_queue_item_free(q);
1560 }
1561
1562 return 0;
1563}
1564
d41bd96f
LP
1565int varlink_send(Varlink *v, const char *method, JsonVariant *parameters) {
1566 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1567 int r;
1568
1569 assert_return(v, -EINVAL);
1570 assert_return(method, -EINVAL);
1571
1572 if (v->state == VARLINK_DISCONNECTED)
db3d4222 1573 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
45a6c965
LP
1574
1575 /* We allow enqueuing multiple method calls at once! */
d41bd96f 1576 if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
db3d4222 1577 return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
d41bd96f
LP
1578
1579 r = varlink_sanitize_parameters(&parameters);
1580 if (r < 0)
db3d4222 1581 return varlink_log_errno(v, r, "Failed to sanitize parameters: %m");
d41bd96f
LP
1582
1583 r = json_build(&m, JSON_BUILD_OBJECT(
1584 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
1585 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)),
1586 JSON_BUILD_PAIR("oneway", JSON_BUILD_BOOLEAN(true))));
1587 if (r < 0)
db3d4222 1588 return varlink_log_errno(v, r, "Failed to build json message: %m");
d41bd96f
LP
1589
1590 r = varlink_enqueue_json(v, m);
1591 if (r < 0)
db3d4222 1592 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
d41bd96f
LP
1593
1594 /* No state change here, this is one-way only after all */
1595 v->timestamp = now(CLOCK_MONOTONIC);
1596 return 0;
1597}
1598
1599int varlink_sendb(Varlink *v, const char *method, ...) {
1600 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1601 va_list ap;
1602 int r;
1603
1604 assert_return(v, -EINVAL);
1605
1606 va_start(ap, method);
1607 r = json_buildv(&parameters, ap);
1608 va_end(ap);
1609
1610 if (r < 0)
db3d4222 1611 return varlink_log_errno(v, r, "Failed to build json message: %m");
d41bd96f
LP
1612
1613 return varlink_send(v, method, parameters);
1614}
1615
1616int varlink_invoke(Varlink *v, const char *method, JsonVariant *parameters) {
1617 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1618 int r;
1619
1620 assert_return(v, -EINVAL);
1621 assert_return(method, -EINVAL);
1622
1623 if (v->state == VARLINK_DISCONNECTED)
db3d4222 1624 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
45a6c965 1625
86b52a39 1626 /* We allow enqueuing multiple method calls at once! */
d41bd96f 1627 if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
db3d4222 1628 return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
d41bd96f
LP
1629
1630 r = varlink_sanitize_parameters(&parameters);
1631 if (r < 0)
db3d4222 1632 return varlink_log_errno(v, r, "Failed to sanitize parameters: %m");
d41bd96f
LP
1633
1634 r = json_build(&m, JSON_BUILD_OBJECT(
1635 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
1636 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters))));
1637 if (r < 0)
db3d4222 1638 return varlink_log_errno(v, r, "Failed to build json message: %m");
d41bd96f
LP
1639
1640 r = varlink_enqueue_json(v, m);
1641 if (r < 0)
db3d4222 1642 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
d41bd96f
LP
1643
1644 varlink_set_state(v, VARLINK_AWAITING_REPLY);
1645 v->n_pending++;
1646 v->timestamp = now(CLOCK_MONOTONIC);
1647
1648 return 0;
1649}
1650
1651int varlink_invokeb(Varlink *v, const char *method, ...) {
1652 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1653 va_list ap;
1654 int r;
1655
1656 assert_return(v, -EINVAL);
1657
1658 va_start(ap, method);
1659 r = json_buildv(&parameters, ap);
1660 va_end(ap);
1661
1662 if (r < 0)
db3d4222 1663 return varlink_log_errno(v, r, "Failed to build json message: %m");
d41bd96f
LP
1664
1665 return varlink_invoke(v, method, parameters);
45a6c965
LP
1666}
1667
1668int varlink_observe(Varlink *v, const char *method, JsonVariant *parameters) {
1669 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1670 int r;
1671
1672 assert_return(v, -EINVAL);
1673 assert_return(method, -EINVAL);
1674
1675 if (v->state == VARLINK_DISCONNECTED)
db3d4222
ZJS
1676 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
1677
45a6c965
LP
1678 /* Note that we don't allow enqueuing multiple method calls when we are in more/continues mode! We
1679 * thus insist on an idle client here. */
1680 if (v->state != VARLINK_IDLE_CLIENT)
db3d4222 1681 return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
45a6c965
LP
1682
1683 r = varlink_sanitize_parameters(&parameters);
1684 if (r < 0)
db3d4222 1685 return varlink_log_errno(v, r, "Failed to sanitize parameters: %m");
45a6c965
LP
1686
1687 r = json_build(&m, JSON_BUILD_OBJECT(
1688 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
1689 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)),
1690 JSON_BUILD_PAIR("more", JSON_BUILD_BOOLEAN(true))));
1691 if (r < 0)
db3d4222 1692 return varlink_log_errno(v, r, "Failed to build json message: %m");
45a6c965
LP
1693
1694 r = varlink_enqueue_json(v, m);
1695 if (r < 0)
db3d4222 1696 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
45a6c965
LP
1697
1698 varlink_set_state(v, VARLINK_AWAITING_REPLY_MORE);
1699 v->n_pending++;
1700 v->timestamp = now(CLOCK_MONOTONIC);
1701
1702 return 0;
1703}
1704
1705int varlink_observeb(Varlink *v, const char *method, ...) {
1706 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1707 va_list ap;
1708 int r;
1709
1710 assert_return(v, -EINVAL);
1711
1712 va_start(ap, method);
1713 r = json_buildv(&parameters, ap);
1714 va_end(ap);
1715
1716 if (r < 0)
db3d4222 1717 return varlink_log_errno(v, r, "Failed to build json message: %m");
45a6c965
LP
1718
1719 return varlink_observe(v, method, parameters);
d41bd96f
LP
1720}
1721
1722int varlink_call(
1723 Varlink *v,
1724 const char *method,
1725 JsonVariant *parameters,
1726 JsonVariant **ret_parameters,
1727 const char **ret_error_id,
1728 VarlinkReplyFlags *ret_flags) {
1729
1730 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1731 int r;
1732
1733 assert_return(v, -EINVAL);
1734 assert_return(method, -EINVAL);
1735
1736 if (v->state == VARLINK_DISCONNECTED)
db3d4222 1737 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
4f06325c 1738 if (v->state != VARLINK_IDLE_CLIENT)
db3d4222 1739 return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
d41bd96f
LP
1740
1741 assert(v->n_pending == 0); /* n_pending can't be > 0 if we are in VARLINK_IDLE_CLIENT state */
1742
d37cdac6
LP
1743 /* If there was still a reply pinned from a previous call, now it's the time to get rid of it, so
1744 * that we can assign a new reply shortly. */
790446bd 1745 varlink_clear_current(v);
85316317 1746
d41bd96f
LP
1747 r = varlink_sanitize_parameters(&parameters);
1748 if (r < 0)
db3d4222 1749 return varlink_log_errno(v, r, "Failed to sanitize parameters: %m");
d41bd96f
LP
1750
1751 r = json_build(&m, JSON_BUILD_OBJECT(
1752 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
1753 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters))));
1754 if (r < 0)
db3d4222 1755 return varlink_log_errno(v, r, "Failed to build json message: %m");
d41bd96f
LP
1756
1757 r = varlink_enqueue_json(v, m);
1758 if (r < 0)
db3d4222 1759 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
d41bd96f
LP
1760
1761 varlink_set_state(v, VARLINK_CALLING);
1762 v->n_pending++;
1763 v->timestamp = now(CLOCK_MONOTONIC);
1764
1765 while (v->state == VARLINK_CALLING) {
1766
1767 r = varlink_process(v);
1768 if (r < 0)
1769 return r;
1770 if (r > 0)
1771 continue;
1772
1773 r = varlink_wait(v, USEC_INFINITY);
1774 if (r < 0)
1775 return r;
1776 }
1777
1778 switch (v->state) {
1779
1780 case VARLINK_CALLED:
1781 assert(v->current);
1782
d41bd96f
LP
1783 varlink_set_state(v, VARLINK_IDLE_CLIENT);
1784 assert(v->n_pending == 1);
1785 v->n_pending--;
1786
1787 if (ret_parameters)
85316317 1788 *ret_parameters = json_variant_by_key(v->current, "parameters");
d41bd96f 1789 if (ret_error_id)
85316317 1790 *ret_error_id = json_variant_string(json_variant_by_key(v->current, "error"));
d41bd96f
LP
1791 if (ret_flags)
1792 *ret_flags = 0;
1793
1794 return 1;
1795
1796 case VARLINK_PENDING_DISCONNECT:
1797 case VARLINK_DISCONNECTED:
db3d4222 1798 return varlink_log_errno(v, SYNTHETIC_ERRNO(ECONNRESET), "Connection was closed.");
d41bd96f
LP
1799
1800 case VARLINK_PENDING_TIMEOUT:
db3d4222 1801 return varlink_log_errno(v, SYNTHETIC_ERRNO(ETIME), "Connection timed out.");
d41bd96f
LP
1802
1803 default:
04499a70 1804 assert_not_reached();
d41bd96f
LP
1805 }
1806}
1807
1808int varlink_callb(
1809 Varlink *v,
1810 const char *method,
1811 JsonVariant **ret_parameters,
1812 const char **ret_error_id,
1813 VarlinkReplyFlags *ret_flags, ...) {
1814
1815 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1816 va_list ap;
1817 int r;
1818
1819 assert_return(v, -EINVAL);
1820
1821 va_start(ap, ret_flags);
1822 r = json_buildv(&parameters, ap);
1823 va_end(ap);
1824
1825 if (r < 0)
db3d4222 1826 return varlink_log_errno(v, r, "Failed to build json message: %m");
d41bd96f
LP
1827
1828 return varlink_call(v, method, parameters, ret_parameters, ret_error_id, ret_flags);
1829}
1830
1831int varlink_reply(Varlink *v, JsonVariant *parameters) {
1832 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1833 int r;
1834
1835 assert_return(v, -EINVAL);
1836
1837 if (v->state == VARLINK_DISCONNECTED)
1838 return -ENOTCONN;
1839 if (!IN_SET(v->state,
1840 VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE,
1841 VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
1842 return -EBUSY;
1843
1844 r = varlink_sanitize_parameters(&parameters);
1845 if (r < 0)
db3d4222 1846 return varlink_log_errno(v, r, "Failed to sanitize parameters: %m");
d41bd96f
LP
1847
1848 r = json_build(&m, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters))));
1849 if (r < 0)
db3d4222 1850 return varlink_log_errno(v, r, "Failed to build json message: %m");
d41bd96f
LP
1851
1852 r = varlink_enqueue_json(v, m);
1853 if (r < 0)
db3d4222 1854 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
d41bd96f
LP
1855
1856 if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) {
1857 /* We just replied to a method call that was let hanging for a while (i.e. we were outside of
1858 * the varlink_dispatch_method() stack frame), which means with this reply we are ready to
1859 * process further messages. */
790446bd 1860 varlink_clear_current(v);
d41bd96f
LP
1861 varlink_set_state(v, VARLINK_IDLE_SERVER);
1862 } else
1863 /* We replied to a method call from within the varlink_dispatch_method() stack frame), which
1864 * means we should it handle the rest of the state engine. */
1865 varlink_set_state(v, VARLINK_PROCESSED_METHOD);
1866
1867 return 1;
1868}
1869
1870int varlink_replyb(Varlink *v, ...) {
1871 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1872 va_list ap;
1873 int r;
1874
1875 assert_return(v, -EINVAL);
1876
1877 va_start(ap, v);
1878 r = json_buildv(&parameters, ap);
1879 va_end(ap);
1880
1881 if (r < 0)
1882 return r;
1883
1884 return varlink_reply(v, parameters);
1885}
1886
1887int varlink_error(Varlink *v, const char *error_id, JsonVariant *parameters) {
1888 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1889 int r;
1890
1891 assert_return(v, -EINVAL);
1892 assert_return(error_id, -EINVAL);
1893
1894 if (v->state == VARLINK_DISCONNECTED)
db3d4222 1895 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
d41bd96f
LP
1896 if (!IN_SET(v->state,
1897 VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE,
1898 VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
db3d4222 1899 return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
d41bd96f 1900
d37cdac6
LP
1901 /* Reset the list of pushed file descriptors before sending an error reply. We do this here to
1902 * simplify code that puts together a complex reply message with fds, and half-way something
1903 * fails. In that case the pushed fds need to be flushed out again. Under the assumption that it
1904 * never makes sense to send fds along with errors we simply flush them out here beforehand, so that
1905 * the callers don't need to do this explicitly. */
1906 varlink_reset_fds(v);
1907
d41bd96f
LP
1908 r = varlink_sanitize_parameters(&parameters);
1909 if (r < 0)
db3d4222 1910 return varlink_log_errno(v, r, "Failed to sanitize parameters: %m");
d41bd96f
LP
1911
1912 r = json_build(&m, JSON_BUILD_OBJECT(
1913 JSON_BUILD_PAIR("error", JSON_BUILD_STRING(error_id)),
1914 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters))));
1915 if (r < 0)
db3d4222 1916 return varlink_log_errno(v, r, "Failed to build json message: %m");
d41bd96f
LP
1917
1918 r = varlink_enqueue_json(v, m);
1919 if (r < 0)
db3d4222 1920 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
d41bd96f
LP
1921
1922 if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) {
790446bd 1923 varlink_clear_current(v);
d41bd96f
LP
1924 varlink_set_state(v, VARLINK_IDLE_SERVER);
1925 } else
1926 varlink_set_state(v, VARLINK_PROCESSED_METHOD);
1927
1928 return 1;
1929}
1930
1931int varlink_errorb(Varlink *v, const char *error_id, ...) {
1932 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1933 va_list ap;
1934 int r;
1935
1936 assert_return(v, -EINVAL);
1937 assert_return(error_id, -EINVAL);
1938
1939 va_start(ap, error_id);
1940 r = json_buildv(&parameters, ap);
1941 va_end(ap);
1942
1943 if (r < 0)
db3d4222 1944 return varlink_log_errno(v, r, "Failed to build json message: %m");
d41bd96f
LP
1945
1946 return varlink_error(v, error_id, parameters);
1947}
1948
1949int varlink_error_invalid_parameter(Varlink *v, JsonVariant *parameters) {
e8aba093 1950 int r;
d41bd96f
LP
1951
1952 assert_return(v, -EINVAL);
1953 assert_return(parameters, -EINVAL);
1954
1955 /* We expect to be called in one of two ways: the 'parameters' argument is a string variant in which
1956 * case it is the parameter key name that is invalid. Or the 'parameters' argument is an object
1957 * variant in which case we'll pull out the first key. The latter mode is useful in functions that
1958 * don't expect any arguments. */
1959
e8aba093
VCS
1960 /* varlink_error(...) expects a json object as the third parameter. Passing a string variant causes
1961 * parameter sanitization to fail, and it returns -EINVAL. */
1962
1963 if (json_variant_is_string(parameters)) {
1964 _cleanup_(json_variant_unrefp) JsonVariant *parameters_obj = NULL;
1965
1966 r = json_build(&parameters_obj,
1967 JSON_BUILD_OBJECT(
1968 JSON_BUILD_PAIR("parameter", JSON_BUILD_VARIANT(parameters))));
1969 if (r < 0)
1970 return r;
1971
1972 return varlink_error(v, VARLINK_ERROR_INVALID_PARAMETER, parameters_obj);
1973 }
d41bd96f
LP
1974
1975 if (json_variant_is_object(parameters) &&
e8aba093
VCS
1976 json_variant_elements(parameters) > 0) {
1977 _cleanup_(json_variant_unrefp) JsonVariant *parameters_obj = NULL;
1978
1979 r = json_build(&parameters_obj,
1980 JSON_BUILD_OBJECT(
1981 JSON_BUILD_PAIR("parameter", JSON_BUILD_VARIANT(json_variant_by_index(parameters, 0)))));
1982 if (r < 0)
1983 return r;
1984
1985 return varlink_error(v, VARLINK_ERROR_INVALID_PARAMETER, parameters_obj);
1986 }
d41bd96f
LP
1987
1988 return -EINVAL;
1989}
1990
7466e94f
LP
1991int varlink_error_errno(Varlink *v, int error) {
1992 return varlink_errorb(
1993 v,
1994 VARLINK_ERROR_SYSTEM,
1995 JSON_BUILD_OBJECT(JSON_BUILD_PAIR("errno", JSON_BUILD_INTEGER(abs(error)))));
1996}
1997
d41bd96f
LP
1998int varlink_notify(Varlink *v, JsonVariant *parameters) {
1999 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
2000 int r;
2001
2002 assert_return(v, -EINVAL);
2003
2004 if (v->state == VARLINK_DISCONNECTED)
db3d4222 2005 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
d41bd96f 2006 if (!IN_SET(v->state, VARLINK_PROCESSING_METHOD_MORE, VARLINK_PENDING_METHOD_MORE))
db3d4222 2007 return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
d41bd96f
LP
2008
2009 r = varlink_sanitize_parameters(&parameters);
2010 if (r < 0)
db3d4222 2011 return varlink_log_errno(v, r, "Failed to sanitize parameters: %m");
d41bd96f
LP
2012
2013 r = json_build(&m, JSON_BUILD_OBJECT(
2014 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)),
2015 JSON_BUILD_PAIR("continues", JSON_BUILD_BOOLEAN(true))));
2016 if (r < 0)
db3d4222 2017 return varlink_log_errno(v, r, "Failed to build json message: %m");
d41bd96f
LP
2018
2019 r = varlink_enqueue_json(v, m);
2020 if (r < 0)
db3d4222 2021 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
d41bd96f
LP
2022
2023 /* No state change, as more is coming */
2024 return 1;
2025}
2026
2027int varlink_notifyb(Varlink *v, ...) {
2028 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
2029 va_list ap;
2030 int r;
2031
2032 assert_return(v, -EINVAL);
2033
2034 va_start(ap, v);
2035 r = json_buildv(&parameters, ap);
2036 va_end(ap);
2037
2038 if (r < 0)
db3d4222 2039 return varlink_log_errno(v, r, "Failed to build json message: %m");
d41bd96f
LP
2040
2041 return varlink_notify(v, parameters);
2042}
2043
2044int varlink_bind_reply(Varlink *v, VarlinkReply callback) {
2045 assert_return(v, -EINVAL);
2046
2047 if (callback && v->reply_callback && callback != v->reply_callback)
db3d4222 2048 return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "A different callback was already set.");
d41bd96f
LP
2049
2050 v->reply_callback = callback;
2051
2052 return 0;
2053}
2054
2055void* varlink_set_userdata(Varlink *v, void *userdata) {
2056 void *old;
2057
2058 assert_return(v, NULL);
2059
2060 old = v->userdata;
2061 v->userdata = userdata;
2062
2063 return old;
2064}
2065
2066void* varlink_get_userdata(Varlink *v) {
2067 assert_return(v, NULL);
2068
2069 return v->userdata;
2070}
2071
2072static int varlink_acquire_ucred(Varlink *v) {
2073 int r;
2074
2075 assert(v);
2076
2077 if (v->ucred_acquired)
2078 return 0;
2079
2080 r = getpeercred(v->fd, &v->ucred);
2081 if (r < 0)
2082 return r;
2083
2084 v->ucred_acquired = true;
2085 return 0;
2086}
2087
2088int varlink_get_peer_uid(Varlink *v, uid_t *ret) {
2089 int r;
2090
2091 assert_return(v, -EINVAL);
2092 assert_return(ret, -EINVAL);
2093
2094 r = varlink_acquire_ucred(v);
2095 if (r < 0)
db3d4222 2096 return varlink_log_errno(v, r, "Failed to acquire credentials: %m");
d41bd96f
LP
2097
2098 if (!uid_is_valid(v->ucred.uid))
db3d4222 2099 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENODATA), "Peer uid is invalid.");
d41bd96f
LP
2100
2101 *ret = v->ucred.uid;
2102 return 0;
2103}
2104
2105int varlink_get_peer_pid(Varlink *v, pid_t *ret) {
2106 int r;
2107
2108 assert_return(v, -EINVAL);
2109 assert_return(ret, -EINVAL);
2110
2111 r = varlink_acquire_ucred(v);
2112 if (r < 0)
db3d4222 2113 return varlink_log_errno(v, r, "Failed to acquire credentials: %m");
d41bd96f
LP
2114
2115 if (!pid_is_valid(v->ucred.pid))
db3d4222 2116 return varlink_log_errno(v, SYNTHETIC_ERRNO(ENODATA), "Peer uid is invalid.");
d41bd96f
LP
2117
2118 *ret = v->ucred.pid;
2119 return 0;
2120}
2121
2122int varlink_set_relative_timeout(Varlink *v, usec_t timeout) {
2123 assert_return(v, -EINVAL);
2124 assert_return(timeout > 0, -EINVAL);
2125
2126 v->timeout = timeout;
2127 return 0;
2128}
2129
2130VarlinkServer *varlink_get_server(Varlink *v) {
2131 assert_return(v, NULL);
2132
2133 return v->server;
2134}
2135
2136int varlink_set_description(Varlink *v, const char *description) {
2137 assert_return(v, -EINVAL);
2138
2139 return free_and_strdup(&v->description, description);
2140}
2141
2142static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
99534007 2143 Varlink *v = ASSERT_PTR(userdata);
d41bd96f
LP
2144
2145 assert(s);
d41bd96f
LP
2146
2147 handle_revents(v, revents);
2148 (void) varlink_process(v);
2149
2150 return 1;
2151}
2152
2153static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
99534007 2154 Varlink *v = ASSERT_PTR(userdata);
d41bd96f
LP
2155
2156 assert(s);
d41bd96f
LP
2157
2158 (void) varlink_process(v);
2159 return 1;
2160}
2161
2162static int defer_callback(sd_event_source *s, void *userdata) {
99534007 2163 Varlink *v = ASSERT_PTR(userdata);
d41bd96f
LP
2164
2165 assert(s);
d41bd96f
LP
2166
2167 (void) varlink_process(v);
2168 return 1;
2169}
2170
2171static int prepare_callback(sd_event_source *s, void *userdata) {
99534007 2172 Varlink *v = ASSERT_PTR(userdata);
d41bd96f
LP
2173 int r, e;
2174 usec_t until;
f1194f5d 2175 bool have_timeout;
d41bd96f
LP
2176
2177 assert(s);
d41bd96f
LP
2178
2179 e = varlink_get_events(v);
2180 if (e < 0)
2181 return e;
2182
2183 r = sd_event_source_set_io_events(v->io_event_source, e);
2184 if (r < 0)
db3d4222 2185 return varlink_log_errno(v, r, "Failed to set source events: %m");
d41bd96f
LP
2186
2187 r = varlink_get_timeout(v, &until);
2188 if (r < 0)
2189 return r;
f1194f5d
LP
2190 have_timeout = r > 0;
2191
2192 if (have_timeout) {
d41bd96f
LP
2193 r = sd_event_source_set_time(v->time_event_source, until);
2194 if (r < 0)
db3d4222 2195 return varlink_log_errno(v, r, "Failed to set source time: %m");
d41bd96f
LP
2196 }
2197
f1194f5d 2198 r = sd_event_source_set_enabled(v->time_event_source, have_timeout ? SD_EVENT_ON : SD_EVENT_OFF);
d41bd96f 2199 if (r < 0)
db3d4222 2200 return varlink_log_errno(v, r, "Failed to enable event source: %m");
d41bd96f
LP
2201
2202 return 1;
2203}
2204
2205static int quit_callback(sd_event_source *event, void *userdata) {
99534007 2206 Varlink *v = ASSERT_PTR(userdata);
d41bd96f
LP
2207
2208 assert(event);
d41bd96f
LP
2209
2210 varlink_flush(v);
2211 varlink_close(v);
2212
2213 return 1;
2214}
2215
2216int varlink_attach_event(Varlink *v, sd_event *e, int64_t priority) {
2217 int r;
2218
2219 assert_return(v, -EINVAL);
2220 assert_return(!v->event, -EBUSY);
2221
2222 if (e)
2223 v->event = sd_event_ref(e);
2224 else {
2225 r = sd_event_default(&v->event);
2226 if (r < 0)
db3d4222 2227 return varlink_log_errno(v, r, "Failed to create event source: %m");
d41bd96f
LP
2228 }
2229
2230 r = sd_event_add_time(v->event, &v->time_event_source, CLOCK_MONOTONIC, 0, 0, time_callback, v);
2231 if (r < 0)
2232 goto fail;
2233
2234 r = sd_event_source_set_priority(v->time_event_source, priority);
2235 if (r < 0)
2236 goto fail;
2237
2238 (void) sd_event_source_set_description(v->time_event_source, "varlink-time");
2239
2240 r = sd_event_add_exit(v->event, &v->quit_event_source, quit_callback, v);
2241 if (r < 0)
2242 goto fail;
2243
2244 r = sd_event_source_set_priority(v->quit_event_source, priority);
2245 if (r < 0)
2246 goto fail;
2247
2248 (void) sd_event_source_set_description(v->quit_event_source, "varlink-quit");
2249
2250 r = sd_event_add_io(v->event, &v->io_event_source, v->fd, 0, io_callback, v);
2251 if (r < 0)
2252 goto fail;
2253
2254 r = sd_event_source_set_prepare(v->io_event_source, prepare_callback);
2255 if (r < 0)
2256 goto fail;
2257
2258 r = sd_event_source_set_priority(v->io_event_source, priority);
2259 if (r < 0)
2260 goto fail;
2261
2262 (void) sd_event_source_set_description(v->io_event_source, "varlink-io");
2263
2264 r = sd_event_add_defer(v->event, &v->defer_event_source, defer_callback, v);
2265 if (r < 0)
2266 goto fail;
2267
2268 r = sd_event_source_set_priority(v->defer_event_source, priority);
2269 if (r < 0)
2270 goto fail;
2271
2272 (void) sd_event_source_set_description(v->defer_event_source, "varlink-defer");
2273
2274 return 0;
2275
2276fail:
db3d4222 2277 varlink_log_errno(v, r, "Failed to setup event source: %m");
d41bd96f
LP
2278 varlink_detach_event(v);
2279 return r;
2280}
2281
d41bd96f
LP
2282void varlink_detach_event(Varlink *v) {
2283 if (!v)
2284 return;
2285
2286 varlink_detach_event_sources(v);
2287
2288 v->event = sd_event_unref(v->event);
2289}
2290
2291sd_event *varlink_get_event(Varlink *v) {
2292 assert_return(v, NULL);
2293
2294 return v->event;
2295}
2296
d37cdac6
LP
2297int varlink_push_fd(Varlink *v, int fd) {
2298 int i;
2299
2300 assert_return(v, -EINVAL);
2301 assert_return(fd >= 0, -EBADF);
2302
2303 /* Takes an fd to send along with the *next* varlink message sent via this varlink connection. This
2304 * takes ownership of the specified fd. Use varlink_dup_fd() below to duplicate the fd first. */
2305
2306 if (!v->allow_fd_passing_output)
2307 return -EPERM;
2308
2309 if (v->n_pushed_fds >= INT_MAX)
2310 return -ENOMEM;
2311
2312 if (!GREEDY_REALLOC(v->pushed_fds, v->n_pushed_fds + 1))
2313 return -ENOMEM;
2314
2315 i = (int) v->n_pushed_fds;
2316 v->pushed_fds[v->n_pushed_fds++] = fd;
2317 return i;
2318}
2319
2320int varlink_dup_fd(Varlink *v, int fd) {
2321 _cleanup_close_ int dp = -1;
2322 int r;
2323
2324 assert_return(v, -EINVAL);
2325 assert_return(fd >= 0, -EBADF);
2326
2327 /* Like varlink_push_fd() but duplicates the specified fd instead of taking possession of it */
2328
2329 dp = fcntl(fd, F_DUPFD_CLOEXEC, 3);
2330 if (dp < 0)
2331 return -errno;
2332
2333 r = varlink_push_fd(v, dp);
2334 if (r < 0)
2335 return r;
2336
2337 TAKE_FD(dp);
2338 return r;
2339}
2340
2341int varlink_reset_fds(Varlink *v) {
2342 assert_return(v, -EINVAL);
2343
2344 /* Closes all currently pending fds to send. This may be used whenever the caller is in the process
2345 * of putting together a message with fds, and then eventually something fails and they need to
2346 * rollback the fds. Note that this is implicitly called whenever an error reply is sent, see above. */
2347
2348 close_many(v->output_fds, v->n_output_fds);
2349 v->n_output_fds = 0;
2350 return 0;
2351}
2352
2353int varlink_peek_fd(Varlink *v, size_t i) {
2354 assert_return(v, -EINVAL);
2355
94d82b59 2356 /* Returns one of the file descriptors that were received along with the current message. This does
d37cdac6
LP
2357 * not duplicate the fd nor invalidate it, it hence remains in our possession. */
2358
2359 if (!v->allow_fd_passing_input)
2360 return -EPERM;
2361
2362 if (i >= v->n_input_fds)
2363 return -ENXIO;
2364
2365 return v->input_fds[i];
2366}
2367
2368int varlink_take_fd(Varlink *v, size_t i) {
2369 assert_return(v, -EINVAL);
2370
2371 /* Similar to varlink_peek_fd() but the file descriptor's ownership is passed to the caller, and
2372 * we'll invalidate the reference to it under our possession. If called twice in a row will return
2373 * -EBADF */
2374
2375 if (!v->allow_fd_passing_input)
2376 return -EPERM;
2377
2378 if (i >= v->n_input_fds)
2379 return -ENXIO;
2380
2381 return TAKE_FD(v->input_fds[i]);
2382}
2383
2384static int verify_unix_socket(Varlink *v) {
2385 assert(v);
2386
2387 if (v->af < 0) {
2388 struct stat st;
2389
2390 if (fstat(v->fd, &st) < 0)
2391 return -errno;
2392 if (!S_ISSOCK(st.st_mode)) {
2393 v->af = AF_UNSPEC;
2394 return -ENOTSOCK;
2395 }
2396
2397 v->af = socket_get_family(v->fd);
2398 if (v->af < 0)
2399 return v->af;
2400 }
2401
2402 return v->af == AF_UNIX ? 0 : -ENOMEDIUM;
2403}
2404
2405int varlink_set_allow_fd_passing_input(Varlink *v, bool b) {
2406 int r;
2407
2408 assert_return(v, -EINVAL);
2409
2410 if (v->allow_fd_passing_input == b)
2411 return 0;
2412
2413 if (!b) {
2414 v->allow_fd_passing_input = false;
2415 return 1;
2416 }
2417
2418 r = verify_unix_socket(v);
2419 if (r < 0)
2420 return r;
2421
2422 v->allow_fd_passing_input = true;
2423 return 0;
2424}
2425
2426int varlink_set_allow_fd_passing_output(Varlink *v, bool b) {
2427 int r;
2428
2429 assert_return(v, -EINVAL);
2430
2431 if (v->allow_fd_passing_output == b)
2432 return 0;
2433
2434 if (!b) {
2435 v->allow_fd_passing_output = false;
2436 return 1;
2437 }
2438
2439 r = verify_unix_socket(v);
2440 if (r < 0)
2441 return r;
2442
2443 v->allow_fd_passing_output = true;
2444 return 0;
2445}
2446
d41bd96f
LP
2447int varlink_server_new(VarlinkServer **ret, VarlinkServerFlags flags) {
2448 VarlinkServer *s;
2449
2450 assert_return(ret, -EINVAL);
2451 assert_return((flags & ~_VARLINK_SERVER_FLAGS_ALL) == 0, -EINVAL);
2452
2453 s = new(VarlinkServer, 1);
2454 if (!s)
db3d4222 2455 return log_oom_debug();
d41bd96f
LP
2456
2457 *s = (VarlinkServer) {
2458 .n_ref = 1,
2459 .flags = flags,
2460 .connections_max = varlink_server_connections_max(NULL),
2461 .connections_per_uid_max = varlink_server_connections_per_uid_max(NULL),
2462 };
2463
2464 *ret = s;
2465 return 0;
2466}
2467
2468static VarlinkServer* varlink_server_destroy(VarlinkServer *s) {
2469 char *m;
2470
2471 if (!s)
2472 return NULL;
2473
2474 varlink_server_shutdown(s);
2475
2476 while ((m = hashmap_steal_first_key(s->methods)))
2477 free(m);
2478
2479 hashmap_free(s->methods);
2480 hashmap_free(s->by_uid);
2481
2482 sd_event_unref(s->event);
2483
2484 free(s->description);
2485
2486 return mfree(s);
2487}
2488
2489DEFINE_TRIVIAL_REF_UNREF_FUNC(VarlinkServer, varlink_server, varlink_server_destroy);
2490
2491static int validate_connection(VarlinkServer *server, const struct ucred *ucred) {
2492 int allowed = -1;
2493
2494 assert(server);
2495 assert(ucred);
2496
2497 if (FLAGS_SET(server->flags, VARLINK_SERVER_ROOT_ONLY))
2498 allowed = ucred->uid == 0;
2499
2500 if (FLAGS_SET(server->flags, VARLINK_SERVER_MYSELF_ONLY))
2501 allowed = allowed > 0 || ucred->uid == getuid();
2502
2503 if (allowed == 0) { /* Allow access when it is explicitly allowed or when neither
2504 * VARLINK_SERVER_ROOT_ONLY nor VARLINK_SERVER_MYSELF_ONLY are specified. */
2505 varlink_server_log(server, "Unprivileged client attempted connection, refusing.");
2506 return 0;
2507 }
2508
2509 if (server->n_connections >= server->connections_max) {
2510 varlink_server_log(server, "Connection limit of %u reached, refusing.", server->connections_max);
2511 return 0;
2512 }
2513
2514 if (FLAGS_SET(server->flags, VARLINK_SERVER_ACCOUNT_UID)) {
2515 unsigned c;
2516
2517 if (!uid_is_valid(ucred->uid)) {
2518 varlink_server_log(server, "Client with invalid UID attempted connection, refusing.");
2519 return 0;
2520 }
2521
2522 c = PTR_TO_UINT(hashmap_get(server->by_uid, UID_TO_PTR(ucred->uid)));
2523 if (c >= server->connections_per_uid_max) {
2524 varlink_server_log(server, "Per-UID connection limit of %u reached, refusing.",
2525 server->connections_per_uid_max);
2526 return 0;
2527 }
2528 }
2529
2530 return 1;
2531}
2532
678ca213 2533static int count_connection(VarlinkServer *server, const struct ucred *ucred) {
d41bd96f
LP
2534 unsigned c;
2535 int r;
2536
2537 assert(server);
2538 assert(ucred);
2539
2540 server->n_connections++;
2541
2542 if (FLAGS_SET(server->flags, VARLINK_SERVER_ACCOUNT_UID)) {
2543 r = hashmap_ensure_allocated(&server->by_uid, NULL);
2544 if (r < 0)
2545 return log_debug_errno(r, "Failed to allocate UID hash table: %m");
2546
2547 c = PTR_TO_UINT(hashmap_get(server->by_uid, UID_TO_PTR(ucred->uid)));
2548
2549 varlink_server_log(server, "Connections of user " UID_FMT ": %u (of %u max)",
2550 ucred->uid, c, server->connections_per_uid_max);
2551
2552 r = hashmap_replace(server->by_uid, UID_TO_PTR(ucred->uid), UINT_TO_PTR(c + 1));
2553 if (r < 0)
2554 return log_debug_errno(r, "Failed to increment counter in UID hash table: %m");
2555 }
2556
2557 return 0;
2558}
2559
2560int varlink_server_add_connection(VarlinkServer *server, int fd, Varlink **ret) {
2561 _cleanup_(varlink_unrefp) Varlink *v = NULL;
a995ce47 2562 struct ucred ucred = UCRED_INVALID;
d41bd96f 2563 bool ucred_acquired;
d41bd96f
LP
2564 int r;
2565
2566 assert_return(server, -EINVAL);
2567 assert_return(fd >= 0, -EBADF);
2568
2569 if ((server->flags & (VARLINK_SERVER_ROOT_ONLY|VARLINK_SERVER_ACCOUNT_UID)) != 0) {
2570 r = getpeercred(fd, &ucred);
2571 if (r < 0)
2572 return varlink_server_log_errno(server, r, "Failed to acquire peer credentials of incoming socket, refusing: %m");
2573
2574 ucred_acquired = true;
2575
2576 r = validate_connection(server, &ucred);
2577 if (r < 0)
2578 return r;
2579 if (r == 0)
2580 return -EPERM;
2581 } else
2582 ucred_acquired = false;
2583
2584 r = varlink_new(&v);
2585 if (r < 0)
2586 return varlink_server_log_errno(server, r, "Failed to allocate connection object: %m");
2587
2588 r = count_connection(server, &ucred);
2589 if (r < 0)
2590 return r;
2591
2592 v->fd = fd;
9807fdc1
LP
2593 if (server->flags & VARLINK_SERVER_INHERIT_USERDATA)
2594 v->userdata = server->userdata;
2595
d41bd96f
LP
2596 if (ucred_acquired) {
2597 v->ucred = ucred;
2598 v->ucred_acquired = true;
2599 }
2600
12619d0a
ZJS
2601 _cleanup_free_ char *desc = NULL;
2602 if (asprintf(&desc, "%s-%i", server->description ?: "varlink", v->fd) >= 0)
2603 v->description = TAKE_PTR(desc);
d41bd96f
LP
2604
2605 /* Link up the server and the connection, and take reference in both directions. Note that the
2606 * reference on the connection is left dangling. It will be dropped when the connection is closed,
2607 * which happens in varlink_close(), including in the event loop quit callback. */
2608 v->server = varlink_server_ref(server);
2609 varlink_ref(v);
2610
2611 varlink_set_state(v, VARLINK_IDLE_SERVER);
2612
7e69d90c
LP
2613 if (server->event) {
2614 r = varlink_attach_event(v, server->event, server->event_priority);
2615 if (r < 0) {
2616 varlink_log_errno(v, r, "Failed to attach new connection: %m");
254d1313 2617 v->fd = -EBADF; /* take the fd out of the connection again */
7e69d90c
LP
2618 varlink_close(v);
2619 return r;
2620 }
d41bd96f
LP
2621 }
2622
2623 if (ret)
2624 *ret = v;
2625
2626 return 0;
2627}
2628
c14e841f
AZ
2629static VarlinkServerSocket *varlink_server_socket_free(VarlinkServerSocket *ss) {
2630 if (!ss)
2631 return NULL;
2632
2633 free(ss->address);
2634 return mfree(ss);
2635}
2636
2637DEFINE_TRIVIAL_CLEANUP_FUNC(VarlinkServerSocket *, varlink_server_socket_free);
2638
d41bd96f 2639static int connect_callback(sd_event_source *source, int fd, uint32_t revents, void *userdata) {
99534007 2640 VarlinkServerSocket *ss = ASSERT_PTR(userdata);
254d1313 2641 _cleanup_close_ int cfd = -EBADF;
d41bd96f
LP
2642 Varlink *v = NULL;
2643 int r;
2644
2645 assert(source);
d41bd96f
LP
2646
2647 varlink_server_log(ss->server, "New incoming connection.");
2648
2649 cfd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
2650 if (cfd < 0) {
2651 if (ERRNO_IS_ACCEPT_AGAIN(errno))
2652 return 0;
2653
2654 return varlink_server_log_errno(ss->server, errno, "Failed to accept incoming socket: %m");
2655 }
2656
2657 r = varlink_server_add_connection(ss->server, cfd, &v);
2658 if (r < 0)
2659 return 0;
2660
2661 TAKE_FD(cfd);
2662
2663 if (ss->server->connect_callback) {
2664 r = ss->server->connect_callback(ss->server, v, ss->server->userdata);
2665 if (r < 0) {
2666 varlink_log_errno(v, r, "Connection callback returned error, disconnecting client: %m");
2667 varlink_close(v);
2668 return 0;
2669 }
2670 }
2671
2672 return 0;
2673}
2674
c14e841f
AZ
2675static int varlink_server_create_listen_fd_socket(VarlinkServer *s, int fd, VarlinkServerSocket **ret_ss) {
2676 _cleanup_(varlink_server_socket_freep) VarlinkServerSocket *ss = NULL;
d41bd96f
LP
2677 int r;
2678
c14e841f
AZ
2679 assert(s);
2680 assert(fd >= 0);
2681 assert(ret_ss);
d41bd96f 2682
d41bd96f
LP
2683 ss = new(VarlinkServerSocket, 1);
2684 if (!ss)
db3d4222 2685 return log_oom_debug();
d41bd96f
LP
2686
2687 *ss = (VarlinkServerSocket) {
2688 .server = s,
2689 .fd = fd,
2690 };
2691
2692 if (s->event) {
8d91b220 2693 r = sd_event_add_io(s->event, &ss->event_source, fd, EPOLLIN, connect_callback, ss);
d41bd96f
LP
2694 if (r < 0)
2695 return r;
2696
2697 r = sd_event_source_set_priority(ss->event_source, s->event_priority);
2698 if (r < 0)
2699 return r;
2700 }
2701
c14e841f
AZ
2702 *ret_ss = TAKE_PTR(ss);
2703 return 0;
2704}
2705
2706int varlink_server_listen_fd(VarlinkServer *s, int fd) {
2707 _cleanup_(varlink_server_socket_freep) VarlinkServerSocket *ss = NULL;
2708 int r;
2709
2710 assert_return(s, -EINVAL);
2711 assert_return(fd >= 0, -EBADF);
2712
a4edf033
LP
2713 r = fd_nonblock(fd, true);
2714 if (r < 0)
2715 return r;
2716
2717 r = fd_cloexec(fd, true);
2718 if (r < 0)
2719 return r;
2720
c14e841f
AZ
2721 r = varlink_server_create_listen_fd_socket(s, fd, &ss);
2722 if (r < 0)
2723 return r;
2724
d41bd96f
LP
2725 LIST_PREPEND(sockets, s->sockets, TAKE_PTR(ss));
2726 return 0;
2727}
2728
2729int varlink_server_listen_address(VarlinkServer *s, const char *address, mode_t m) {
c14e841f 2730 _cleanup_(varlink_server_socket_freep) VarlinkServerSocket *ss = NULL;
d41bd96f 2731 union sockaddr_union sockaddr;
f36a9d59 2732 socklen_t sockaddr_len;
254d1313 2733 _cleanup_close_ int fd = -EBADF;
d41bd96f
LP
2734 int r;
2735
2736 assert_return(s, -EINVAL);
2737 assert_return(address, -EINVAL);
2738 assert_return((m & ~0777) == 0, -EINVAL);
2739
2740 r = sockaddr_un_set_path(&sockaddr.un, address);
2741 if (r < 0)
2742 return r;
f36a9d59 2743 sockaddr_len = r;
d41bd96f
LP
2744
2745 fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
2746 if (fd < 0)
2747 return -errno;
2748
a0c41de2
LP
2749 fd = fd_move_above_stdio(fd);
2750
d41bd96f
LP
2751 (void) sockaddr_un_unlink(&sockaddr.un);
2752
2053593f 2753 WITH_UMASK(~m & 0777) {
63e00ccd
CG
2754 r = mac_selinux_bind(fd, &sockaddr.sa, sockaddr_len);
2755 if (r < 0)
2756 return r;
2757 }
d41bd96f 2758
768fcd77 2759 if (listen(fd, SOMAXCONN_DELUXE) < 0)
d41bd96f
LP
2760 return -errno;
2761
c14e841f 2762 r = varlink_server_create_listen_fd_socket(s, fd, &ss);
d41bd96f
LP
2763 if (r < 0)
2764 return r;
2765
c14e841f
AZ
2766 r = free_and_strdup(&ss->address, address);
2767 if (r < 0)
2768 return r;
2769
2770 LIST_PREPEND(sockets, s->sockets, TAKE_PTR(ss));
d41bd96f
LP
2771 TAKE_FD(fd);
2772 return 0;
2773}
2774
2775void* varlink_server_set_userdata(VarlinkServer *s, void *userdata) {
2776 void *ret;
2777
2778 assert_return(s, NULL);
2779
2780 ret = s->userdata;
2781 s->userdata = userdata;
2782
2783 return ret;
2784}
2785
2786void* varlink_server_get_userdata(VarlinkServer *s) {
2787 assert_return(s, NULL);
2788
2789 return s->userdata;
2790}
2791
2792static VarlinkServerSocket* varlink_server_socket_destroy(VarlinkServerSocket *ss) {
2793 if (!ss)
2794 return NULL;
2795
2796 if (ss->server)
2797 LIST_REMOVE(sockets, ss->server->sockets, ss);
2798
1d3fe304 2799 sd_event_source_disable_unref(ss->event_source);
d41bd96f
LP
2800
2801 free(ss->address);
2802 safe_close(ss->fd);
2803
2804 return mfree(ss);
2805}
2806
2807int varlink_server_shutdown(VarlinkServer *s) {
2808 assert_return(s, -EINVAL);
2809
2810 while (s->sockets)
2811 varlink_server_socket_destroy(s->sockets);
2812
2813 return 0;
2814}
2815
536827e0
AZ
2816static int varlink_server_add_socket_event_source(VarlinkServer *s, VarlinkServerSocket *ss, int64_t priority) {
2817 _cleanup_(sd_event_source_unrefp) sd_event_source *es = NULL;
2818
2819 int r;
2820
2821 assert(s);
2822 assert(s->event);
2823 assert(ss);
2824 assert(ss->fd >= 0);
2825 assert(!ss->event_source);
2826
2827 r = sd_event_add_io(s->event, &es, ss->fd, EPOLLIN, connect_callback, ss);
2828 if (r < 0)
2829 return r;
2830
2831 r = sd_event_source_set_priority(es, priority);
2832 if (r < 0)
2833 return r;
2834
2835 ss->event_source = TAKE_PTR(es);
2836 return 0;
2837}
2838
d41bd96f 2839int varlink_server_attach_event(VarlinkServer *s, sd_event *e, int64_t priority) {
d41bd96f
LP
2840 int r;
2841
2842 assert_return(s, -EINVAL);
2843 assert_return(!s->event, -EBUSY);
2844
2845 if (e)
2846 s->event = sd_event_ref(e);
2847 else {
2848 r = sd_event_default(&s->event);
2849 if (r < 0)
2850 return r;
2851 }
2852
2853 LIST_FOREACH(sockets, ss, s->sockets) {
536827e0 2854 r = varlink_server_add_socket_event_source(s, ss, priority);
d41bd96f
LP
2855 if (r < 0)
2856 goto fail;
2857 }
2858
2859 s->event_priority = priority;
2860 return 0;
2861
2862fail:
2863 varlink_server_detach_event(s);
2864 return r;
2865}
2866
2867int varlink_server_detach_event(VarlinkServer *s) {
d41bd96f
LP
2868 assert_return(s, -EINVAL);
2869
4f538d7b
LP
2870 LIST_FOREACH(sockets, ss, s->sockets)
2871 ss->event_source = sd_event_source_disable_unref(ss->event_source);
d41bd96f
LP
2872
2873 sd_event_unref(s->event);
2874 return 0;
2875}
2876
2877sd_event *varlink_server_get_event(VarlinkServer *s) {
2878 assert_return(s, NULL);
2879
2880 return s->event;
2881}
2882
2883int varlink_server_bind_method(VarlinkServer *s, const char *method, VarlinkMethod callback) {
db3d4222 2884 _cleanup_free_ char *m = NULL;
d41bd96f
LP
2885 int r;
2886
2887 assert_return(s, -EINVAL);
2888 assert_return(method, -EINVAL);
2889 assert_return(callback, -EINVAL);
2890
2891 if (startswith(method, "org.varlink.service."))
db3d4222 2892 return log_debug_errno(SYNTHETIC_ERRNO(EEXIST), "Cannot bind server to '%s'.", method);
d41bd96f 2893
d41bd96f
LP
2894 m = strdup(method);
2895 if (!m)
db3d4222 2896 return log_oom_debug();
d41bd96f 2897
1d2d1654
SS
2898 r = hashmap_ensure_put(&s->methods, &string_hash_ops, m, callback);
2899 if (r == -ENOMEM)
2900 return log_oom_debug();
db3d4222
ZJS
2901 if (r < 0)
2902 return log_debug_errno(r, "Failed to register callback: %m");
2903 if (r > 0)
2904 TAKE_PTR(m);
d41bd96f
LP
2905
2906 return 0;
2907}
2908
2909int varlink_server_bind_method_many_internal(VarlinkServer *s, ...) {
2910 va_list ap;
e7b93f97 2911 int r = 0;
d41bd96f
LP
2912
2913 assert_return(s, -EINVAL);
2914
2915 va_start(ap, s);
2916 for (;;) {
2917 VarlinkMethod callback;
2918 const char *method;
2919
2920 method = va_arg(ap, const char *);
2921 if (!method)
2922 break;
2923
2924 callback = va_arg(ap, VarlinkMethod);
2925
2926 r = varlink_server_bind_method(s, method, callback);
2927 if (r < 0)
e7b93f97 2928 break;
d41bd96f 2929 }
e7b93f97 2930 va_end(ap);
d41bd96f 2931
e7b93f97 2932 return r;
d41bd96f
LP
2933}
2934
2935int varlink_server_bind_connect(VarlinkServer *s, VarlinkConnect callback) {
2936 assert_return(s, -EINVAL);
2937
2938 if (callback && s->connect_callback && callback != s->connect_callback)
db3d4222 2939 return log_debug_errno(SYNTHETIC_ERRNO(EBUSY), "A different callback was already set.");
d41bd96f
LP
2940
2941 s->connect_callback = callback;
2942 return 0;
2943}
2944
6d4d6002
LP
2945int varlink_server_bind_disconnect(VarlinkServer *s, VarlinkDisconnect callback) {
2946 assert_return(s, -EINVAL);
2947
2948 if (callback && s->disconnect_callback && callback != s->disconnect_callback)
db3d4222 2949 return log_debug_errno(SYNTHETIC_ERRNO(EBUSY), "A different callback was already set.");
6d4d6002
LP
2950
2951 s->disconnect_callback = callback;
2952 return 0;
2953}
2954
d41bd96f 2955unsigned varlink_server_connections_max(VarlinkServer *s) {
88a36d36 2956 int dts;
d41bd96f
LP
2957
2958 /* If a server is specified, return the setting for that server, otherwise the default value */
2959 if (s)
2960 return s->connections_max;
2961
88a36d36
LP
2962 dts = getdtablesize();
2963 assert_se(dts > 0);
d41bd96f
LP
2964
2965 /* Make sure we never use up more than ¾th of RLIMIT_NOFILE for IPC */
88a36d36
LP
2966 if (VARLINK_DEFAULT_CONNECTIONS_MAX > (unsigned) dts / 4 * 3)
2967 return dts / 4 * 3;
d41bd96f
LP
2968
2969 return VARLINK_DEFAULT_CONNECTIONS_MAX;
2970}
2971
2972unsigned varlink_server_connections_per_uid_max(VarlinkServer *s) {
2973 unsigned m;
2974
2975 if (s)
2976 return s->connections_per_uid_max;
2977
2978 /* Make sure to never use up more than ¾th of available connections for a single user */
2979 m = varlink_server_connections_max(NULL);
2980 if (VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX > m)
2981 return m / 4 * 3;
2982
2983 return VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX;
2984}
2985
2986int varlink_server_set_connections_per_uid_max(VarlinkServer *s, unsigned m) {
2987 assert_return(s, -EINVAL);
2988 assert_return(m > 0, -EINVAL);
2989
2990 s->connections_per_uid_max = m;
2991 return 0;
2992}
2993
2994int varlink_server_set_connections_max(VarlinkServer *s, unsigned m) {
2995 assert_return(s, -EINVAL);
2996 assert_return(m > 0, -EINVAL);
2997
2998 s->connections_max = m;
2999 return 0;
3000}
3001
c4f601f2
LP
3002unsigned varlink_server_current_connections(VarlinkServer *s) {
3003 assert_return(s, UINT_MAX);
3004
3005 return s->n_connections;
3006}
3007
d41bd96f
LP
3008int varlink_server_set_description(VarlinkServer *s, const char *description) {
3009 assert_return(s, -EINVAL);
3010
3011 return free_and_strdup(&s->description, description);
3012}
008798e9
AZ
3013
3014int varlink_server_serialize(VarlinkServer *s, FILE *f, FDSet *fds) {
3015 assert(f);
3016 assert(fds);
3017
3018 if (!s)
3019 return 0;
3020
3021 LIST_FOREACH(sockets, ss, s->sockets) {
3022 int copy;
3023
3024 assert(ss->address);
3025 assert(ss->fd >= 0);
3026
3027 fprintf(f, "varlink-server-socket-address=%s", ss->address);
3028
3029 /* If we fail to serialize the fd, it will be considered an error during deserialization */
3030 copy = fdset_put_dup(fds, ss->fd);
3031 if (copy < 0)
3032 return copy;
3033
3034 fprintf(f, " varlink-server-socket-fd=%i", copy);
3035
3036 fputc('\n', f);
3037 }
3038
3039 return 0;
3040}
3041
3042int varlink_server_deserialize_one(VarlinkServer *s, const char *value, FDSet *fds) {
3043 _cleanup_(varlink_server_socket_freep) VarlinkServerSocket *ss = NULL;
3044 _cleanup_free_ char *address = NULL;
3045 const char *v = ASSERT_PTR(value);
254d1313 3046 int r, fd = -EBADF;
008798e9
AZ
3047 char *buf;
3048 size_t n;
3049
3050 assert(s);
3051 assert(fds);
3052
3053 n = strcspn(v, " ");
3054 address = strndup(v, n);
3055 if (!address)
3056 return log_oom_debug();
3057
3058 if (v[n] != ' ')
3059 return log_debug_errno(SYNTHETIC_ERRNO(EINVAL),
3060 "Failed to deserialize VarlinkServerSocket: %s: %m", value);
3061 v = startswith(v + n + 1, "varlink-server-socket-fd=");
3062 if (!v)
3063 return log_debug_errno(SYNTHETIC_ERRNO(EINVAL),
3064 "Failed to deserialize VarlinkServerSocket fd %s: %m", value);
3065
3066 n = strcspn(v, " ");
3067 buf = strndupa_safe(v, n);
3068
e652663a 3069 fd = parse_fd(buf);
e652663a
DT
3070 if (fd < 0)
3071 return log_debug_errno(fd, "Unable to parse VarlinkServerSocket varlink-server-socket-fd=%s: %m", buf);
008798e9
AZ
3072 if (!fdset_contains(fds, fd))
3073 return log_debug_errno(SYNTHETIC_ERRNO(EBADF),
3074 "VarlinkServerSocket varlink-server-socket-fd= has unknown fd %d: %m", fd);
3075
3076 ss = new(VarlinkServerSocket, 1);
3077 if (!ss)
3078 return log_oom_debug();
3079
3080 *ss = (VarlinkServerSocket) {
3081 .server = s,
3082 .address = TAKE_PTR(address),
3083 .fd = fdset_remove(fds, fd),
3084 };
3085
3086 r = varlink_server_add_socket_event_source(s, ss, SD_EVENT_PRIORITY_NORMAL);
3087 if (r < 0)
3088 return log_debug_errno(r, "Failed to add VarlinkServerSocket event source to the event loop: %m");
3089
3090 LIST_PREPEND(sockets, s->sockets, TAKE_PTR(ss));
3091 return 0;
3092}