]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/shared/varlink.c
tree-wide: fix spelling errors
[thirdparty/systemd.git] / src / shared / varlink.c
1 /* SPDX-License-Identifier: LGPL-2.1+ */
2
3 #include <sys/poll.h>
4
5 #include "alloc-util.h"
6 #include "errno-util.h"
7 #include "fd-util.h"
8 #include "hashmap.h"
9 #include "list.h"
10 #include "process-util.h"
11 #include "set.h"
12 #include "socket-util.h"
13 #include "string-table.h"
14 #include "string-util.h"
15 #include "strv.h"
16 #include "time-util.h"
17 #include "umask-util.h"
18 #include "user-util.h"
19 #include "varlink.h"
20
21 #define VARLINK_DEFAULT_CONNECTIONS_MAX 4096U
22 #define VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX 1024U
23
24 #define VARLINK_DEFAULT_TIMEOUT_USEC (45U*USEC_PER_SEC)
25 #define VARLINK_BUFFER_MAX (16U*1024U*1024U)
26 #define VARLINK_READ_SIZE (64U*1024U)
27
28 typedef enum VarlinkState {
29 /* Client side states */
30 VARLINK_IDLE_CLIENT,
31 VARLINK_AWAITING_REPLY,
32 VARLINK_AWAITING_REPLY_MORE,
33 VARLINK_CALLING,
34 VARLINK_CALLED,
35 VARLINK_PROCESSING_REPLY,
36
37 /* Server side states */
38 VARLINK_IDLE_SERVER,
39 VARLINK_PROCESSING_METHOD,
40 VARLINK_PROCESSING_METHOD_MORE,
41 VARLINK_PROCESSING_METHOD_ONEWAY,
42 VARLINK_PROCESSED_METHOD,
43 VARLINK_PENDING_METHOD,
44 VARLINK_PENDING_METHOD_MORE,
45
46 /* Common states (only during shutdown) */
47 VARLINK_PENDING_DISCONNECT,
48 VARLINK_PENDING_TIMEOUT,
49 VARLINK_PROCESSING_DISCONNECT,
50 VARLINK_PROCESSING_TIMEOUT,
51 VARLINK_PROCESSING_FAILURE,
52 VARLINK_DISCONNECTED,
53
54 _VARLINK_STATE_MAX,
55 _VARLINK_STATE_INVALID = -1
56 } VarlinkState;
57
58 /* Tests whether we are not yet disconnected. Note that this is true during all states where the connection
59 * is still good for something, and false only when it's dead for good. This means: when we are
60 * asynchronously connecting to a peer and the connect() is still pending, then this will return 'true', as
61 * the connection is still good, and we are likely to be able to properly operate on it soon. */
62 #define VARLINK_STATE_IS_ALIVE(state) \
63 IN_SET(state, \
64 VARLINK_IDLE_CLIENT, \
65 VARLINK_AWAITING_REPLY, \
66 VARLINK_AWAITING_REPLY_MORE, \
67 VARLINK_CALLING, \
68 VARLINK_CALLED, \
69 VARLINK_PROCESSING_REPLY, \
70 VARLINK_IDLE_SERVER, \
71 VARLINK_PROCESSING_METHOD, \
72 VARLINK_PROCESSING_METHOD_MORE, \
73 VARLINK_PROCESSING_METHOD_ONEWAY, \
74 VARLINK_PROCESSED_METHOD, \
75 VARLINK_PENDING_METHOD, \
76 VARLINK_PENDING_METHOD_MORE)
77
78 struct Varlink {
79 unsigned n_ref;
80
81 VarlinkServer *server;
82
83 VarlinkState state;
84 bool connecting; /* This boolean indicates whether the socket fd we are operating on is currently
85 * processing an asynchronous connect(). In that state we watch the socket for
86 * EPOLLOUT, but we refrain from calling read() or write() on the socket as that
87 * will trigger ENOTCONN. Note that this boolean is kept separate from the
88 * VarlinkState above on purpose: while the connect() is still not complete we
89 * already want to allow queuing of messages and similar. Thus it's nice to keep
90 * these two state concepts separate: the VarlinkState encodes what our own view of
91 * the connection is, i.e. whether we think it's a server, a client, and has
92 * something queued already, while 'connecting' tells us a detail about the
93 * transport used below, that should have no effect on how we otherwise accept and
94 * process operations from the user.
95 *
96 * Or to say this differently: VARLINK_STATE_IS_ALIVE(state) tells you whether the
97 * connection is good to use, even if it might not be fully connected
98 * yet. connecting=true then informs you that actually we are still connecting, and
99 * the connection is actually not established yet and thus any requests you enqueue
100 * now will still work fine but will be queued only, not sent yet, but that
101 * shouldn't stop you from using the connection, since eventually whatever you queue
102 * *will* be sent.
103 *
104 * Or to say this even differently: 'state' is a high-level ("application layer"
105 * high, if you so will) state, while 'conecting' is a low-level ("transport layer"
106 * low, if you so will) state, and while they are not entirely unrelated and
107 * sometimes propagate effects to each other they are only asynchronously connected
108 * at most. */
109 unsigned n_pending;
110
111 int fd;
112
113 char *input_buffer; /* valid data starts at input_buffer_index, ends at input_buffer_index+input_buffer_size */
114 size_t input_buffer_allocated;
115 size_t input_buffer_index;
116 size_t input_buffer_size;
117 size_t input_buffer_unscanned;
118
119 char *output_buffer; /* valid data starts at output_buffer_index, ends at output_buffer_index+output_buffer_size */
120 size_t output_buffer_allocated;
121 size_t output_buffer_index;
122 size_t output_buffer_size;
123
124 VarlinkReply reply_callback;
125
126 JsonVariant *current;
127 JsonVariant *reply;
128
129 struct ucred ucred;
130 bool ucred_acquired:1;
131
132 bool write_disconnected:1;
133 bool read_disconnected:1;
134 bool prefer_read_write:1;
135 bool got_pollhup:1;
136
137 usec_t timestamp;
138 usec_t timeout;
139
140 void *userdata;
141 char *description;
142
143 sd_event *event;
144 sd_event_source *io_event_source;
145 sd_event_source *time_event_source;
146 sd_event_source *quit_event_source;
147 sd_event_source *defer_event_source;
148 };
149
150 typedef struct VarlinkServerSocket VarlinkServerSocket;
151
152 struct VarlinkServerSocket {
153 VarlinkServer *server;
154
155 int fd;
156 char *address;
157
158 sd_event_source *event_source;
159
160 LIST_FIELDS(VarlinkServerSocket, sockets);
161 };
162
163 struct VarlinkServer {
164 unsigned n_ref;
165 VarlinkServerFlags flags;
166
167 LIST_HEAD(VarlinkServerSocket, sockets);
168
169 Hashmap *methods;
170 VarlinkConnect connect_callback;
171 VarlinkDisconnect disconnect_callback;
172
173 sd_event *event;
174 int64_t event_priority;
175
176 unsigned n_connections;
177 Hashmap *by_uid;
178
179 void *userdata;
180 char *description;
181
182 unsigned connections_max;
183 unsigned connections_per_uid_max;
184 };
185
186 static const char* const varlink_state_table[_VARLINK_STATE_MAX] = {
187 [VARLINK_IDLE_CLIENT] = "idle-client",
188 [VARLINK_AWAITING_REPLY] = "awaiting-reply",
189 [VARLINK_AWAITING_REPLY_MORE] = "awaiting-reply-more",
190 [VARLINK_CALLING] = "calling",
191 [VARLINK_CALLED] = "called",
192 [VARLINK_PROCESSING_REPLY] = "processing-reply",
193 [VARLINK_IDLE_SERVER] = "idle-server",
194 [VARLINK_PROCESSING_METHOD] = "processing-method",
195 [VARLINK_PROCESSING_METHOD_MORE] = "processing-method-more",
196 [VARLINK_PROCESSING_METHOD_ONEWAY] = "processing-method-oneway",
197 [VARLINK_PROCESSED_METHOD] = "processed-method",
198 [VARLINK_PENDING_METHOD] = "pending-method",
199 [VARLINK_PENDING_METHOD_MORE] = "pending-method-more",
200 [VARLINK_PENDING_DISCONNECT] = "pending-disconnect",
201 [VARLINK_PENDING_TIMEOUT] = "pending-timeout",
202 [VARLINK_PROCESSING_DISCONNECT] = "processing-disconnect",
203 [VARLINK_PROCESSING_TIMEOUT] = "processing-timeout",
204 [VARLINK_PROCESSING_FAILURE] = "processing-failure",
205 [VARLINK_DISCONNECTED] = "disconnected",
206 };
207
208 DEFINE_PRIVATE_STRING_TABLE_LOOKUP_TO_STRING(varlink_state, VarlinkState);
209
210 #define varlink_log_errno(v, error, fmt, ...) \
211 log_debug_errno(error, "%s: " fmt, varlink_description(v), ##__VA_ARGS__)
212
213 #define varlink_log(v, fmt, ...) \
214 log_debug("%s: " fmt, varlink_description(v), ##__VA_ARGS__)
215
216 #define varlink_server_log_errno(s, error, fmt, ...) \
217 log_debug_errno(error, "%s: " fmt, varlink_server_description(s), ##__VA_ARGS__)
218
219 #define varlink_server_log(s, fmt, ...) \
220 log_debug("%s: " fmt, varlink_server_description(s), ##__VA_ARGS__)
221
222 static inline const char *varlink_description(Varlink *v) {
223 return strna(v ? v->description : NULL);
224 }
225
226 static inline const char *varlink_server_description(VarlinkServer *s) {
227 return strna(s ? s->description : NULL);
228 }
229
230 static void varlink_set_state(Varlink *v, VarlinkState state) {
231 assert(v);
232 assert(state >= 0 && state < _VARLINK_STATE_MAX);
233
234 if (v->state < 0)
235 varlink_log(v, "varlink: setting state %s",
236 varlink_state_to_string(state));
237 else
238 varlink_log(v, "varlink: changing state %s → %s",
239 varlink_state_to_string(v->state),
240 varlink_state_to_string(state));
241
242 v->state = state;
243 }
244
245 static int varlink_new(Varlink **ret) {
246 Varlink *v;
247
248 assert(ret);
249
250 v = new(Varlink, 1);
251 if (!v)
252 return -ENOMEM;
253
254 *v = (Varlink) {
255 .n_ref = 1,
256 .fd = -1,
257
258 .state = _VARLINK_STATE_INVALID,
259
260 .ucred.uid = UID_INVALID,
261 .ucred.gid = GID_INVALID,
262
263 .timestamp = USEC_INFINITY,
264 .timeout = VARLINK_DEFAULT_TIMEOUT_USEC
265 };
266
267 *ret = v;
268 return 0;
269 }
270
271 int varlink_connect_address(Varlink **ret, const char *address) {
272 _cleanup_(varlink_unrefp) Varlink *v = NULL;
273 union sockaddr_union sockaddr;
274 socklen_t sockaddr_len;
275 int r;
276
277 assert_return(ret, -EINVAL);
278 assert_return(address, -EINVAL);
279
280 r = sockaddr_un_set_path(&sockaddr.un, address);
281 if (r < 0)
282 return r;
283 sockaddr_len = r;
284
285 r = varlink_new(&v);
286 if (r < 0)
287 return r;
288
289 v->fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
290 if (v->fd < 0)
291 return -errno;
292
293 v->fd = fd_move_above_stdio(v->fd);
294
295 if (connect(v->fd, &sockaddr.sa, sockaddr_len) < 0) {
296 if (!IN_SET(errno, EAGAIN, EINPROGRESS))
297 return -errno;
298
299 v->connecting = true; /* We are asynchronously connecting, i.e. the connect() is being
300 * processed in the background. As long as that's the case the socket
301 * is in a special state: it's there, we can poll it for EPOLLOUT, but
302 * if we attempt to write() to it before we see EPOLLOUT we'll get
303 * ENOTCONN (and not EAGAIN, like we would for a normal connected
304 * socket that isn't writable at the moment). Since ENOTCONN on write()
305 * hence can mean two different things (i.e. connection not complete
306 * yet vs. already disconnected again), we store as a boolean whether
307 * we are still in connect(). */
308 }
309
310 varlink_set_state(v, VARLINK_IDLE_CLIENT);
311
312 *ret = TAKE_PTR(v);
313 return r;
314 }
315
316 int varlink_connect_fd(Varlink **ret, int fd) {
317 Varlink *v;
318 int r;
319
320 assert_return(ret, -EINVAL);
321 assert_return(fd >= 0, -EBADF);
322
323 r = fd_nonblock(fd, true);
324 if (r < 0)
325 return r;
326
327 r = varlink_new(&v);
328 if (r < 0)
329 return r;
330
331 v->fd = fd;
332 varlink_set_state(v, VARLINK_IDLE_CLIENT);
333
334 /* Note that if this function is called we assume the passed socket (if it is one) is already
335 * properly connected, i.e. any asynchronous connect() done on it already completed. Because of that
336 * we'll not set the 'connecting' boolean here, i.e. we don't need to avoid write()ing to the socket
337 * until the connection is fully set up. Behaviour here is hence a bit different from
338 * varlink_connect_address() above, as there we do handle asynchronous connections ourselves and
339 * avoid doing write() on it before we saw EPOLLOUT for the first time. */
340
341 *ret = v;
342 return 0;
343 }
344
345 static void varlink_detach_event_sources(Varlink *v) {
346 assert(v);
347
348 v->io_event_source = sd_event_source_disable_unref(v->io_event_source);
349 v->time_event_source = sd_event_source_disable_unref(v->time_event_source);
350 v->quit_event_source = sd_event_source_disable_unref(v->quit_event_source);
351 v->defer_event_source = sd_event_source_disable_unref(v->defer_event_source);
352 }
353
354 static void varlink_clear(Varlink *v) {
355 assert(v);
356
357 varlink_detach_event_sources(v);
358
359 v->fd = safe_close(v->fd);
360
361 v->input_buffer = mfree(v->input_buffer);
362 v->output_buffer = mfree(v->output_buffer);
363
364 v->current = json_variant_unref(v->current);
365 v->reply = json_variant_unref(v->reply);
366
367 v->event = sd_event_unref(v->event);
368 }
369
370 static Varlink* varlink_destroy(Varlink *v) {
371 if (!v)
372 return NULL;
373
374 /* If this is called the server object must already been unreffed here. Why that? because when we
375 * linked up the varlink connection with the server object we took one ref in each direction */
376 assert(!v->server);
377
378 varlink_clear(v);
379
380 free(v->description);
381 return mfree(v);
382 }
383
384 DEFINE_TRIVIAL_REF_UNREF_FUNC(Varlink, varlink, varlink_destroy);
385
386 static int varlink_test_disconnect(Varlink *v) {
387 assert(v);
388
389 /* Tests whether we the the connection has been terminated. We are careful to not stop processing it
390 * prematurely, since we want to handle half-open connections as well as possible and want to flush
391 * out and read data before we close down if we can. */
392
393 /* Already disconnected? */
394 if (!VARLINK_STATE_IS_ALIVE(v->state))
395 return 0;
396
397 /* Wait until connection setup is complete, i.e. until asynchronous connect() completes */
398 if (v->connecting)
399 return 0;
400
401 /* Still something to write and we can write? Stay around */
402 if (v->output_buffer_size > 0 && !v->write_disconnected)
403 return 0;
404
405 /* Both sides gone already? Then there's no need to stick around */
406 if (v->read_disconnected && v->write_disconnected)
407 goto disconnect;
408
409 /* If we are waiting for incoming data but the read side is shut down, disconnect. */
410 if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected)
411 goto disconnect;
412
413 /* Similar, if are a client that hasn't written anything yet but the write side is dead, also
414 * disconnect. We also explicitly check for POLLHUP here since we likely won't notice the write side
415 * being down if we never wrote anything. */
416 if (IN_SET(v->state, VARLINK_IDLE_CLIENT) && (v->write_disconnected || v->got_pollhup))
417 goto disconnect;
418
419 return 0;
420
421 disconnect:
422 varlink_set_state(v, VARLINK_PENDING_DISCONNECT);
423 return 1;
424 }
425
426 static int varlink_write(Varlink *v) {
427 ssize_t n;
428
429 assert(v);
430
431 if (!VARLINK_STATE_IS_ALIVE(v->state))
432 return 0;
433 if (v->connecting) /* Writing while we are still wait for a non-blocking connect() to complete will
434 * result in ENOTCONN, hence exit early here */
435 return 0;
436 if (v->output_buffer_size == 0)
437 return 0;
438 if (v->write_disconnected)
439 return 0;
440
441 assert(v->fd >= 0);
442
443 /* We generally prefer recv()/send() (mostly because of MSG_NOSIGNAL) but also want to be compatible
444 * with non-socket IO, hence fall back automatically */
445 if (!v->prefer_read_write) {
446 n = send(v->fd, v->output_buffer + v->output_buffer_index, v->output_buffer_size, MSG_DONTWAIT|MSG_NOSIGNAL);
447 if (n < 0 && errno == ENOTSOCK)
448 v->prefer_read_write = true;
449 }
450 if (v->prefer_read_write)
451 n = write(v->fd, v->output_buffer + v->output_buffer_index, v->output_buffer_size);
452 if (n < 0) {
453 if (errno == EAGAIN)
454 return 0;
455
456 if (ERRNO_IS_DISCONNECT(errno)) {
457 /* If we get informed about a disconnect on write, then let's remember that, but not
458 * act on it just yet. Let's wait for read() to report the issue first. */
459 v->write_disconnected = true;
460 return 1;
461 }
462
463 return -errno;
464 }
465
466 v->output_buffer_size -= n;
467
468 if (v->output_buffer_size == 0)
469 v->output_buffer_index = 0;
470 else
471 v->output_buffer_index += n;
472
473 v->timestamp = now(CLOCK_MONOTONIC);
474 return 1;
475 }
476
477 static int varlink_read(Varlink *v) {
478 size_t rs;
479 ssize_t n;
480
481 assert(v);
482
483 if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER))
484 return 0;
485 if (v->connecting) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */
486 return 0;
487 if (v->current)
488 return 0;
489 if (v->input_buffer_unscanned > 0)
490 return 0;
491 if (v->read_disconnected)
492 return 0;
493
494 if (v->input_buffer_size >= VARLINK_BUFFER_MAX)
495 return -ENOBUFS;
496
497 assert(v->fd >= 0);
498
499 if (v->input_buffer_allocated <= v->input_buffer_index + v->input_buffer_size) {
500 size_t add;
501
502 add = MIN(VARLINK_BUFFER_MAX - v->input_buffer_size, VARLINK_READ_SIZE);
503
504 if (v->input_buffer_index == 0) {
505
506 if (!GREEDY_REALLOC(v->input_buffer, v->input_buffer_allocated, v->input_buffer_size + add))
507 return -ENOMEM;
508
509 } else {
510 char *b;
511
512 b = new(char, v->input_buffer_size + add);
513 if (!b)
514 return -ENOMEM;
515
516 memcpy(b, v->input_buffer + v->input_buffer_index, v->input_buffer_size);
517
518 free_and_replace(v->input_buffer, b);
519
520 v->input_buffer_allocated = v->input_buffer_size + add;
521 v->input_buffer_index = 0;
522 }
523 }
524
525 rs = v->input_buffer_allocated - (v->input_buffer_index + v->input_buffer_size);
526
527 if (!v->prefer_read_write) {
528 n = recv(v->fd, v->input_buffer + v->input_buffer_index + v->input_buffer_size, rs, MSG_DONTWAIT);
529 if (n < 0 && errno == ENOTSOCK)
530 v->prefer_read_write = true;
531 }
532 if (v->prefer_read_write)
533 n = read(v->fd, v->input_buffer + v->input_buffer_index + v->input_buffer_size, rs);
534 if (n < 0) {
535 if (errno == EAGAIN)
536 return 0;
537
538 if (ERRNO_IS_DISCONNECT(errno)) {
539 v->read_disconnected = true;
540 return 1;
541 }
542
543 return -errno;
544 }
545 if (n == 0) { /* EOF */
546 v->read_disconnected = true;
547 return 1;
548 }
549
550 v->input_buffer_size += n;
551 v->input_buffer_unscanned += n;
552
553 return 1;
554 }
555
556 static int varlink_parse_message(Varlink *v) {
557 const char *e, *begin;
558 size_t sz;
559 int r;
560
561 assert(v);
562
563 if (v->current)
564 return 0;
565 if (v->input_buffer_unscanned <= 0)
566 return 0;
567
568 assert(v->input_buffer_unscanned <= v->input_buffer_size);
569 assert(v->input_buffer_index + v->input_buffer_size <= v->input_buffer_allocated);
570
571 begin = v->input_buffer + v->input_buffer_index;
572
573 e = memchr(begin + v->input_buffer_size - v->input_buffer_unscanned, 0, v->input_buffer_unscanned);
574 if (!e) {
575 v->input_buffer_unscanned = 0;
576 return 0;
577 }
578
579 sz = e - begin + 1;
580
581 varlink_log(v, "New incoming message: %s", begin);
582
583 r = json_parse(begin, 0, &v->current, NULL, NULL);
584 if (r < 0)
585 return r;
586
587 v->input_buffer_size -= sz;
588
589 if (v->input_buffer_size == 0)
590 v->input_buffer_index = 0;
591 else
592 v->input_buffer_index += sz;
593
594 v->input_buffer_unscanned = v->input_buffer_size;
595 return 1;
596 }
597
598 static int varlink_test_timeout(Varlink *v) {
599 assert(v);
600
601 if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
602 return 0;
603 if (v->timeout == USEC_INFINITY)
604 return 0;
605
606 if (now(CLOCK_MONOTONIC) < usec_add(v->timestamp, v->timeout))
607 return 0;
608
609 varlink_set_state(v, VARLINK_PENDING_TIMEOUT);
610
611 return 1;
612 }
613
614 static int varlink_dispatch_local_error(Varlink *v, const char *error) {
615 int r;
616
617 assert(v);
618 assert(error);
619
620 if (!v->reply_callback)
621 return 0;
622
623 r = v->reply_callback(v, NULL, error, VARLINK_REPLY_ERROR|VARLINK_REPLY_LOCAL, v->userdata);
624 if (r < 0)
625 log_debug_errno(r, "Reply callback returned error, ignoring: %m");
626
627 return 1;
628 }
629
630 static int varlink_dispatch_timeout(Varlink *v) {
631 assert(v);
632
633 if (v->state != VARLINK_PENDING_TIMEOUT)
634 return 0;
635
636 varlink_set_state(v, VARLINK_PROCESSING_TIMEOUT);
637 varlink_dispatch_local_error(v, VARLINK_ERROR_TIMEOUT);
638 varlink_close(v);
639
640 return 1;
641 }
642
643 static int varlink_dispatch_disconnect(Varlink *v) {
644 assert(v);
645
646 if (v->state != VARLINK_PENDING_DISCONNECT)
647 return 0;
648
649 varlink_set_state(v, VARLINK_PROCESSING_DISCONNECT);
650 varlink_dispatch_local_error(v, VARLINK_ERROR_DISCONNECTED);
651 varlink_close(v);
652
653 return 1;
654 }
655
656 static int varlink_sanitize_parameters(JsonVariant **v) {
657 assert(v);
658
659 /* Varlink always wants a parameters list, hence make one if the caller doesn't want any */
660 if (!*v)
661 return json_variant_new_object(v, NULL, 0);
662 else if (!json_variant_is_object(*v))
663 return -EINVAL;
664
665 return 0;
666 }
667
668 static int varlink_dispatch_reply(Varlink *v) {
669 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
670 VarlinkReplyFlags flags = 0;
671 const char *error = NULL;
672 JsonVariant *e;
673 const char *k;
674 int r;
675
676 assert(v);
677
678 if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
679 return 0;
680 if (!v->current)
681 return 0;
682
683 assert(v->n_pending > 0);
684
685 if (!json_variant_is_object(v->current))
686 goto invalid;
687
688 JSON_VARIANT_OBJECT_FOREACH(k, e, v->current) {
689
690 if (streq(k, "error")) {
691 if (error)
692 goto invalid;
693 if (!json_variant_is_string(e))
694 goto invalid;
695
696 error = json_variant_string(e);
697 flags |= VARLINK_REPLY_ERROR;
698
699 } else if (streq(k, "parameters")) {
700 if (parameters)
701 goto invalid;
702 if (!json_variant_is_object(e))
703 goto invalid;
704
705 parameters = json_variant_ref(e);
706
707 } else if (streq(k, "continues")) {
708 if (FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
709 goto invalid;
710
711 if (!json_variant_is_boolean(e))
712 goto invalid;
713
714 if (json_variant_boolean(e))
715 flags |= VARLINK_REPLY_CONTINUES;
716 } else
717 goto invalid;
718 }
719
720 /* Replies with 'continue' set are only OK if we set 'more' when the method call was initiated */
721 if (v->state != VARLINK_AWAITING_REPLY_MORE && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
722 goto invalid;
723
724 /* An error is final */
725 if (error && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
726 goto invalid;
727
728 r = varlink_sanitize_parameters(&parameters);
729 if (r < 0)
730 goto invalid;
731
732 if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE)) {
733 varlink_set_state(v, VARLINK_PROCESSING_REPLY);
734
735 if (v->reply_callback) {
736 r = v->reply_callback(v, parameters, error, flags, v->userdata);
737 if (r < 0)
738 log_debug_errno(r, "Reply callback returned error, ignoring: %m");
739 }
740
741 v->current = json_variant_unref(v->current);
742
743 if (v->state == VARLINK_PROCESSING_REPLY) {
744
745 assert(v->n_pending > 0);
746
747 if (!FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
748 v->n_pending--;
749
750 varlink_set_state(v,
751 FLAGS_SET(flags, VARLINK_REPLY_CONTINUES) ? VARLINK_AWAITING_REPLY_MORE :
752 v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY);
753 }
754 } else {
755 assert(v->state == VARLINK_CALLING);
756 varlink_set_state(v, VARLINK_CALLED);
757 }
758
759 return 1;
760
761 invalid:
762 varlink_set_state(v, VARLINK_PROCESSING_FAILURE);
763 varlink_dispatch_local_error(v, VARLINK_ERROR_PROTOCOL);
764 varlink_close(v);
765
766 return 1;
767 }
768
769 static int varlink_dispatch_method(Varlink *v) {
770 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
771 VarlinkMethodFlags flags = 0;
772 const char *method = NULL, *error;
773 JsonVariant *e;
774 VarlinkMethod callback;
775 const char *k;
776 int r;
777
778 assert(v);
779
780 if (v->state != VARLINK_IDLE_SERVER)
781 return 0;
782 if (!v->current)
783 return 0;
784
785 if (!json_variant_is_object(v->current))
786 goto invalid;
787
788 JSON_VARIANT_OBJECT_FOREACH(k, e, v->current) {
789
790 if (streq(k, "method")) {
791 if (method)
792 goto invalid;
793 if (!json_variant_is_string(e))
794 goto invalid;
795
796 method = json_variant_string(e);
797
798 } else if (streq(k, "parameters")) {
799 if (parameters)
800 goto invalid;
801 if (!json_variant_is_object(e))
802 goto invalid;
803
804 parameters = json_variant_ref(e);
805
806 } else if (streq(k, "oneway")) {
807
808 if ((flags & (VARLINK_METHOD_ONEWAY|VARLINK_METHOD_MORE)) != 0)
809 goto invalid;
810
811 if (!json_variant_is_boolean(e))
812 goto invalid;
813
814 if (json_variant_boolean(e))
815 flags |= VARLINK_METHOD_ONEWAY;
816
817 } else if (streq(k, "more")) {
818
819 if ((flags & (VARLINK_METHOD_ONEWAY|VARLINK_METHOD_MORE)) != 0)
820 goto invalid;
821
822 if (!json_variant_is_boolean(e))
823 goto invalid;
824
825 if (json_variant_boolean(e))
826 flags |= VARLINK_METHOD_MORE;
827
828 } else
829 goto invalid;
830 }
831
832 if (!method)
833 goto invalid;
834
835 r = varlink_sanitize_parameters(&parameters);
836 if (r < 0)
837 goto fail;
838
839 varlink_set_state(v, (flags & VARLINK_METHOD_MORE) ? VARLINK_PROCESSING_METHOD_MORE :
840 (flags & VARLINK_METHOD_ONEWAY) ? VARLINK_PROCESSING_METHOD_ONEWAY :
841 VARLINK_PROCESSING_METHOD);
842
843 assert(v->server);
844
845 if (STR_IN_SET(method, "org.varlink.service.GetInfo", "org.varlink.service.GetInterface")) {
846 /* For now, we don't implement a single of varlink's own methods */
847 callback = NULL;
848 error = VARLINK_ERROR_METHOD_NOT_IMPLEMENTED;
849 } else if (startswith(method, "org.varlink.service.")) {
850 callback = NULL;
851 error = VARLINK_ERROR_METHOD_NOT_FOUND;
852 } else {
853 callback = hashmap_get(v->server->methods, method);
854 error = VARLINK_ERROR_METHOD_NOT_FOUND;
855 }
856
857 if (callback) {
858 r = callback(v, parameters, flags, v->userdata);
859 if (r < 0) {
860 log_debug_errno(r, "Callback for %s returned error: %m", method);
861
862 /* We got an error back from the callback. Propagate it to the client if the method call remains unanswered. */
863 if (!FLAGS_SET(flags, VARLINK_METHOD_ONEWAY)) {
864 r = varlink_errorb(v, VARLINK_ERROR_SYSTEM, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("errno", JSON_BUILD_INTEGER(-r))));
865 if (r < 0)
866 return r;
867 }
868 }
869 } else if (!FLAGS_SET(flags, VARLINK_METHOD_ONEWAY)) {
870 assert(error);
871
872 r = varlink_errorb(v, error, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method))));
873 if (r < 0)
874 return r;
875 }
876
877 switch (v->state) {
878
879 case VARLINK_PROCESSED_METHOD: /* Method call is fully processed */
880 case VARLINK_PROCESSING_METHOD_ONEWAY: /* ditto */
881 v->current = json_variant_unref(v->current);
882 varlink_set_state(v, VARLINK_IDLE_SERVER);
883 break;
884
885 case VARLINK_PROCESSING_METHOD: /* Method call wasn't replied to, will be replied to later */
886 varlink_set_state(v, VARLINK_PENDING_METHOD);
887 break;
888
889 case VARLINK_PROCESSING_METHOD_MORE: /* No reply for a "more" message was sent, more to come */
890 varlink_set_state(v, VARLINK_PENDING_METHOD_MORE);
891 break;
892
893 default:
894 assert_not_reached("Unexpected state");
895
896 }
897
898 return r;
899
900 invalid:
901 r = -EINVAL;
902
903 fail:
904 varlink_set_state(v, VARLINK_PROCESSING_FAILURE);
905 varlink_dispatch_local_error(v, VARLINK_ERROR_PROTOCOL);
906 varlink_close(v);
907
908 return r;
909 }
910
911 int varlink_process(Varlink *v) {
912 int r;
913
914 assert_return(v, -EINVAL);
915
916 if (v->state == VARLINK_DISCONNECTED)
917 return -ENOTCONN;
918
919 varlink_ref(v);
920
921 r = varlink_write(v);
922 if (r != 0)
923 goto finish;
924
925 r = varlink_dispatch_reply(v);
926 if (r != 0)
927 goto finish;
928
929 r = varlink_dispatch_method(v);
930 if (r != 0)
931 goto finish;
932
933 r = varlink_parse_message(v);
934 if (r != 0)
935 goto finish;
936
937 r = varlink_read(v);
938 if (r != 0)
939 goto finish;
940
941 r = varlink_test_disconnect(v);
942 if (r != 0)
943 goto finish;
944
945 r = varlink_dispatch_disconnect(v);
946 if (r != 0)
947 goto finish;
948
949 r = varlink_test_timeout(v);
950 if (r != 0)
951 goto finish;
952
953 r = varlink_dispatch_timeout(v);
954 if (r != 0)
955 goto finish;
956
957 finish:
958 if (r >= 0 && v->defer_event_source) {
959 int q;
960
961 /* If we did some processing, make sure we are called again soon */
962 q = sd_event_source_set_enabled(v->defer_event_source, r > 0 ? SD_EVENT_ON : SD_EVENT_OFF);
963 if (q < 0)
964 r = q;
965 }
966
967 if (r < 0) {
968 if (VARLINK_STATE_IS_ALIVE(v->state))
969 /* Initiate disconnection */
970 varlink_set_state(v, VARLINK_PENDING_DISCONNECT);
971 else
972 /* We failed while disconnecting, in that case close right away */
973 varlink_close(v);
974 }
975
976 varlink_unref(v);
977 return r;
978 }
979
980 static void handle_revents(Varlink *v, int revents) {
981 assert(v);
982
983 if (v->connecting) {
984 /* If we have seen POLLOUT or POLLHUP on a socket we are asynchronously waiting a connect()
985 * to complete on, we know we are ready. We don't read the connection error here though,
986 * we'll get the error on the next read() or write(). */
987 if ((revents & (POLLOUT|POLLHUP)) == 0)
988 return;
989
990 varlink_log(v, "Anynchronous connection completed.");
991 v->connecting = false;
992 } else {
993 /* Note that we don't care much about POLLIN/POLLOUT here, we'll just try reading and writing
994 * what we can. However, we do care about POLLHUP to detect connection termination even if we
995 * momentarily don't want to read nor write anything. */
996
997 if (!FLAGS_SET(revents, POLLHUP))
998 return;
999
1000 varlink_log(v, "Got POLLHUP from socket.");
1001 v->got_pollhup = true;
1002 }
1003 }
1004
1005 int varlink_wait(Varlink *v, usec_t timeout) {
1006 struct timespec ts;
1007 struct pollfd pfd;
1008 int r, fd, events;
1009 usec_t t;
1010
1011 assert_return(v, -EINVAL);
1012
1013 if (v->state == VARLINK_DISCONNECTED)
1014 return -ENOTCONN;
1015
1016 r = varlink_get_timeout(v, &t);
1017 if (r < 0)
1018 return r;
1019 if (t != USEC_INFINITY) {
1020 usec_t n;
1021
1022 n = now(CLOCK_MONOTONIC);
1023 if (t < n)
1024 t = 0;
1025 else
1026 t = usec_sub_unsigned(t, n);
1027 }
1028
1029 if (timeout != USEC_INFINITY &&
1030 (t == USEC_INFINITY || timeout < t))
1031 t = timeout;
1032
1033 fd = varlink_get_fd(v);
1034 if (fd < 0)
1035 return fd;
1036
1037 events = varlink_get_events(v);
1038 if (events < 0)
1039 return events;
1040
1041 pfd = (struct pollfd) {
1042 .fd = fd,
1043 .events = events,
1044 };
1045
1046 r = ppoll(&pfd, 1,
1047 t == USEC_INFINITY ? NULL : timespec_store(&ts, t),
1048 NULL);
1049 if (r < 0)
1050 return -errno;
1051
1052 handle_revents(v, pfd.revents);
1053
1054 return r > 0 ? 1 : 0;
1055 }
1056
1057 int varlink_get_fd(Varlink *v) {
1058
1059 assert_return(v, -EINVAL);
1060
1061 if (v->state == VARLINK_DISCONNECTED)
1062 return -ENOTCONN;
1063 if (v->fd < 0)
1064 return -EBADF;
1065
1066 return v->fd;
1067 }
1068
1069 int varlink_get_events(Varlink *v) {
1070 int ret = 0;
1071
1072 assert_return(v, -EINVAL);
1073
1074 if (v->state == VARLINK_DISCONNECTED)
1075 return -ENOTCONN;
1076
1077 if (v->connecting) /* When processing an asynchronous connect(), we only wait for EPOLLOUT, which
1078 * tells us that the connection is now complete. Before that we should neither
1079 * write() or read() from the fd. */
1080 return EPOLLOUT;
1081
1082 if (!v->read_disconnected &&
1083 IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) &&
1084 !v->current &&
1085 v->input_buffer_unscanned <= 0)
1086 ret |= EPOLLIN;
1087
1088 if (!v->write_disconnected &&
1089 v->output_buffer_size > 0)
1090 ret |= EPOLLOUT;
1091
1092 return ret;
1093 }
1094
1095 int varlink_get_timeout(Varlink *v, usec_t *ret) {
1096 assert_return(v, -EINVAL);
1097
1098 if (v->state == VARLINK_DISCONNECTED)
1099 return -ENOTCONN;
1100
1101 if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING) &&
1102 v->timeout != USEC_INFINITY) {
1103 if (ret)
1104 *ret = usec_add(v->timestamp, v->timeout);
1105 return 1;
1106 } else {
1107 if (ret)
1108 *ret = USEC_INFINITY;
1109 return 0;
1110 }
1111 }
1112
1113 int varlink_flush(Varlink *v) {
1114 int ret = 0, r;
1115
1116 assert_return(v, -EINVAL);
1117
1118 if (v->state == VARLINK_DISCONNECTED)
1119 return -ENOTCONN;
1120
1121 for (;;) {
1122 struct pollfd pfd;
1123
1124 if (v->output_buffer_size == 0)
1125 break;
1126 if (v->write_disconnected)
1127 return -ECONNRESET;
1128
1129 r = varlink_write(v);
1130 if (r < 0)
1131 return r;
1132 if (r > 0) {
1133 ret = 1;
1134 continue;
1135 }
1136
1137 pfd = (struct pollfd) {
1138 .fd = v->fd,
1139 .events = POLLOUT,
1140 };
1141
1142 if (poll(&pfd, 1, -1) < 0)
1143 return -errno;
1144
1145 handle_revents(v, pfd.revents);
1146 }
1147
1148 return ret;
1149 }
1150
1151 static void varlink_detach_server(Varlink *v) {
1152 VarlinkServer *saved_server;
1153 assert(v);
1154
1155 if (!v->server)
1156 return;
1157
1158 if (v->server->by_uid &&
1159 v->ucred_acquired &&
1160 uid_is_valid(v->ucred.uid)) {
1161 unsigned c;
1162
1163 c = PTR_TO_UINT(hashmap_get(v->server->by_uid, UID_TO_PTR(v->ucred.uid)));
1164 assert(c > 0);
1165
1166 if (c == 1)
1167 (void) hashmap_remove(v->server->by_uid, UID_TO_PTR(v->ucred.uid));
1168 else
1169 (void) hashmap_replace(v->server->by_uid, UID_TO_PTR(v->ucred.uid), UINT_TO_PTR(c - 1));
1170 }
1171
1172 assert(v->server->n_connections > 0);
1173 v->server->n_connections--;
1174
1175 /* If this is a connection associated to a server, then let's disconnect the server and the
1176 * connection from each other. This drops the dangling reference that connect_callback() set up. But
1177 * before we release the references, let's call the disconnection callback if it is defined. */
1178
1179 saved_server = TAKE_PTR(v->server);
1180
1181 if (saved_server->disconnect_callback)
1182 saved_server->disconnect_callback(saved_server, v, saved_server->userdata);
1183
1184 varlink_server_unref(saved_server);
1185 varlink_unref(v);
1186 }
1187
1188 int varlink_close(Varlink *v) {
1189
1190 assert_return(v, -EINVAL);
1191
1192 if (v->state == VARLINK_DISCONNECTED)
1193 return 0;
1194
1195 varlink_set_state(v, VARLINK_DISCONNECTED);
1196
1197 /* Let's take a reference first, since varlink_detach_server() might drop the final (dangling) ref
1198 * which would destroy us before we can call varlink_clear() */
1199 varlink_ref(v);
1200 varlink_detach_server(v);
1201 varlink_clear(v);
1202 varlink_unref(v);
1203
1204 return 1;
1205 }
1206
1207 Varlink* varlink_close_unref(Varlink *v) {
1208
1209 if (!v)
1210 return NULL;
1211
1212 (void) varlink_close(v);
1213 return varlink_unref(v);
1214 }
1215
1216 Varlink* varlink_flush_close_unref(Varlink *v) {
1217
1218 if (!v)
1219 return NULL;
1220
1221 (void) varlink_flush(v);
1222 (void) varlink_close(v);
1223 return varlink_unref(v);
1224 }
1225
1226 static int varlink_enqueue_json(Varlink *v, JsonVariant *m) {
1227 _cleanup_free_ char *text = NULL;
1228 int r;
1229
1230 assert(v);
1231 assert(m);
1232
1233 r = json_variant_format(m, 0, &text);
1234 if (r < 0)
1235 return r;
1236 assert(text[r] == '\0');
1237
1238 if (v->output_buffer_size + r + 1 > VARLINK_BUFFER_MAX)
1239 return -ENOBUFS;
1240
1241 varlink_log(v, "Sending message: %s", text);
1242
1243 if (v->output_buffer_size == 0) {
1244
1245 free_and_replace(v->output_buffer, text);
1246
1247 v->output_buffer_size = v->output_buffer_allocated = r + 1;
1248 v->output_buffer_index = 0;
1249
1250 } else if (v->output_buffer_index == 0) {
1251
1252 if (!GREEDY_REALLOC(v->output_buffer, v->output_buffer_allocated, v->output_buffer_size + r + 1))
1253 return -ENOMEM;
1254
1255 memcpy(v->output_buffer + v->output_buffer_size, text, r + 1);
1256 v->output_buffer_size += r + 1;
1257
1258 } else {
1259 char *n;
1260 const size_t new_size = v->output_buffer_size + r + 1;
1261
1262 n = new(char, new_size);
1263 if (!n)
1264 return -ENOMEM;
1265
1266 memcpy(mempcpy(n, v->output_buffer + v->output_buffer_index, v->output_buffer_size), text, r + 1);
1267
1268 free_and_replace(v->output_buffer, n);
1269 v->output_buffer_allocated = v->output_buffer_size = new_size;
1270 v->output_buffer_index = 0;
1271 }
1272
1273 return 0;
1274 }
1275
1276 int varlink_send(Varlink *v, const char *method, JsonVariant *parameters) {
1277 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1278 int r;
1279
1280 assert_return(v, -EINVAL);
1281 assert_return(method, -EINVAL);
1282
1283 if (v->state == VARLINK_DISCONNECTED)
1284 return -ENOTCONN;
1285
1286 /* We allow enqueuing multiple method calls at once! */
1287 if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
1288 return -EBUSY;
1289
1290 r = varlink_sanitize_parameters(&parameters);
1291 if (r < 0)
1292 return r;
1293
1294 r = json_build(&m, JSON_BUILD_OBJECT(
1295 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
1296 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)),
1297 JSON_BUILD_PAIR("oneway", JSON_BUILD_BOOLEAN(true))));
1298 if (r < 0)
1299 return r;
1300
1301 r = varlink_enqueue_json(v, m);
1302 if (r < 0)
1303 return r;
1304
1305 /* No state change here, this is one-way only after all */
1306 v->timestamp = now(CLOCK_MONOTONIC);
1307 return 0;
1308 }
1309
1310 int varlink_sendb(Varlink *v, const char *method, ...) {
1311 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1312 va_list ap;
1313 int r;
1314
1315 assert_return(v, -EINVAL);
1316
1317 va_start(ap, method);
1318 r = json_buildv(&parameters, ap);
1319 va_end(ap);
1320
1321 if (r < 0)
1322 return r;
1323
1324 return varlink_send(v, method, parameters);
1325 }
1326
1327 int varlink_invoke(Varlink *v, const char *method, JsonVariant *parameters) {
1328 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1329 int r;
1330
1331 assert_return(v, -EINVAL);
1332 assert_return(method, -EINVAL);
1333
1334 if (v->state == VARLINK_DISCONNECTED)
1335 return -ENOTCONN;
1336
1337 /* We allow enqueuing multiple method calls at once! */
1338 if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
1339 return -EBUSY;
1340
1341 r = varlink_sanitize_parameters(&parameters);
1342 if (r < 0)
1343 return r;
1344
1345 r = json_build(&m, JSON_BUILD_OBJECT(
1346 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
1347 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters))));
1348 if (r < 0)
1349 return r;
1350
1351 r = varlink_enqueue_json(v, m);
1352 if (r < 0)
1353 return r;
1354
1355 varlink_set_state(v, VARLINK_AWAITING_REPLY);
1356 v->n_pending++;
1357 v->timestamp = now(CLOCK_MONOTONIC);
1358
1359 return 0;
1360 }
1361
1362 int varlink_invokeb(Varlink *v, const char *method, ...) {
1363 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1364 va_list ap;
1365 int r;
1366
1367 assert_return(v, -EINVAL);
1368
1369 va_start(ap, method);
1370 r = json_buildv(&parameters, ap);
1371 va_end(ap);
1372
1373 if (r < 0)
1374 return r;
1375
1376 return varlink_invoke(v, method, parameters);
1377 }
1378
1379 int varlink_observe(Varlink *v, const char *method, JsonVariant *parameters) {
1380 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1381 int r;
1382
1383 assert_return(v, -EINVAL);
1384 assert_return(method, -EINVAL);
1385
1386 if (v->state == VARLINK_DISCONNECTED)
1387 return -ENOTCONN;
1388 /* Note that we don't allow enqueuing multiple method calls when we are in more/continues mode! We
1389 * thus insist on an idle client here. */
1390 if (v->state != VARLINK_IDLE_CLIENT)
1391 return -EBUSY;
1392
1393 r = varlink_sanitize_parameters(&parameters);
1394 if (r < 0)
1395 return r;
1396
1397 r = json_build(&m, JSON_BUILD_OBJECT(
1398 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
1399 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)),
1400 JSON_BUILD_PAIR("more", JSON_BUILD_BOOLEAN(true))));
1401 if (r < 0)
1402 return r;
1403
1404 r = varlink_enqueue_json(v, m);
1405 if (r < 0)
1406 return r;
1407
1408
1409 varlink_set_state(v, VARLINK_AWAITING_REPLY_MORE);
1410 v->n_pending++;
1411 v->timestamp = now(CLOCK_MONOTONIC);
1412
1413 return 0;
1414 }
1415
1416 int varlink_observeb(Varlink *v, const char *method, ...) {
1417 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1418 va_list ap;
1419 int r;
1420
1421 assert_return(v, -EINVAL);
1422
1423 va_start(ap, method);
1424 r = json_buildv(&parameters, ap);
1425 va_end(ap);
1426
1427 if (r < 0)
1428 return r;
1429
1430 return varlink_observe(v, method, parameters);
1431 }
1432
1433 int varlink_call(
1434 Varlink *v,
1435 const char *method,
1436 JsonVariant *parameters,
1437 JsonVariant **ret_parameters,
1438 const char **ret_error_id,
1439 VarlinkReplyFlags *ret_flags) {
1440
1441 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1442 int r;
1443
1444 assert_return(v, -EINVAL);
1445 assert_return(method, -EINVAL);
1446
1447 if (v->state == VARLINK_DISCONNECTED)
1448 return -ENOTCONN;
1449 if (!IN_SET(v->state, VARLINK_IDLE_CLIENT))
1450 return -EBUSY;
1451
1452 assert(v->n_pending == 0); /* n_pending can't be > 0 if we are in VARLINK_IDLE_CLIENT state */
1453
1454 r = varlink_sanitize_parameters(&parameters);
1455 if (r < 0)
1456 return r;
1457
1458 r = json_build(&m, JSON_BUILD_OBJECT(
1459 JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
1460 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters))));
1461 if (r < 0)
1462 return r;
1463
1464 r = varlink_enqueue_json(v, m);
1465 if (r < 0)
1466 return r;
1467
1468 varlink_set_state(v, VARLINK_CALLING);
1469 v->n_pending++;
1470 v->timestamp = now(CLOCK_MONOTONIC);
1471
1472 while (v->state == VARLINK_CALLING) {
1473
1474 r = varlink_process(v);
1475 if (r < 0)
1476 return r;
1477 if (r > 0)
1478 continue;
1479
1480 r = varlink_wait(v, USEC_INFINITY);
1481 if (r < 0)
1482 return r;
1483 }
1484
1485 switch (v->state) {
1486
1487 case VARLINK_CALLED:
1488 assert(v->current);
1489
1490 json_variant_unref(v->reply);
1491 v->reply = TAKE_PTR(v->current);
1492
1493 varlink_set_state(v, VARLINK_IDLE_CLIENT);
1494 assert(v->n_pending == 1);
1495 v->n_pending--;
1496
1497 if (ret_parameters)
1498 *ret_parameters = json_variant_by_key(v->reply, "parameters");
1499 if (ret_error_id)
1500 *ret_error_id = json_variant_string(json_variant_by_key(v->reply, "error"));
1501 if (ret_flags)
1502 *ret_flags = 0;
1503
1504 return 1;
1505
1506 case VARLINK_PENDING_DISCONNECT:
1507 case VARLINK_DISCONNECTED:
1508 return -ECONNRESET;
1509
1510 case VARLINK_PENDING_TIMEOUT:
1511 return -ETIME;
1512
1513 default:
1514 assert_not_reached("Unexpected state after method call.");
1515 }
1516 }
1517
1518 int varlink_callb(
1519 Varlink *v,
1520 const char *method,
1521 JsonVariant **ret_parameters,
1522 const char **ret_error_id,
1523 VarlinkReplyFlags *ret_flags, ...) {
1524
1525 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1526 va_list ap;
1527 int r;
1528
1529 assert_return(v, -EINVAL);
1530
1531 va_start(ap, ret_flags);
1532 r = json_buildv(&parameters, ap);
1533 va_end(ap);
1534
1535 if (r < 0)
1536 return r;
1537
1538 return varlink_call(v, method, parameters, ret_parameters, ret_error_id, ret_flags);
1539 }
1540
1541 int varlink_reply(Varlink *v, JsonVariant *parameters) {
1542 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1543 int r;
1544
1545 assert_return(v, -EINVAL);
1546
1547 if (v->state == VARLINK_DISCONNECTED)
1548 return -ENOTCONN;
1549 if (!IN_SET(v->state,
1550 VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE,
1551 VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
1552 return -EBUSY;
1553
1554 r = varlink_sanitize_parameters(&parameters);
1555 if (r < 0)
1556 return r;
1557
1558 r = json_build(&m, JSON_BUILD_OBJECT(JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters))));
1559 if (r < 0)
1560 return r;
1561
1562 r = varlink_enqueue_json(v, m);
1563 if (r < 0)
1564 return r;
1565
1566 if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) {
1567 /* We just replied to a method call that was let hanging for a while (i.e. we were outside of
1568 * the varlink_dispatch_method() stack frame), which means with this reply we are ready to
1569 * process further messages. */
1570 v->current = json_variant_unref(v->current);
1571 varlink_set_state(v, VARLINK_IDLE_SERVER);
1572 } else
1573 /* We replied to a method call from within the varlink_dispatch_method() stack frame), which
1574 * means we should it handle the rest of the state engine. */
1575 varlink_set_state(v, VARLINK_PROCESSED_METHOD);
1576
1577 return 1;
1578 }
1579
1580 int varlink_replyb(Varlink *v, ...) {
1581 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1582 va_list ap;
1583 int r;
1584
1585 assert_return(v, -EINVAL);
1586
1587 va_start(ap, v);
1588 r = json_buildv(&parameters, ap);
1589 va_end(ap);
1590
1591 if (r < 0)
1592 return r;
1593
1594 return varlink_reply(v, parameters);
1595 }
1596
1597 int varlink_error(Varlink *v, const char *error_id, JsonVariant *parameters) {
1598 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1599 int r;
1600
1601 assert_return(v, -EINVAL);
1602 assert_return(error_id, -EINVAL);
1603
1604 if (v->state == VARLINK_DISCONNECTED)
1605 return -ENOTCONN;
1606 if (!IN_SET(v->state,
1607 VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE,
1608 VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
1609 return -EBUSY;
1610
1611 r = varlink_sanitize_parameters(&parameters);
1612 if (r < 0)
1613 return r;
1614
1615 r = json_build(&m, JSON_BUILD_OBJECT(
1616 JSON_BUILD_PAIR("error", JSON_BUILD_STRING(error_id)),
1617 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters))));
1618 if (r < 0)
1619 return r;
1620
1621 r = varlink_enqueue_json(v, m);
1622 if (r < 0)
1623 return r;
1624
1625 if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) {
1626 v->current = json_variant_unref(v->current);
1627 varlink_set_state(v, VARLINK_IDLE_SERVER);
1628 } else
1629 varlink_set_state(v, VARLINK_PROCESSED_METHOD);
1630
1631 return 1;
1632 }
1633
1634 int varlink_errorb(Varlink *v, const char *error_id, ...) {
1635 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1636 va_list ap;
1637 int r;
1638
1639 assert_return(v, -EINVAL);
1640 assert_return(error_id, -EINVAL);
1641
1642 va_start(ap, error_id);
1643 r = json_buildv(&parameters, ap);
1644 va_end(ap);
1645
1646 if (r < 0)
1647 return r;
1648
1649 return varlink_error(v, error_id, parameters);
1650 }
1651
1652 int varlink_error_invalid_parameter(Varlink *v, JsonVariant *parameters) {
1653
1654 assert_return(v, -EINVAL);
1655 assert_return(parameters, -EINVAL);
1656
1657 /* We expect to be called in one of two ways: the 'parameters' argument is a string variant in which
1658 * case it is the parameter key name that is invalid. Or the 'parameters' argument is an object
1659 * variant in which case we'll pull out the first key. The latter mode is useful in functions that
1660 * don't expect any arguments. */
1661
1662 if (json_variant_is_string(parameters))
1663 return varlink_error(v, VARLINK_ERROR_INVALID_PARAMETER, parameters);
1664
1665 if (json_variant_is_object(parameters) &&
1666 json_variant_elements(parameters) > 0)
1667 return varlink_error(v, VARLINK_ERROR_INVALID_PARAMETER,
1668 json_variant_by_index(parameters, 0));
1669
1670 return -EINVAL;
1671 }
1672
1673 int varlink_notify(Varlink *v, JsonVariant *parameters) {
1674 _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
1675 int r;
1676
1677 assert_return(v, -EINVAL);
1678
1679 if (v->state == VARLINK_DISCONNECTED)
1680 return -ENOTCONN;
1681 if (!IN_SET(v->state, VARLINK_PROCESSING_METHOD_MORE, VARLINK_PENDING_METHOD_MORE))
1682 return -EBUSY;
1683
1684 r = varlink_sanitize_parameters(&parameters);
1685 if (r < 0)
1686 return r;
1687
1688 r = json_build(&m, JSON_BUILD_OBJECT(
1689 JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)),
1690 JSON_BUILD_PAIR("continues", JSON_BUILD_BOOLEAN(true))));
1691 if (r < 0)
1692 return r;
1693
1694 r = varlink_enqueue_json(v, m);
1695 if (r < 0)
1696 return r;
1697
1698 /* No state change, as more is coming */
1699 return 1;
1700 }
1701
1702 int varlink_notifyb(Varlink *v, ...) {
1703 _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
1704 va_list ap;
1705 int r;
1706
1707 assert_return(v, -EINVAL);
1708
1709 va_start(ap, v);
1710 r = json_buildv(&parameters, ap);
1711 va_end(ap);
1712
1713 if (r < 0)
1714 return r;
1715
1716 return varlink_notify(v, parameters);
1717 }
1718
1719 int varlink_bind_reply(Varlink *v, VarlinkReply callback) {
1720 assert_return(v, -EINVAL);
1721
1722 if (callback && v->reply_callback && callback != v->reply_callback)
1723 return -EBUSY;
1724
1725 v->reply_callback = callback;
1726
1727 return 0;
1728 }
1729
1730 void* varlink_set_userdata(Varlink *v, void *userdata) {
1731 void *old;
1732
1733 assert_return(v, NULL);
1734
1735 old = v->userdata;
1736 v->userdata = userdata;
1737
1738 return old;
1739 }
1740
1741 void* varlink_get_userdata(Varlink *v) {
1742 assert_return(v, NULL);
1743
1744 return v->userdata;
1745 }
1746
1747 static int varlink_acquire_ucred(Varlink *v) {
1748 int r;
1749
1750 assert(v);
1751
1752 if (v->ucred_acquired)
1753 return 0;
1754
1755 r = getpeercred(v->fd, &v->ucred);
1756 if (r < 0)
1757 return r;
1758
1759 v->ucred_acquired = true;
1760 return 0;
1761 }
1762
1763 int varlink_get_peer_uid(Varlink *v, uid_t *ret) {
1764 int r;
1765
1766 assert_return(v, -EINVAL);
1767 assert_return(ret, -EINVAL);
1768
1769 r = varlink_acquire_ucred(v);
1770 if (r < 0)
1771 return r;
1772
1773 if (!uid_is_valid(v->ucred.uid))
1774 return -ENODATA;
1775
1776 *ret = v->ucred.uid;
1777 return 0;
1778 }
1779
1780 int varlink_get_peer_pid(Varlink *v, pid_t *ret) {
1781 int r;
1782
1783 assert_return(v, -EINVAL);
1784 assert_return(ret, -EINVAL);
1785
1786 r = varlink_acquire_ucred(v);
1787 if (r < 0)
1788 return r;
1789
1790 if (!pid_is_valid(v->ucred.pid))
1791 return -ENODATA;
1792
1793 *ret = v->ucred.pid;
1794 return 0;
1795 }
1796
1797 int varlink_set_relative_timeout(Varlink *v, usec_t timeout) {
1798 assert_return(v, -EINVAL);
1799 assert_return(timeout > 0, -EINVAL);
1800
1801 v->timeout = timeout;
1802 return 0;
1803 }
1804
1805 VarlinkServer *varlink_get_server(Varlink *v) {
1806 assert_return(v, NULL);
1807
1808 return v->server;
1809 }
1810
1811 int varlink_set_description(Varlink *v, const char *description) {
1812 assert_return(v, -EINVAL);
1813
1814 return free_and_strdup(&v->description, description);
1815 }
1816
1817 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
1818 Varlink *v = userdata;
1819
1820 assert(s);
1821 assert(v);
1822
1823 handle_revents(v, revents);
1824 (void) varlink_process(v);
1825
1826 return 1;
1827 }
1828
1829 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
1830 Varlink *v = userdata;
1831
1832 assert(s);
1833 assert(v);
1834
1835 (void) varlink_process(v);
1836 return 1;
1837 }
1838
1839 static int defer_callback(sd_event_source *s, void *userdata) {
1840 Varlink *v = userdata;
1841
1842 assert(s);
1843 assert(v);
1844
1845 (void) varlink_process(v);
1846 return 1;
1847 }
1848
1849 static int prepare_callback(sd_event_source *s, void *userdata) {
1850 Varlink *v = userdata;
1851 int r, e;
1852 usec_t until;
1853 bool have_timeout;
1854
1855 assert(s);
1856 assert(v);
1857
1858 e = varlink_get_events(v);
1859 if (e < 0)
1860 return e;
1861
1862 r = sd_event_source_set_io_events(v->io_event_source, e);
1863 if (r < 0)
1864 return r;
1865
1866 r = varlink_get_timeout(v, &until);
1867 if (r < 0)
1868 return r;
1869 have_timeout = r > 0;
1870
1871 if (have_timeout) {
1872 r = sd_event_source_set_time(v->time_event_source, until);
1873 if (r < 0)
1874 return r;
1875 }
1876
1877 r = sd_event_source_set_enabled(v->time_event_source, have_timeout ? SD_EVENT_ON : SD_EVENT_OFF);
1878 if (r < 0)
1879 return r;
1880
1881 return 1;
1882 }
1883
1884 static int quit_callback(sd_event_source *event, void *userdata) {
1885 Varlink *v = userdata;
1886
1887 assert(event);
1888 assert(v);
1889
1890 varlink_flush(v);
1891 varlink_close(v);
1892
1893 return 1;
1894 }
1895
1896 int varlink_attach_event(Varlink *v, sd_event *e, int64_t priority) {
1897 int r;
1898
1899 assert_return(v, -EINVAL);
1900 assert_return(!v->event, -EBUSY);
1901
1902 if (e)
1903 v->event = sd_event_ref(e);
1904 else {
1905 r = sd_event_default(&v->event);
1906 if (r < 0)
1907 return r;
1908 }
1909
1910 r = sd_event_add_time(v->event, &v->time_event_source, CLOCK_MONOTONIC, 0, 0, time_callback, v);
1911 if (r < 0)
1912 goto fail;
1913
1914 r = sd_event_source_set_priority(v->time_event_source, priority);
1915 if (r < 0)
1916 goto fail;
1917
1918 (void) sd_event_source_set_description(v->time_event_source, "varlink-time");
1919
1920 r = sd_event_add_exit(v->event, &v->quit_event_source, quit_callback, v);
1921 if (r < 0)
1922 goto fail;
1923
1924 r = sd_event_source_set_priority(v->quit_event_source, priority);
1925 if (r < 0)
1926 goto fail;
1927
1928 (void) sd_event_source_set_description(v->quit_event_source, "varlink-quit");
1929
1930 r = sd_event_add_io(v->event, &v->io_event_source, v->fd, 0, io_callback, v);
1931 if (r < 0)
1932 goto fail;
1933
1934 r = sd_event_source_set_prepare(v->io_event_source, prepare_callback);
1935 if (r < 0)
1936 goto fail;
1937
1938 r = sd_event_source_set_priority(v->io_event_source, priority);
1939 if (r < 0)
1940 goto fail;
1941
1942 (void) sd_event_source_set_description(v->io_event_source, "varlink-io");
1943
1944 r = sd_event_add_defer(v->event, &v->defer_event_source, defer_callback, v);
1945 if (r < 0)
1946 goto fail;
1947
1948 r = sd_event_source_set_priority(v->defer_event_source, priority);
1949 if (r < 0)
1950 goto fail;
1951
1952 (void) sd_event_source_set_description(v->defer_event_source, "varlink-defer");
1953
1954 return 0;
1955
1956 fail:
1957 varlink_detach_event(v);
1958 return r;
1959 }
1960
1961 void varlink_detach_event(Varlink *v) {
1962 if (!v)
1963 return;
1964
1965 varlink_detach_event_sources(v);
1966
1967 v->event = sd_event_unref(v->event);
1968 }
1969
1970 sd_event *varlink_get_event(Varlink *v) {
1971 assert_return(v, NULL);
1972
1973 return v->event;
1974 }
1975
1976 int varlink_server_new(VarlinkServer **ret, VarlinkServerFlags flags) {
1977 VarlinkServer *s;
1978
1979 assert_return(ret, -EINVAL);
1980 assert_return((flags & ~_VARLINK_SERVER_FLAGS_ALL) == 0, -EINVAL);
1981
1982 s = new(VarlinkServer, 1);
1983 if (!s)
1984 return -ENOMEM;
1985
1986 *s = (VarlinkServer) {
1987 .n_ref = 1,
1988 .flags = flags,
1989 .connections_max = varlink_server_connections_max(NULL),
1990 .connections_per_uid_max = varlink_server_connections_per_uid_max(NULL),
1991 };
1992
1993 *ret = s;
1994 return 0;
1995 }
1996
1997 static VarlinkServer* varlink_server_destroy(VarlinkServer *s) {
1998 char *m;
1999
2000 if (!s)
2001 return NULL;
2002
2003 varlink_server_shutdown(s);
2004
2005 while ((m = hashmap_steal_first_key(s->methods)))
2006 free(m);
2007
2008 hashmap_free(s->methods);
2009 hashmap_free(s->by_uid);
2010
2011 sd_event_unref(s->event);
2012
2013 free(s->description);
2014
2015 return mfree(s);
2016 }
2017
2018 DEFINE_TRIVIAL_REF_UNREF_FUNC(VarlinkServer, varlink_server, varlink_server_destroy);
2019
2020 static int validate_connection(VarlinkServer *server, const struct ucred *ucred) {
2021 int allowed = -1;
2022
2023 assert(server);
2024 assert(ucred);
2025
2026 if (FLAGS_SET(server->flags, VARLINK_SERVER_ROOT_ONLY))
2027 allowed = ucred->uid == 0;
2028
2029 if (FLAGS_SET(server->flags, VARLINK_SERVER_MYSELF_ONLY))
2030 allowed = allowed > 0 || ucred->uid == getuid();
2031
2032 if (allowed == 0) { /* Allow access when it is explicitly allowed or when neither
2033 * VARLINK_SERVER_ROOT_ONLY nor VARLINK_SERVER_MYSELF_ONLY are specified. */
2034 varlink_server_log(server, "Unprivileged client attempted connection, refusing.");
2035 return 0;
2036 }
2037
2038 if (server->n_connections >= server->connections_max) {
2039 varlink_server_log(server, "Connection limit of %u reached, refusing.", server->connections_max);
2040 return 0;
2041 }
2042
2043 if (FLAGS_SET(server->flags, VARLINK_SERVER_ACCOUNT_UID)) {
2044 unsigned c;
2045
2046 if (!uid_is_valid(ucred->uid)) {
2047 varlink_server_log(server, "Client with invalid UID attempted connection, refusing.");
2048 return 0;
2049 }
2050
2051 c = PTR_TO_UINT(hashmap_get(server->by_uid, UID_TO_PTR(ucred->uid)));
2052 if (c >= server->connections_per_uid_max) {
2053 varlink_server_log(server, "Per-UID connection limit of %u reached, refusing.",
2054 server->connections_per_uid_max);
2055 return 0;
2056 }
2057 }
2058
2059 return 1;
2060 }
2061
2062 static int count_connection(VarlinkServer *server, struct ucred *ucred) {
2063 unsigned c;
2064 int r;
2065
2066 assert(server);
2067 assert(ucred);
2068
2069 server->n_connections++;
2070
2071 if (FLAGS_SET(server->flags, VARLINK_SERVER_ACCOUNT_UID)) {
2072 r = hashmap_ensure_allocated(&server->by_uid, NULL);
2073 if (r < 0)
2074 return log_debug_errno(r, "Failed to allocate UID hash table: %m");
2075
2076 c = PTR_TO_UINT(hashmap_get(server->by_uid, UID_TO_PTR(ucred->uid)));
2077
2078 varlink_server_log(server, "Connections of user " UID_FMT ": %u (of %u max)",
2079 ucred->uid, c, server->connections_per_uid_max);
2080
2081 r = hashmap_replace(server->by_uid, UID_TO_PTR(ucred->uid), UINT_TO_PTR(c + 1));
2082 if (r < 0)
2083 return log_debug_errno(r, "Failed to increment counter in UID hash table: %m");
2084 }
2085
2086 return 0;
2087 }
2088
2089 int varlink_server_add_connection(VarlinkServer *server, int fd, Varlink **ret) {
2090 _cleanup_(varlink_unrefp) Varlink *v = NULL;
2091 bool ucred_acquired;
2092 struct ucred ucred;
2093 int r;
2094
2095 assert_return(server, -EINVAL);
2096 assert_return(fd >= 0, -EBADF);
2097
2098 if ((server->flags & (VARLINK_SERVER_ROOT_ONLY|VARLINK_SERVER_ACCOUNT_UID)) != 0) {
2099 r = getpeercred(fd, &ucred);
2100 if (r < 0)
2101 return varlink_server_log_errno(server, r, "Failed to acquire peer credentials of incoming socket, refusing: %m");
2102
2103 ucred_acquired = true;
2104
2105 r = validate_connection(server, &ucred);
2106 if (r < 0)
2107 return r;
2108 if (r == 0)
2109 return -EPERM;
2110 } else
2111 ucred_acquired = false;
2112
2113 r = varlink_new(&v);
2114 if (r < 0)
2115 return varlink_server_log_errno(server, r, "Failed to allocate connection object: %m");
2116
2117 r = count_connection(server, &ucred);
2118 if (r < 0)
2119 return r;
2120
2121 v->fd = fd;
2122 v->userdata = server->userdata;
2123 if (ucred_acquired) {
2124 v->ucred = ucred;
2125 v->ucred_acquired = true;
2126 }
2127
2128 (void) asprintf(&v->description, "%s-%i", server->description ?: "varlink", v->fd);
2129
2130 /* Link up the server and the connection, and take reference in both directions. Note that the
2131 * reference on the connection is left dangling. It will be dropped when the connection is closed,
2132 * which happens in varlink_close(), including in the event loop quit callback. */
2133 v->server = varlink_server_ref(server);
2134 varlink_ref(v);
2135
2136 varlink_set_state(v, VARLINK_IDLE_SERVER);
2137
2138 if (server->event) {
2139 r = varlink_attach_event(v, server->event, server->event_priority);
2140 if (r < 0) {
2141 varlink_log_errno(v, r, "Failed to attach new connection: %m");
2142 v->fd = -1; /* take the fd out of the connection again */
2143 varlink_close(v);
2144 return r;
2145 }
2146 }
2147
2148 if (ret)
2149 *ret = v;
2150
2151 return 0;
2152 }
2153
2154 static int connect_callback(sd_event_source *source, int fd, uint32_t revents, void *userdata) {
2155 VarlinkServerSocket *ss = userdata;
2156 _cleanup_close_ int cfd = -1;
2157 Varlink *v = NULL;
2158 int r;
2159
2160 assert(source);
2161 assert(ss);
2162
2163 varlink_server_log(ss->server, "New incoming connection.");
2164
2165 cfd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
2166 if (cfd < 0) {
2167 if (ERRNO_IS_ACCEPT_AGAIN(errno))
2168 return 0;
2169
2170 return varlink_server_log_errno(ss->server, errno, "Failed to accept incoming socket: %m");
2171 }
2172
2173 r = varlink_server_add_connection(ss->server, cfd, &v);
2174 if (r < 0)
2175 return 0;
2176
2177 TAKE_FD(cfd);
2178
2179 if (ss->server->connect_callback) {
2180 r = ss->server->connect_callback(ss->server, v, ss->server->userdata);
2181 if (r < 0) {
2182 varlink_log_errno(v, r, "Connection callback returned error, disconnecting client: %m");
2183 varlink_close(v);
2184 return 0;
2185 }
2186 }
2187
2188 return 0;
2189 }
2190
2191 int varlink_server_listen_fd(VarlinkServer *s, int fd) {
2192 _cleanup_free_ VarlinkServerSocket *ss = NULL;
2193 int r;
2194
2195 assert_return(s, -EINVAL);
2196 assert_return(fd >= 0, -EBADF);
2197
2198 r = fd_nonblock(fd, true);
2199 if (r < 0)
2200 return r;
2201
2202 ss = new(VarlinkServerSocket, 1);
2203 if (!ss)
2204 return -ENOMEM;
2205
2206 *ss = (VarlinkServerSocket) {
2207 .server = s,
2208 .fd = fd,
2209 };
2210
2211 if (s->event) {
2212 _cleanup_(sd_event_source_unrefp) sd_event_source *es = NULL;
2213
2214 r = sd_event_add_io(s->event, &es, fd, EPOLLIN, connect_callback, ss);
2215 if (r < 0)
2216 return r;
2217
2218 r = sd_event_source_set_priority(ss->event_source, s->event_priority);
2219 if (r < 0)
2220 return r;
2221 }
2222
2223 LIST_PREPEND(sockets, s->sockets, TAKE_PTR(ss));
2224 return 0;
2225 }
2226
2227 int varlink_server_listen_address(VarlinkServer *s, const char *address, mode_t m) {
2228 union sockaddr_union sockaddr;
2229 socklen_t sockaddr_len;
2230 _cleanup_close_ int fd = -1;
2231 int r;
2232
2233 assert_return(s, -EINVAL);
2234 assert_return(address, -EINVAL);
2235 assert_return((m & ~0777) == 0, -EINVAL);
2236
2237 r = sockaddr_un_set_path(&sockaddr.un, address);
2238 if (r < 0)
2239 return r;
2240 sockaddr_len = r;
2241
2242 fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
2243 if (fd < 0)
2244 return -errno;
2245
2246 fd = fd_move_above_stdio(fd);
2247
2248 (void) sockaddr_un_unlink(&sockaddr.un);
2249
2250 RUN_WITH_UMASK(~m & 0777)
2251 if (bind(fd, &sockaddr.sa, sockaddr_len) < 0)
2252 return -errno;
2253
2254 if (listen(fd, SOMAXCONN) < 0)
2255 return -errno;
2256
2257 r = varlink_server_listen_fd(s, fd);
2258 if (r < 0)
2259 return r;
2260
2261 TAKE_FD(fd);
2262 return 0;
2263 }
2264
2265 void* varlink_server_set_userdata(VarlinkServer *s, void *userdata) {
2266 void *ret;
2267
2268 assert_return(s, NULL);
2269
2270 ret = s->userdata;
2271 s->userdata = userdata;
2272
2273 return ret;
2274 }
2275
2276 void* varlink_server_get_userdata(VarlinkServer *s) {
2277 assert_return(s, NULL);
2278
2279 return s->userdata;
2280 }
2281
2282 static VarlinkServerSocket* varlink_server_socket_destroy(VarlinkServerSocket *ss) {
2283 if (!ss)
2284 return NULL;
2285
2286 if (ss->server)
2287 LIST_REMOVE(sockets, ss->server->sockets, ss);
2288
2289 sd_event_source_disable_unref(ss->event_source);
2290
2291 free(ss->address);
2292 safe_close(ss->fd);
2293
2294 return mfree(ss);
2295 }
2296
2297 int varlink_server_shutdown(VarlinkServer *s) {
2298 assert_return(s, -EINVAL);
2299
2300 while (s->sockets)
2301 varlink_server_socket_destroy(s->sockets);
2302
2303 return 0;
2304 }
2305
2306 int varlink_server_attach_event(VarlinkServer *s, sd_event *e, int64_t priority) {
2307 VarlinkServerSocket *ss;
2308 int r;
2309
2310 assert_return(s, -EINVAL);
2311 assert_return(!s->event, -EBUSY);
2312
2313 if (e)
2314 s->event = sd_event_ref(e);
2315 else {
2316 r = sd_event_default(&s->event);
2317 if (r < 0)
2318 return r;
2319 }
2320
2321 LIST_FOREACH(sockets, ss, s->sockets) {
2322 assert(!ss->event_source);
2323
2324 r = sd_event_add_io(s->event, &ss->event_source, ss->fd, EPOLLIN, connect_callback, ss);
2325 if (r < 0)
2326 goto fail;
2327
2328 r = sd_event_source_set_priority(ss->event_source, priority);
2329 if (r < 0)
2330 goto fail;
2331 }
2332
2333 s->event_priority = priority;
2334 return 0;
2335
2336 fail:
2337 varlink_server_detach_event(s);
2338 return r;
2339 }
2340
2341 int varlink_server_detach_event(VarlinkServer *s) {
2342 VarlinkServerSocket *ss;
2343
2344 assert_return(s, -EINVAL);
2345
2346 LIST_FOREACH(sockets, ss, s->sockets) {
2347
2348 if (!ss->event_source)
2349 continue;
2350
2351 (void) sd_event_source_set_enabled(ss->event_source, SD_EVENT_OFF);
2352 ss->event_source = sd_event_source_unref(ss->event_source);
2353 }
2354
2355 sd_event_unref(s->event);
2356 return 0;
2357 }
2358
2359 sd_event *varlink_server_get_event(VarlinkServer *s) {
2360 assert_return(s, NULL);
2361
2362 return s->event;
2363 }
2364
2365 int varlink_server_bind_method(VarlinkServer *s, const char *method, VarlinkMethod callback) {
2366 char *m;
2367 int r;
2368
2369 assert_return(s, -EINVAL);
2370 assert_return(method, -EINVAL);
2371 assert_return(callback, -EINVAL);
2372
2373 if (startswith(method, "org.varlink.service."))
2374 return -EEXIST;
2375
2376 r = hashmap_ensure_allocated(&s->methods, &string_hash_ops);
2377 if (r < 0)
2378 return r;
2379
2380 m = strdup(method);
2381 if (!m)
2382 return -ENOMEM;
2383
2384 r = hashmap_put(s->methods, m, callback);
2385 if (r < 0) {
2386 free(m);
2387 return r;
2388 }
2389
2390 return 0;
2391 }
2392
2393 int varlink_server_bind_method_many_internal(VarlinkServer *s, ...) {
2394 va_list ap;
2395 int r = 0;
2396
2397 assert_return(s, -EINVAL);
2398
2399 va_start(ap, s);
2400 for (;;) {
2401 VarlinkMethod callback;
2402 const char *method;
2403
2404 method = va_arg(ap, const char *);
2405 if (!method)
2406 break;
2407
2408 callback = va_arg(ap, VarlinkMethod);
2409
2410 r = varlink_server_bind_method(s, method, callback);
2411 if (r < 0)
2412 break;
2413 }
2414 va_end(ap);
2415
2416 return r;
2417 }
2418
2419 int varlink_server_bind_connect(VarlinkServer *s, VarlinkConnect callback) {
2420 assert_return(s, -EINVAL);
2421
2422 if (callback && s->connect_callback && callback != s->connect_callback)
2423 return -EBUSY;
2424
2425 s->connect_callback = callback;
2426 return 0;
2427 }
2428
2429 int varlink_server_bind_disconnect(VarlinkServer *s, VarlinkDisconnect callback) {
2430 assert_return(s, -EINVAL);
2431
2432 if (callback && s->disconnect_callback && callback != s->disconnect_callback)
2433 return -EBUSY;
2434
2435 s->disconnect_callback = callback;
2436 return 0;
2437 }
2438
2439 unsigned varlink_server_connections_max(VarlinkServer *s) {
2440 int dts;
2441
2442 /* If a server is specified, return the setting for that server, otherwise the default value */
2443 if (s)
2444 return s->connections_max;
2445
2446 dts = getdtablesize();
2447 assert_se(dts > 0);
2448
2449 /* Make sure we never use up more than ¾th of RLIMIT_NOFILE for IPC */
2450 if (VARLINK_DEFAULT_CONNECTIONS_MAX > (unsigned) dts / 4 * 3)
2451 return dts / 4 * 3;
2452
2453 return VARLINK_DEFAULT_CONNECTIONS_MAX;
2454 }
2455
2456 unsigned varlink_server_connections_per_uid_max(VarlinkServer *s) {
2457 unsigned m;
2458
2459 if (s)
2460 return s->connections_per_uid_max;
2461
2462 /* Make sure to never use up more than ¾th of available connections for a single user */
2463 m = varlink_server_connections_max(NULL);
2464 if (VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX > m)
2465 return m / 4 * 3;
2466
2467 return VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX;
2468 }
2469
2470 int varlink_server_set_connections_per_uid_max(VarlinkServer *s, unsigned m) {
2471 assert_return(s, -EINVAL);
2472 assert_return(m > 0, -EINVAL);
2473
2474 s->connections_per_uid_max = m;
2475 return 0;
2476 }
2477
2478 int varlink_server_set_connections_max(VarlinkServer *s, unsigned m) {
2479 assert_return(s, -EINVAL);
2480 assert_return(m > 0, -EINVAL);
2481
2482 s->connections_max = m;
2483 return 0;
2484 }
2485
2486 unsigned varlink_server_current_connections(VarlinkServer *s) {
2487 assert_return(s, UINT_MAX);
2488
2489 return s->n_connections;
2490 }
2491
2492 int varlink_server_set_description(VarlinkServer *s, const char *description) {
2493 assert_return(s, -EINVAL);
2494
2495 return free_and_strdup(&s->description, description);
2496 }