]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/libsystemd-bus/sd-bus.c
bus: implicitly collect ucred/label information
[thirdparty/systemd.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2013 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "missing.h"
33
34 #include "sd-bus.h"
35 #include "bus-internal.h"
36 #include "bus-message.h"
37 #include "bus-type.h"
38
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
40
41 static void bus_free(sd_bus *b) {
42 struct filter_callback *f;
43 unsigned i;
44
45 assert(b);
46
47 if (b->fd >= 0)
48 close_nointr_nofail(b->fd);
49
50 free(b->rbuffer);
51 free(b->unique_name);
52 free(b->auth_uid);
53 free(b->address);
54
55 for (i = 0; i < b->rqueue_size; i++)
56 sd_bus_message_unref(b->rqueue[i]);
57 free(b->rqueue);
58
59 for (i = 0; i < b->wqueue_size; i++)
60 sd_bus_message_unref(b->wqueue[i]);
61 free(b->wqueue);
62
63 hashmap_free_free(b->reply_callbacks);
64 prioq_free(b->reply_callbacks_prioq);
65
66 while ((f = b->filter_callbacks)) {
67 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
68 free(f);
69 }
70
71 free(b);
72 }
73
74 static sd_bus* bus_new(void) {
75 sd_bus *r;
76
77 r = new0(sd_bus, 1);
78 if (!r)
79 return NULL;
80
81 r->n_ref = 1;
82 r->fd = -1;
83 r->message_version = 1;
84
85 /* We guarantee that wqueue always has space for at least one
86 * entry */
87 r->wqueue = new(sd_bus_message*, 1);
88 if (!r->wqueue) {
89 free(r);
90 return NULL;
91 }
92
93 return r;
94 };
95
96 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
97 const char *s;
98 int r;
99
100 assert(bus);
101
102 if (error != 0)
103 return -error;
104
105 assert(reply);
106
107 r = sd_bus_message_read(reply, "s", &s);
108 if (r < 0)
109 return r;
110
111 if (!service_name_is_valid(s) || s[0] != ':')
112 return -EBADMSG;
113
114 bus->unique_name = strdup(s);
115 if (!bus->unique_name)
116 return -ENOMEM;
117
118 bus->state = BUS_RUNNING;
119
120 return 1;
121 }
122
123 static int bus_send_hello(sd_bus *bus) {
124 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
125 int r;
126
127 assert(bus);
128
129 r = sd_bus_message_new_method_call(
130 bus,
131 "org.freedesktop.DBus",
132 "/",
133 "org.freedesktop.DBus",
134 "Hello",
135 &m);
136 if (r < 0)
137 return r;
138
139 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
140 if (r < 0)
141 return r;
142
143 bus->sent_hello = true;
144 return r;
145 }
146
147 static int bus_start_running(sd_bus *bus) {
148 assert(bus);
149
150 if (bus->sent_hello) {
151 bus->state = BUS_HELLO;
152 return 1;
153 }
154
155 bus->state = BUS_RUNNING;
156 return 1;
157 }
158
/* Tries to parse one "key=value" element of a bus address at *p.
 *
 * If the text at *p does not start with "<key>=", nothing is consumed
 * and 0 is returned. Otherwise the value is read up to the next ','
 * or the end of the string, with "%XX" hex escapes decoded, stored as
 * a newly allocated string in *value, and *p is advanced past the
 * value and a trailing ','.
 *
 * Returns 1 if the key was parsed, 0 if it did not match, -EINVAL if
 * *value is already set (duplicate key), -ENOMEM on allocation
 * failure, or the negative result of unhexchar() for a bad escape. */
static int parse_address_key(const char **p, const char *key, char **value) {
        const char *cursor;
        char *buf = NULL;
        size_t key_len, used = 0;
        int r;

        assert(p);
        assert(*p);
        assert(key);
        assert(value);

        key_len = strlen(key);
        if (strncmp(*p, key, key_len) != 0 || (*p)[key_len] != '=')
                return 0;

        if (*value)
                return -EINVAL;

        for (cursor = *p + key_len + 1; *cursor != ',' && *cursor != 0; ) {
                char decoded, *grown;

                if (*cursor != '%') {
                        decoded = *cursor;
                        cursor++;
                } else {
                        int hi, lo;

                        hi = unhexchar(cursor[1]);
                        if (hi < 0) {
                                r = hi;
                                goto fail;
                        }

                        lo = unhexchar(cursor[2]);
                        if (lo < 0) {
                                r = lo;
                                goto fail;
                        }

                        decoded = (char) ((hi << 4) | lo);
                        cursor += 3;
                }

                /* Room for the new character plus the final NUL */
                grown = realloc(buf, used + 2);
                if (!grown) {
                        r = -ENOMEM;
                        goto fail;
                }

                buf = grown;
                buf[used++] = decoded;
        }

        if (buf)
                buf[used] = 0;
        else {
                /* Empty value */
                buf = strdup("");
                if (!buf)
                        return -ENOMEM;
        }

        if (*cursor == ',')
                cursor++;

        *p = cursor;
        *value = buf;
        return 1;

fail:
        free(buf);
        return r;
}
229
/* Advances *p past the current address element: up to and including
 * the next ',', or to the terminating NUL if there is no ','. */
static void skip_address_key(const char **p) {
        const char *q;

        assert(p);
        assert(*p);

        q = *p;
        while (*q != 0 && *q != ',')
                q++;

        if (*q == ',')
                q++;

        *p = q;
}
239
240 static int bus_parse_next_address(sd_bus *b) {
241 const char *a, *p;
242 _cleanup_free_ char *guid = NULL;
243 int r;
244
245 assert(b);
246
247 if (!b->address)
248 return 0;
249 if (b->address[b->address_index] == 0)
250 return 0;
251
252 a = b->address + b->address_index;
253
254 zero(b->sockaddr);
255 b->sockaddr_size = 0;
256 b->peer = SD_ID128_NULL;
257
258 if (startswith(a, "unix:")) {
259 _cleanup_free_ char *path = NULL, *abstract = NULL;
260
261 p = a + 5;
262 while (*p != 0) {
263 r = parse_address_key(&p, "guid", &guid);
264 if (r < 0)
265 return r;
266 else if (r > 0)
267 continue;
268
269 r = parse_address_key(&p, "path", &path);
270 if (r < 0)
271 return r;
272 else if (r > 0)
273 continue;
274
275 r = parse_address_key(&p, "abstract", &abstract);
276 if (r < 0)
277 return r;
278 else if (r > 0)
279 continue;
280
281 skip_address_key(&p);
282 }
283
284 if (!path && !abstract)
285 return -EINVAL;
286
287 if (path && abstract)
288 return -EINVAL;
289
290 if (path) {
291 size_t l;
292
293 l = strlen(path);
294 if (l > sizeof(b->sockaddr.un.sun_path))
295 return -E2BIG;
296
297 b->sockaddr.un.sun_family = AF_UNIX;
298 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
299 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
300 } else if (abstract) {
301 size_t l;
302
303 l = strlen(abstract);
304 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
305 return -E2BIG;
306
307 b->sockaddr.un.sun_family = AF_UNIX;
308 b->sockaddr.un.sun_path[0] = 0;
309 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
310 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
311 }
312
313 } else if (startswith(a, "tcp:")) {
314 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
315 struct addrinfo hints, *result;
316
317 p = a + 4;
318 while (*p != 0) {
319 r = parse_address_key(&p, "guid", &guid);
320 if (r < 0)
321 return r;
322 else if (r > 0)
323 continue;
324
325 r = parse_address_key(&p, "host", &host);
326 if (r < 0)
327 return r;
328 else if (r > 0)
329 continue;
330
331 r = parse_address_key(&p, "port", &port);
332 if (r < 0)
333 return r;
334 else if (r > 0)
335 continue;
336
337 r = parse_address_key(&p, "family", &family);
338 if (r < 0)
339 return r;
340 else if (r > 0)
341 continue;
342
343 skip_address_key(&p);
344 }
345
346 if (!host || !port)
347 return -EINVAL;
348
349 zero(hints);
350 hints.ai_socktype = SOCK_STREAM;
351 hints.ai_flags = AI_ADDRCONFIG;
352
353 if (family) {
354 if (streq(family, "ipv4"))
355 hints.ai_family = AF_INET;
356 else if (streq(family, "ipv6"))
357 hints.ai_family = AF_INET6;
358 else
359 return -EINVAL;
360 }
361
362 r = getaddrinfo(host, port, &hints, &result);
363 if (r == EAI_SYSTEM)
364 return -errno;
365 else if (r != 0)
366 return -EADDRNOTAVAIL;
367
368 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
369 b->sockaddr_size = result->ai_addrlen;
370
371 freeaddrinfo(result);
372 }
373
374 if (guid) {
375 r = sd_id128_from_string(guid, &b->peer);
376 if (r < 0)
377 return r;
378 }
379
380 b->address_index = p - b->address;
381 return 1;
382 }
383
/* Consumes 'size' bytes from the iovec array, starting at entry *idx.
 *
 * Entries that are consumed completely are reset to base NULL and
 * length 0, and *idx is moved past them. An entry that is consumed
 * only partially has its base advanced and its length reduced, and
 * *idx is left pointing at it. The caller must make sure the array
 * holds at least 'size' bytes from *idx on. */
static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
        struct iovec *cur;

        for (; size > 0; (*idx)++) {
                cur = &iov[*idx];

                if (size < cur->iov_len) {
                        /* Ends in the middle of this entry */
                        cur->iov_len -= size;
                        cur->iov_base = (uint8_t*) cur->iov_base + size;
                        return;
                }

                size -= cur->iov_len;

                cur->iov_len = 0;
                cur->iov_base = NULL;
        }
}
403
404 static int bus_write_auth(sd_bus *b) {
405 struct msghdr mh;
406 ssize_t k;
407
408 assert(b);
409 assert(b->state == BUS_AUTHENTICATING);
410
411 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
412 return 0;
413
414 if (b->auth_timeout == 0)
415 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
416
417 zero(mh);
418 mh.msg_iov = b->auth_iovec + b->auth_index;
419 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
420
421 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
422 if (k < 0)
423 return errno == EAGAIN ? 0 : -errno;
424
425 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
426
427 return 1;
428 }
429
430 static int bus_auth_verify(sd_bus *b) {
431 char *e, *f;
432 sd_id128_t peer;
433 unsigned i;
434 int r;
435
436 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
437 * that's it */
438
439 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
440 if (!e)
441 return 0;
442
443 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
444 if (!f)
445 return 0;
446
447 if (e - (char*) b->rbuffer != 3 + 32)
448 return -EPERM;
449
450 if (memcmp(b->rbuffer, "OK ", 3))
451 return -EPERM;
452
453 for (i = 0; i < 32; i += 2) {
454 int x, y;
455
456 x = unhexchar(((char*) b->rbuffer)[3 + i]);
457 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
458
459 if (x < 0 || y < 0)
460 return -EINVAL;
461
462 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
463 }
464
465 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
466 !sd_id128_equal(b->peer, peer))
467 return -EPERM;
468
469 b->peer = peer;
470
471 b->can_fds =
472 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
473 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
474
475 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
476 memmove(b->rbuffer, f + 2, b->rbuffer_size);
477
478 r = bus_start_running(b);
479 if (r < 0)
480 return r;
481
482 return 1;
483 }
484
485 static int bus_read_auth(sd_bus *b) {
486 struct msghdr mh;
487 struct iovec iov;
488 size_t n;
489 ssize_t k;
490 int r;
491 void *p;
492
493 assert(b);
494
495 r = bus_auth_verify(b);
496 if (r != 0)
497 return r;
498
499 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
500
501 if (n > BUS_AUTH_SIZE_MAX)
502 n = BUS_AUTH_SIZE_MAX;
503
504 if (b->rbuffer_size >= n)
505 return -ENOBUFS;
506
507 p = realloc(b->rbuffer, n);
508 if (!p)
509 return -ENOMEM;
510
511 b->rbuffer = p;
512
513 zero(iov);
514 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
515 iov.iov_len = n - b->rbuffer_size;
516
517 zero(mh);
518 mh.msg_iov = &iov;
519 mh.msg_iovlen = 1;
520
521 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
522 if (k < 0)
523 return errno == EAGAIN ? 0 : -errno;
524
525 b->rbuffer_size += k;
526
527 r = bus_auth_verify(b);
528 if (r != 0)
529 return r;
530
531 return 1;
532 }
533
534 static int bus_setup_fd(sd_bus *b) {
535 int one;
536
537 assert(b);
538
539 /* Enable SO_PASSCRED + SO_PASSEC. We try this on any socket,
540 * just in case. This is actually irrelavant for */
541 one = 1;
542 setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
543 setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
544
545 /* Increase the buffers to a MB */
546 fd_inc_rcvbuf(b->fd, 1024*1024);
547 fd_inc_sndbuf(b->fd, 1024*1024);
548
549 return 0;
550 }
551
552 static int bus_start_auth(sd_bus *b) {
553 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
554 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
555
556 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
557 size_t l;
558
559 assert(b);
560
561 b->state = BUS_AUTHENTICATING;
562
563 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
564 char_array_0(text);
565
566 l = strlen(text);
567 b->auth_uid = hexmem(text, l);
568 if (!b->auth_uid)
569 return -ENOMEM;
570
571 b->auth_iovec[0].iov_base = (void*) auth_prefix;
572 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
573 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
574 b->auth_iovec[1].iov_len = l * 2;
575 b->auth_iovec[2].iov_base = (void*) auth_suffix;
576 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
577 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
578
579 return bus_write_auth(b);
580 }
581
582 static int bus_start_connect(sd_bus *b) {
583 int r;
584
585 assert(b);
586 assert(b->fd < 0);
587
588 for (;;) {
589 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
590 r = bus_parse_next_address(b);
591 if (r < 0)
592 return r;
593 if (r == 0)
594 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
595 }
596
597 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
598 if (b->fd < 0) {
599 b->last_connect_error = errno;
600 goto try_again;
601 }
602
603 r = bus_setup_fd(b);
604 if (r < 0) {
605 b->last_connect_error = errno;
606 goto try_again;
607 }
608
609 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
610 if (r < 0) {
611 if (errno == EINPROGRESS)
612 return 1;
613
614 b->last_connect_error = errno;
615 goto try_again;
616 }
617
618 return bus_start_auth(b);
619
620 try_again:
621 zero(b->sockaddr);
622
623 if (b->fd >= 0) {
624 close_nointr_nofail(b->fd);
625 b->fd = -1;
626 }
627 }
628 }
629
630 int sd_bus_open_system(sd_bus **ret) {
631 const char *e;
632 sd_bus *b;
633 int r;
634
635 if (!ret)
636 return -EINVAL;
637
638 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
639 if (e) {
640 r = sd_bus_open_address(e, &b);
641 if (r < 0)
642 return r;
643 } else {
644 b = bus_new();
645 if (!b)
646 return -ENOMEM;
647
648 b->sockaddr.un.sun_family = AF_UNIX;
649 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
650 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
651
652 r = bus_start_connect(b);
653 if (r < 0) {
654 bus_free(b);
655 return r;
656 }
657 }
658
659 r = bus_send_hello(b);
660 if (r < 0) {
661 sd_bus_unref(b);
662 return r;
663 }
664
665 *ret = b;
666 return 0;
667 }
668
669 int sd_bus_open_user(sd_bus **ret) {
670 const char *e;
671 sd_bus *b;
672 size_t l;
673 int r;
674
675 if (!ret)
676 return -EINVAL;
677
678 e = getenv("DBUS_SESSION_BUS_ADDRESS");
679 if (e) {
680 r = sd_bus_open_address(e, &b);
681 if (r < 0)
682 return r;
683 } else {
684 e = getenv("XDG_RUNTIME_DIR");
685 if (!e)
686 return -ENOENT;
687
688 l = strlen(e);
689 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
690 return -E2BIG;
691
692 b = bus_new();
693 if (!b)
694 return -ENOMEM;
695
696 b->sockaddr.un.sun_family = AF_UNIX;
697 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
698 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
699
700 r = bus_start_connect(b);
701 if (r < 0) {
702 bus_free(b);
703 return r;
704 }
705 }
706
707 r = bus_send_hello(b);
708 if (r < 0) {
709 sd_bus_unref(b);
710 return r;
711 }
712
713 *ret = b;
714 return 0;
715 }
716
717 int sd_bus_open_address(const char *address, sd_bus **ret) {
718 sd_bus *b;
719 int r;
720
721 if (!address)
722 return -EINVAL;
723 if (!ret)
724 return -EINVAL;
725
726 b = bus_new();
727 if (!b)
728 return -ENOMEM;
729
730 b->address = strdup(address);
731 if (!b->address) {
732 bus_free(b);
733 return -ENOMEM;
734 }
735
736 r = bus_start_connect(b);
737 if (r < 0) {
738 bus_free(b);
739 return r;
740 }
741
742 *ret = b;
743 return 0;
744 }
745
746 int sd_bus_open_fd(int fd, sd_bus **ret) {
747 sd_bus *b;
748 int r;
749
750 if (fd < 0)
751 return -EINVAL;
752 if (!ret)
753 return -EINVAL;
754
755 b = bus_new();
756 if (!b)
757 return -ENOMEM;
758
759 b->fd = fd;
760
761 r = fd_nonblock(b->fd, true);
762 if (r < 0)
763 goto fail;
764
765 fd_cloexec(b->fd, true);
766 if (r < 0)
767 goto fail;
768
769 r = bus_setup_fd(b);
770 if (r < 0)
771 goto fail;
772
773 r = bus_start_auth(b);
774 if (r < 0)
775 goto fail;
776
777 *ret = b;
778 return 0;
779
780 fail:
781 bus_free(b);
782 return r;
783 }
784
785 void sd_bus_close(sd_bus *bus) {
786 if (!bus)
787 return;
788 if (bus->fd < 0)
789 return;
790
791 close_nointr_nofail(bus->fd);
792 bus->fd = -1;
793 }
794
795 sd_bus *sd_bus_ref(sd_bus *bus) {
796 if (!bus)
797 return NULL;
798
799 assert(bus->n_ref > 0);
800
801 bus->n_ref++;
802 return bus;
803 }
804
805 sd_bus *sd_bus_unref(sd_bus *bus) {
806 if (!bus)
807 return NULL;
808
809 assert(bus->n_ref > 0);
810 bus->n_ref--;
811
812 if (bus->n_ref <= 0)
813 bus_free(bus);
814
815 return NULL;
816 }
817
818 int sd_bus_is_open(sd_bus *bus) {
819 if (!bus)
820 return -EINVAL;
821
822 return bus->fd >= 0;
823 }
824
825 int sd_bus_can_send(sd_bus *bus, char type) {
826 int r;
827
828 if (!bus)
829 return -EINVAL;
830
831 if (type == SD_BUS_TYPE_UNIX_FD) {
832 r = bus_ensure_running(bus);
833 if (r < 0)
834 return r;
835
836 return bus->can_fds;
837 }
838
839 return bus_type_is_valid(type);
840 }
841
842 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
843 int r;
844
845 if (!bus)
846 return -EINVAL;
847 if (!peer)
848 return -EINVAL;
849
850 r = bus_ensure_running(bus);
851 if (r < 0)
852 return r;
853
854 *peer = bus->peer;
855 return 0;
856 }
857
858 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
859 assert(m);
860
861 if (m->header->version > b->message_version)
862 return -EPERM;
863
864 if (m->sealed)
865 return 0;
866
867 return bus_message_seal(m, ++b->serial);
868 }
869
870 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
871 struct msghdr mh;
872 struct iovec *iov;
873 ssize_t k;
874 size_t n;
875 unsigned j;
876
877 assert(bus);
878 assert(m);
879 assert(idx);
880 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
881
882 if (*idx >= m->size)
883 return 0;
884
885 n = m->n_iovec * sizeof(struct iovec);
886 iov = alloca(n);
887 memcpy(iov, m->iovec, n);
888
889 j = 0;
890 iovec_advance(iov, &j, *idx);
891
892 zero(mh);
893 mh.msg_iov = iov;
894 mh.msg_iovlen = m->n_iovec;
895
896 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
897 if (k < 0)
898 return errno == EAGAIN ? 0 : -errno;
899
900 *idx += (size_t) k;
901 return 1;
902 }
903
904 static int message_read_need(sd_bus *bus, size_t *need) {
905 uint32_t a, b;
906 uint8_t e;
907 uint64_t sum;
908
909 assert(bus);
910 assert(need);
911 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
912
913 if (bus->rbuffer_size < sizeof(struct bus_header)) {
914 *need = sizeof(struct bus_header) + 8;
915
916 /* Minimum message size:
917 *
918 * Header +
919 *
920 * Method Call: +2 string headers
921 * Signal: +3 string headers
922 * Method Error: +1 string headers
923 * +1 uint32 headers
924 * Method Reply: +1 uint32 headers
925 *
926 * A string header is at least 9 bytes
927 * A uint32 header is at least 8 bytes
928 *
929 * Hence the minimum message size of a valid message
930 * is header + 8 bytes */
931
932 return 0;
933 }
934
935 a = ((const uint32_t*) bus->rbuffer)[1];
936 b = ((const uint32_t*) bus->rbuffer)[3];
937
938 e = ((const uint8_t*) bus->rbuffer)[0];
939 if (e == SD_BUS_LITTLE_ENDIAN) {
940 a = le32toh(a);
941 b = le32toh(b);
942 } else if (e == SD_BUS_BIG_ENDIAN) {
943 a = be32toh(a);
944 b = be32toh(b);
945 } else
946 return -EBADMSG;
947
948 sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
949 if (sum >= BUS_MESSAGE_SIZE_MAX)
950 return -ENOBUFS;
951
952 *need = (size_t) sum;
953 return 0;
954 }
955
956 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
957 sd_bus_message *t;
958 void *b = NULL;
959 int r;
960
961 assert(bus);
962 assert(m);
963 assert(bus->rbuffer_size >= size);
964 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
965
966 if (bus->rbuffer_size > size) {
967 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
968 if (!b) {
969 free(t);
970 return -ENOMEM;
971 }
972 }
973
974 r = bus_message_from_malloc(bus->rbuffer, size,
975 bus->ucred_valid ? &bus->ucred : NULL,
976 bus->label[0] ? bus->label : NULL, &t);
977 if (r < 0) {
978 free(b);
979 return r;
980 }
981
982 bus->rbuffer = b;
983 bus->rbuffer_size -= size;
984
985 *m = t;
986 return 1;
987 }
988
989 static int message_read(sd_bus *bus, sd_bus_message **m) {
990 struct msghdr mh;
991 struct iovec iov;
992 ssize_t k;
993 size_t need;
994 int r;
995 void *b;
996 union {
997 struct cmsghdr cmsghdr;
998 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
999 CMSG_SPACE(NAME_MAX)]; /*selinux label */
1000 } control;
1001 struct cmsghdr *cmsg;
1002
1003 assert(bus);
1004 assert(m);
1005 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1006
1007 r = message_read_need(bus, &need);
1008 if (r < 0)
1009 return r;
1010
1011 if (bus->rbuffer_size >= need)
1012 return message_make(bus, need, m);
1013
1014 b = realloc(bus->rbuffer, need);
1015 if (!b)
1016 return -ENOMEM;
1017
1018 bus->rbuffer = b;
1019
1020 zero(iov);
1021 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1022 iov.iov_len = need - bus->rbuffer_size;
1023
1024 zero(mh);
1025 mh.msg_iov = &iov;
1026 mh.msg_iovlen = 1;
1027 mh.msg_control = &control;
1028 mh.msg_controllen = sizeof(control);
1029
1030 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1031 if (k < 0)
1032 return errno == EAGAIN ? 0 : -errno;
1033
1034 bus->rbuffer_size += k;
1035 bus->ucred_valid = false;
1036 bus->label[0] = 0;
1037
1038 for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1039 if (cmsg->cmsg_level == SOL_SOCKET &&
1040 cmsg->cmsg_type == SCM_CREDENTIALS &&
1041 cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1042
1043 memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1044 bus->ucred_valid = true;
1045
1046 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1047 cmsg->cmsg_type == SCM_SECURITY) {
1048
1049 size_t l;
1050 l = cmsg->cmsg_len - CMSG_LEN(0);
1051 memcpy(&bus->label, CMSG_DATA(cmsg), l);
1052 bus->label[l] = 0;
1053 }
1054 }
1055
1056 r = message_read_need(bus, &need);
1057 if (r < 0)
1058 return r;
1059
1060 if (bus->rbuffer_size >= need)
1061 return message_make(bus, need, m);
1062
1063 return 1;
1064 }
1065
1066 static int dispatch_wqueue(sd_bus *bus) {
1067 int r, ret = 0;
1068
1069 assert(bus);
1070 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1071
1072 if (bus->fd < 0)
1073 return -ENOTCONN;
1074
1075 while (bus->wqueue_size > 0) {
1076
1077 r = message_write(bus, bus->wqueue[0], &bus->windex);
1078 if (r < 0) {
1079 sd_bus_close(bus);
1080 return r;
1081 } else if (r == 0)
1082 /* Didn't do anything this time */
1083 return ret;
1084 else if (bus->windex >= bus->wqueue[0]->size) {
1085 /* Fully written. Let's drop the entry from
1086 * the queue.
1087 *
1088 * This isn't particularly optimized, but
1089 * well, this is supposed to be our worst-case
1090 * buffer only, and the socket buffer is
1091 * supposed to be our primary buffer, and if
1092 * it got full, then all bets are off
1093 * anyway. */
1094
1095 sd_bus_message_unref(bus->wqueue[0]);
1096 bus->wqueue_size --;
1097 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1098 bus->windex = 0;
1099
1100 ret = 1;
1101 }
1102 }
1103
1104 return ret;
1105 }
1106
1107 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1108 sd_bus_message *z = NULL;
1109 int r, ret = 0;
1110
1111 assert(bus);
1112 assert(m);
1113 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1114
1115 if (bus->fd < 0)
1116 return -ENOTCONN;
1117
1118 if (bus->rqueue_size > 0) {
1119 /* Dispatch a queued message */
1120
1121 *m = bus->rqueue[0];
1122 bus->rqueue_size --;
1123 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1124 return 1;
1125 }
1126
1127 /* Try to read a new message */
1128 do {
1129 r = message_read(bus, &z);
1130 if (r < 0) {
1131 sd_bus_close(bus);
1132 return r;
1133 }
1134 if (r == 0)
1135 return ret;
1136
1137 r = 1;
1138 } while (!z);
1139
1140 *m = z;
1141 return 1;
1142 }
1143
1144 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1145 int r;
1146
1147 if (!bus)
1148 return -EINVAL;
1149 if (bus->fd < 0)
1150 return -ENOTCONN;
1151 if (!m)
1152 return -EINVAL;
1153
1154 /* If the serial number isn't kept, then we know that no reply
1155 * is expected */
1156 if (!serial && !m->sealed)
1157 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1158
1159 r = bus_seal_message(bus, m);
1160 if (r < 0)
1161 return r;
1162
1163 /* If this is a reply and no reply was requested, then let's
1164 * suppress this, if we can */
1165 if (m->dont_send && !serial)
1166 return 0;
1167
1168 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1169 size_t idx = 0;
1170
1171 r = message_write(bus, m, &idx);
1172 if (r < 0) {
1173 sd_bus_close(bus);
1174 return r;
1175 } else if (idx < m->size) {
1176 /* Wasn't fully written. So let's remember how
1177 * much was written. Note that the first entry
1178 * of the wqueue array is always allocated so
1179 * that we always can remember how much was
1180 * written. */
1181 bus->wqueue[0] = sd_bus_message_ref(m);
1182 bus->wqueue_size = 1;
1183 bus->windex = idx;
1184 }
1185 } else {
1186 sd_bus_message **q;
1187
1188 /* Just append it to the queue. */
1189
1190 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1191 return -ENOBUFS;
1192
1193 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1194 if (!q)
1195 return -ENOMEM;
1196
1197 bus->wqueue = q;
1198 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1199 }
1200
1201 if (serial)
1202 *serial = BUS_MESSAGE_SERIAL(m);
1203
1204 return 0;
1205 }
1206
1207 static usec_t calc_elapse(uint64_t usec) {
1208 if (usec == (uint64_t) -1)
1209 return 0;
1210
1211 if (usec == 0)
1212 usec = BUS_DEFAULT_TIMEOUT;
1213
1214 return now(CLOCK_MONOTONIC) + usec;
1215 }
1216
1217 static int timeout_compare(const void *a, const void *b) {
1218 const struct reply_callback *x = a, *y = b;
1219
1220 if (x->timeout != 0 && y->timeout == 0)
1221 return -1;
1222
1223 if (x->timeout == 0 && y->timeout != 0)
1224 return 1;
1225
1226 if (x->timeout < y->timeout)
1227 return -1;
1228
1229 if (x->timeout > y->timeout)
1230 return 1;
1231
1232 return 0;
1233 }
1234
1235 int sd_bus_send_with_reply(
1236 sd_bus *bus,
1237 sd_bus_message *m,
1238 sd_message_handler_t callback,
1239 void *userdata,
1240 uint64_t usec,
1241 uint64_t *serial) {
1242
1243 struct reply_callback *c;
1244 int r;
1245
1246 if (!bus)
1247 return -EINVAL;
1248 if (bus->fd < 0)
1249 return -ENOTCONN;
1250 if (!m)
1251 return -EINVAL;
1252 if (!callback)
1253 return -EINVAL;
1254 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1255 return -EINVAL;
1256 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1257 return -EINVAL;
1258
1259 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1260 if (r < 0)
1261 return r;
1262
1263 if (usec != (uint64_t) -1) {
1264 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1265 if (r < 0)
1266 return r;
1267 }
1268
1269 r = bus_seal_message(bus, m);
1270 if (r < 0)
1271 return r;
1272
1273 c = new(struct reply_callback, 1);
1274 if (!c)
1275 return -ENOMEM;
1276
1277 c->callback = callback;
1278 c->userdata = userdata;
1279 c->serial = BUS_MESSAGE_SERIAL(m);
1280 c->timeout = calc_elapse(usec);
1281
1282 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1283 if (r < 0) {
1284 free(c);
1285 return r;
1286 }
1287
1288 if (c->timeout != 0) {
1289 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1290 if (r < 0) {
1291 c->timeout = 0;
1292 sd_bus_send_with_reply_cancel(bus, c->serial);
1293 return r;
1294 }
1295 }
1296
1297 r = sd_bus_send(bus, m, serial);
1298 if (r < 0) {
1299 sd_bus_send_with_reply_cancel(bus, c->serial);
1300 return r;
1301 }
1302
1303 return r;
1304 }
1305
1306 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1307 struct reply_callback *c;
1308
1309 if (!bus)
1310 return -EINVAL;
1311 if (serial == 0)
1312 return -EINVAL;
1313
1314 c = hashmap_remove(bus->reply_callbacks, &serial);
1315 if (!c)
1316 return 0;
1317
1318 if (c->timeout != 0)
1319 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1320
1321 free(c);
1322 return 1;
1323 }
1324
1325 int bus_ensure_running(sd_bus *bus) {
1326 int r;
1327
1328 assert(bus);
1329
1330 if (bus->state == BUS_RUNNING)
1331 return 1;
1332
1333 for (;;) {
1334 r = sd_bus_process(bus, NULL);
1335 if (r < 0)
1336 return r;
1337 if (bus->state == BUS_RUNNING)
1338 return 1;
1339 if (r > 0)
1340 continue;
1341
1342 r = sd_bus_wait(bus, (uint64_t) -1);
1343 if (r < 0)
1344 return r;
1345 }
1346 }
1347
1348 int sd_bus_send_with_reply_and_block(
1349 sd_bus *bus,
1350 sd_bus_message *m,
1351 uint64_t usec,
1352 sd_bus_error *error,
1353 sd_bus_message **reply) {
1354
1355 int r;
1356 usec_t timeout;
1357 uint64_t serial;
1358 bool room = false;
1359
1360 if (!bus)
1361 return -EINVAL;
1362 if (bus->fd < 0)
1363 return -ENOTCONN;
1364 if (!m)
1365 return -EINVAL;
1366 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1367 return -EINVAL;
1368 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1369 return -EINVAL;
1370 if (bus_error_is_dirty(error))
1371 return -EINVAL;
1372
1373 r = bus_ensure_running(bus);
1374 if (r < 0)
1375 return r;
1376
1377 r = sd_bus_send(bus, m, &serial);
1378 if (r < 0)
1379 return r;
1380
1381 timeout = calc_elapse(usec);
1382
1383 for (;;) {
1384 usec_t left;
1385 sd_bus_message *incoming = NULL;
1386
1387 if (!room) {
1388 sd_bus_message **q;
1389
1390 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1391 return -ENOBUFS;
1392
1393 /* Make sure there's room for queuing this
1394 * locally, before we read the message */
1395
1396 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1397 if (!q)
1398 return -ENOMEM;
1399
1400 bus->rqueue = q;
1401 room = true;
1402 }
1403
1404 r = message_read(bus, &incoming);
1405 if (r < 0)
1406 return r;
1407 if (incoming) {
1408
1409 if (incoming->reply_serial == serial) {
1410 /* Found a match! */
1411
1412 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1413 *reply = incoming;
1414 return 0;
1415 }
1416
1417 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1418 int k;
1419
1420 r = sd_bus_error_copy(error, &incoming->error);
1421 if (r < 0) {
1422 sd_bus_message_unref(incoming);
1423 return r;
1424 }
1425
1426 k = bus_error_to_errno(&incoming->error);
1427 sd_bus_message_unref(incoming);
1428 return k;
1429 }
1430
1431 sd_bus_message_unref(incoming);
1432 return -EIO;
1433 }
1434
1435 /* There's already guaranteed to be room for
1436 * this, so need to resize things here */
1437 bus->rqueue[bus->rqueue_size ++] = incoming;
1438 room = false;
1439
1440 /* Try to read more, right-away */
1441 continue;
1442 }
1443 if (r != 0)
1444 continue;
1445
1446 if (timeout > 0) {
1447 usec_t n;
1448
1449 n = now(CLOCK_MONOTONIC);
1450 if (n >= timeout)
1451 return -ETIMEDOUT;
1452
1453 left = timeout - n;
1454 } else
1455 left = (uint64_t) -1;
1456
1457 r = bus_poll(bus, true, left);
1458 if (r < 0)
1459 return r;
1460
1461 r = dispatch_wqueue(bus);
1462 if (r < 0)
1463 return r;
1464 }
1465 }
1466
1467 int sd_bus_get_fd(sd_bus *bus) {
1468 if (!bus)
1469 return -EINVAL;
1470
1471 if (bus->fd < 0)
1472 return -ENOTCONN;
1473
1474 return bus->fd;
1475 }
1476
1477 int sd_bus_get_events(sd_bus *bus) {
1478 int flags = 0;
1479
1480 if (!bus)
1481 return -EINVAL;
1482 if (bus->fd < 0)
1483 return -ENOTCONN;
1484
1485 if (bus->state == BUS_OPENING)
1486 flags |= POLLOUT;
1487 else if (bus->state == BUS_AUTHENTICATING) {
1488
1489 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1490 flags |= POLLOUT;
1491
1492 flags |= POLLIN;
1493
1494 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1495 if (bus->rqueue_size <= 0)
1496 flags |= POLLIN;
1497 if (bus->wqueue_size > 0)
1498 flags |= POLLOUT;
1499 }
1500
1501 return flags;
1502 }
1503
1504 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1505 struct reply_callback *c;
1506
1507 if (!bus)
1508 return -EINVAL;
1509 if (!timeout_usec)
1510 return -EINVAL;
1511 if (bus->fd < 0)
1512 return -ENOTCONN;
1513
1514 if (bus->state == BUS_AUTHENTICATING) {
1515 *timeout_usec = bus->auth_timeout;
1516 return 1;
1517 }
1518
1519 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1520 return 0;
1521
1522 c = prioq_peek(bus->reply_callbacks_prioq);
1523 if (!c)
1524 return 0;
1525
1526 *timeout_usec = c->timeout;
1527 return 1;
1528 }
1529
1530 static int process_timeout(sd_bus *bus) {
1531 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1532 struct reply_callback *c;
1533 usec_t n;
1534 int r;
1535
1536 assert(bus);
1537
1538 c = prioq_peek(bus->reply_callbacks_prioq);
1539 if (!c)
1540 return 0;
1541
1542 n = now(CLOCK_MONOTONIC);
1543 if (c->timeout > n)
1544 return 0;
1545
1546 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1547 hashmap_remove(bus->reply_callbacks, &c->serial);
1548
1549 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1550 free(c);
1551
1552 return r < 0 ? r : 1;
1553 }
1554
1555 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1556 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1557 int r;
1558
1559 assert(bus);
1560 assert(m);
1561
1562 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1563 return 0;
1564
1565 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1566 return 0;
1567
1568 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1569 return 1;
1570
1571 if (streq_ptr(m->member, "Ping"))
1572 r = sd_bus_message_new_method_return(bus, m, &reply);
1573 else if (streq_ptr(m->member, "GetMachineId")) {
1574 sd_id128_t id;
1575 char sid[33];
1576
1577 r = sd_id128_get_machine(&id);
1578 if (r < 0)
1579 return r;
1580
1581 r = sd_bus_message_new_method_return(bus, m, &reply);
1582 if (r < 0)
1583 return r;
1584
1585 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1586 } else {
1587 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1588
1589 sd_bus_error_set(&error,
1590 "org.freedesktop.DBus.Error.UnknownMethod",
1591 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1592
1593 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1594 }
1595
1596 if (r < 0)
1597 return r;
1598
1599 r = sd_bus_send(bus, reply, NULL);
1600 if (r < 0)
1601 return r;
1602
1603 return 1;
1604 }
1605
1606 static int process_message(sd_bus *bus, sd_bus_message *m) {
1607 struct filter_callback *l;
1608 int r;
1609
1610 assert(bus);
1611 assert(m);
1612
1613 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1614 struct reply_callback *c;
1615
1616 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1617 if (c) {
1618 if (c->timeout != 0)
1619 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1620
1621 r = c->callback(bus, 0, m, c->userdata);
1622 free(c);
1623
1624 if (r != 0)
1625 return r;
1626 }
1627 }
1628
1629 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1630 r = l->callback(bus, 0, m, l->userdata);
1631 if (r != 0)
1632 return r;
1633 }
1634
1635 return process_builtin(bus, m);
1636 }
1637
1638 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1639 int r;
1640
1641 /* Returns 0 when we didn't do anything. This should cause the
1642 * caller to invoke sd_bus_wait() before returning the next
1643 * time. Returns > 0 when we did something, which possibly
1644 * means *ret is filled in with an unprocessed message. */
1645
1646 if (!bus)
1647 return -EINVAL;
1648 if (bus->fd < 0)
1649 return -ENOTCONN;
1650
1651 if (bus->state == BUS_OPENING) {
1652 struct pollfd p;
1653
1654 zero(p);
1655 p.fd = bus->fd;
1656 p.events = POLLOUT;
1657
1658 r = poll(&p, 1, 0);
1659 if (r < 0)
1660 return -errno;
1661
1662 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1663 int error = 0;
1664 socklen_t slen = sizeof(error);
1665
1666 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1667 if (r < 0)
1668 bus->last_connect_error = errno;
1669 else if (error != 0)
1670 bus->last_connect_error = error;
1671 else if (p.revents & (POLLERR|POLLHUP))
1672 bus->last_connect_error = ECONNREFUSED;
1673 else {
1674 r = bus_start_auth(bus);
1675 goto null_message;
1676 }
1677
1678 /* Try next address */
1679 r = bus_start_connect(bus);
1680 goto null_message;
1681 }
1682
1683 r = 0;
1684 goto null_message;
1685
1686 } else if (bus->state == BUS_AUTHENTICATING) {
1687
1688 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1689 return -ETIMEDOUT;
1690
1691 r = bus_write_auth(bus);
1692 if (r != 0)
1693 goto null_message;
1694
1695 r = bus_read_auth(bus);
1696 goto null_message;
1697
1698 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1699 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1700 int k;
1701
1702 r = process_timeout(bus);
1703 if (r != 0)
1704 goto null_message;
1705
1706 r = dispatch_wqueue(bus);
1707 if (r != 0)
1708 goto null_message;
1709
1710 k = r;
1711 r = dispatch_rqueue(bus, &m);
1712 if (r < 0)
1713 return r;
1714 if (!m) {
1715 if (r == 0)
1716 r = k;
1717 goto null_message;
1718 }
1719
1720 r = process_message(bus, m);
1721 if (r != 0)
1722 goto null_message;
1723
1724 if (ret) {
1725 *ret = m;
1726 m = NULL;
1727 return 1;
1728 }
1729
1730 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1731 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1732 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1733
1734 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1735
1736 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1737 if (r < 0)
1738 return r;
1739
1740 r = sd_bus_send(bus, reply, NULL);
1741 if (r < 0)
1742 return r;
1743 }
1744
1745 return 1;
1746 }
1747
1748 assert_not_reached("Unknown state");
1749
1750 null_message:
1751 if (r >= 0 && ret)
1752 *ret = NULL;
1753
1754 return r;
1755 }
1756
1757 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1758 struct pollfd p;
1759 int r, e;
1760 struct timespec ts;
1761 usec_t until, m;
1762
1763 assert(bus);
1764
1765 if (bus->fd < 0)
1766 return -ENOTCONN;
1767
1768 e = sd_bus_get_events(bus);
1769 if (e < 0)
1770 return e;
1771
1772 if (need_more)
1773 e |= POLLIN;
1774
1775 r = sd_bus_get_timeout(bus, &until);
1776 if (r < 0)
1777 return r;
1778 if (r == 0)
1779 m = (uint64_t) -1;
1780 else {
1781 usec_t n;
1782 n = now(CLOCK_MONOTONIC);
1783 m = until > n ? until - n : 0;
1784 }
1785
1786 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1787 m = timeout_usec;
1788
1789 zero(p);
1790 p.fd = bus->fd;
1791 p.events = e;
1792
1793 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1794 if (r < 0)
1795 return -errno;
1796
1797 return r > 0 ? 1 : 0;
1798 }
1799
1800 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1801
1802 if (!bus)
1803 return -EINVAL;
1804 if (bus->fd < 0)
1805 return -ENOTCONN;
1806 if (bus->rqueue_size > 0)
1807 return 0;
1808
1809 return bus_poll(bus, false, timeout_usec);
1810 }
1811
1812 int sd_bus_flush(sd_bus *bus) {
1813 int r;
1814
1815 if (!bus)
1816 return -EINVAL;
1817 if (bus->fd < 0)
1818 return -ENOTCONN;
1819
1820 r = bus_ensure_running(bus);
1821 if (r < 0)
1822 return r;
1823
1824 if (bus->wqueue_size <= 0)
1825 return 0;
1826
1827 for (;;) {
1828 r = dispatch_wqueue(bus);
1829 if (r < 0)
1830 return r;
1831
1832 if (bus->wqueue_size <= 0)
1833 return 0;
1834
1835 r = bus_poll(bus, false, (uint64_t) -1);
1836 if (r < 0)
1837 return r;
1838 }
1839 }
1840
1841 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1842 struct filter_callback *f;
1843
1844 if (!bus)
1845 return -EINVAL;
1846 if (!callback)
1847 return -EINVAL;
1848
1849 f = new(struct filter_callback, 1);
1850 if (!f)
1851 return -ENOMEM;
1852 f->callback = callback;
1853 f->userdata = userdata;
1854
1855 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1856 return 0;
1857 }
1858
1859 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1860 struct filter_callback *f;
1861
1862 if (!bus)
1863 return -EINVAL;
1864 if (!callback)
1865 return -EINVAL;
1866
1867 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1868 if (f->callback == callback && f->userdata == userdata) {
1869 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1870 free(f);
1871 return 1;
1872 }
1873 }
1874
1875 return 0;
1876 }