]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/libsystemd-bus/sd-bus.c
bus: consider it an error if the first message we get on the bus is not a reply to...
[thirdparty/systemd.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2013 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "strv.h"
33 #include "set.h"
34
35 #include "sd-bus.h"
36 #include "bus-internal.h"
37 #include "bus-message.h"
38 #include "bus-type.h"
39 #include "bus-socket.h"
40
41 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
42
43 static void bus_free(sd_bus *b) {
44 struct filter_callback *f;
45 struct object_callback *c;
46 unsigned i;
47
48 assert(b);
49
50 if (b->fd >= 0)
51 close_nointr_nofail(b->fd);
52
53 free(b->rbuffer);
54 free(b->unique_name);
55 free(b->auth_uid);
56 free(b->address);
57
58 free(b->exec_path);
59 strv_free(b->exec_argv);
60
61 close_many(b->fds, b->n_fds);
62 free(b->fds);
63
64 for (i = 0; i < b->rqueue_size; i++)
65 sd_bus_message_unref(b->rqueue[i]);
66 free(b->rqueue);
67
68 for (i = 0; i < b->wqueue_size; i++)
69 sd_bus_message_unref(b->wqueue[i]);
70 free(b->wqueue);
71
72 hashmap_free_free(b->reply_callbacks);
73 prioq_free(b->reply_callbacks_prioq);
74
75 while ((f = b->filter_callbacks)) {
76 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
77 free(f);
78 }
79
80 while ((c = hashmap_steal_first(b->object_callbacks))) {
81 free(c->path);
82 free(c);
83 }
84
85 hashmap_free(b->object_callbacks);
86
87 free(b);
88 }
89
90 int sd_bus_new(sd_bus **ret) {
91 sd_bus *r;
92
93 if (!ret)
94 return -EINVAL;
95
96 r = new0(sd_bus, 1);
97 if (!r)
98 return -ENOMEM;
99
100 r->n_ref = 1;
101 r->fd = -1;
102 r->message_version = 1;
103 r->negotiate_fds = true;
104
105 /* We guarantee that wqueue always has space for at least one
106 * entry */
107 r->wqueue = new(sd_bus_message*, 1);
108 if (!r->wqueue) {
109 free(r);
110 return -ENOMEM;
111 }
112
113 *ret = r;
114 return 0;
115 }
116
117 int sd_bus_set_address(sd_bus *bus, const char *address) {
118 char *a;
119
120 if (!bus)
121 return -EINVAL;
122 if (bus->state != BUS_UNSET)
123 return -EPERM;
124 if (!address)
125 return -EINVAL;
126
127 a = strdup(address);
128 if (!a)
129 return -ENOMEM;
130
131 free(bus->address);
132 bus->address = a;
133
134 return 0;
135 }
136
137 int sd_bus_set_fd(sd_bus *bus, int fd) {
138 if (!bus)
139 return -EINVAL;
140 if (bus->state != BUS_UNSET)
141 return -EPERM;
142 if (fd < 0)
143 return -EINVAL;
144
145 bus->fd = fd;
146 return 0;
147 }
148
149 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
150 char *p, **a;
151
152 if (!bus)
153 return -EINVAL;
154 if (bus->state != BUS_UNSET)
155 return -EPERM;
156 if (!path)
157 return -EINVAL;
158 if (strv_isempty(argv))
159 return -EINVAL;
160
161 p = strdup(path);
162 if (!p)
163 return -ENOMEM;
164
165 a = strv_copy(argv);
166 if (!a) {
167 free(p);
168 return -ENOMEM;
169 }
170
171 free(bus->exec_path);
172 strv_free(bus->exec_argv);
173
174 bus->exec_path = p;
175 bus->exec_argv = a;
176
177 return 0;
178 }
179
180 int sd_bus_set_bus_client(sd_bus *bus, int b) {
181 if (!bus)
182 return -EINVAL;
183 if (bus->state != BUS_UNSET)
184 return -EPERM;
185
186 bus->bus_client = !!b;
187 return 0;
188 }
189
190 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
191 if (!bus)
192 return -EINVAL;
193 if (bus->state != BUS_UNSET)
194 return -EPERM;
195
196 bus->negotiate_fds = !!b;
197 return 0;
198 }
199
200 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
201 const char *s;
202 int r;
203
204 assert(bus);
205 assert(bus->state == BUS_HELLO);
206
207 if (error != 0)
208 return -error;
209
210 assert(reply);
211
212 r = sd_bus_message_read(reply, "s", &s);
213 if (r < 0)
214 return r;
215
216 if (!service_name_is_valid(s) || s[0] != ':')
217 return -EBADMSG;
218
219 bus->unique_name = strdup(s);
220 if (!bus->unique_name)
221 return -ENOMEM;
222
223 bus->state = BUS_RUNNING;
224
225 return 1;
226 }
227
228 static int bus_send_hello(sd_bus *bus) {
229 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
230 int r;
231
232 assert(bus);
233
234 if (!bus->bus_client)
235 return 0;
236
237 r = sd_bus_message_new_method_call(
238 bus,
239 "org.freedesktop.DBus",
240 "/",
241 "org.freedesktop.DBus",
242 "Hello",
243 &m);
244 if (r < 0)
245 return r;
246
247 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
248 }
249
250 int bus_start_running(sd_bus *bus) {
251 assert(bus);
252
253 if (bus->bus_client) {
254 bus->state = BUS_HELLO;
255 return 1;
256 }
257
258 bus->state = BUS_RUNNING;
259 return 1;
260 }
261
/* Parses one "key=value" item at *p.
 *
 * If key is non-NULL the item is only consumed when it starts with
 * "<key>="; otherwise 0 is returned and nothing is changed. If key is
 * NULL the text at *p is taken as the value directly.
 *
 * The value runs up to the next ',', ';' or end of string, with "%XX"
 * sequences decoded via unhexchar(). On success *value is replaced by
 * a newly allocated string, *p is advanced past the value (and past a
 * trailing ','), and 1 is returned. Returns -EINVAL if a key is given
 * and *value is already set, -ENOMEM on allocation failure, or the
 * negative result of unhexchar() on a bad escape. */
static int parse_address_key(const char **p, const char *key, char **value) {
        const char *cursor;
        char *buf = NULL;
        size_t len = 0;

        assert(p);
        assert(*p);
        assert(value);

        if (!key)
                cursor = *p;
        else {
                size_t key_len = strlen(key);

                if (strncmp(*p, key, key_len) != 0 || (*p)[key_len] != '=')
                        return 0;

                /* The same key may not be specified twice */
                if (*value)
                        return -EINVAL;

                cursor = *p + key_len + 1;
        }

        for (;;) {
                char ch, *grown;

                if (*cursor == ';' || *cursor == ',' || *cursor == 0)
                        break;

                if (*cursor != '%')
                        ch = *(cursor++);
                else {
                        int hi, lo;

                        hi = unhexchar(cursor[1]);
                        if (hi < 0) {
                                free(buf);
                                return hi;
                        }

                        lo = unhexchar(cursor[2]);
                        if (lo < 0) {
                                free(buf);
                                return lo;
                        }

                        ch = (char) ((hi << 4) | lo);
                        cursor += 3;
                }

                /* Room for the new character plus the terminating NUL */
                grown = realloc(buf, len + 2);
                if (!grown) {
                        free(buf);
                        return -ENOMEM;
                }

                buf = grown;
                buf[len++] = ch;
        }

        if (buf)
                buf[len] = 0;
        else {
                /* Empty value: still hand back an allocated string */
                buf = strdup("");
                if (!buf)
                        return -ENOMEM;
        }

        if (*cursor == ',')
                cursor++;

        *p = cursor;

        free(*value);
        *value = buf;

        return 1;
}
338
/* Skips over one key=value item the caller does not understand.
 *
 * *p is advanced to the end of the current item, i.e. to the next ','
 * or ';' or to the end of the string. A ',' (item separator) is
 * consumed; a ';' (address separator) is left in place so that the
 * callers' "until ';' or NUL" loops terminate at the end of the
 * current address instead of running on into the next one. */
static void skip_address_key(const char **p) {
        assert(p);
        assert(*p);

        *p += strcspn(*p, ",;");

        if (**p == ',')
                (*p)++;
}
348
349 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
350 _cleanup_free_ char *path = NULL, *abstract = NULL;
351 size_t l;
352 int r;
353
354 assert(b);
355 assert(p);
356 assert(*p);
357 assert(guid);
358
359 while (**p != 0 && **p != ';') {
360 r = parse_address_key(p, "guid", guid);
361 if (r < 0)
362 return r;
363 else if (r > 0)
364 continue;
365
366 r = parse_address_key(p, "path", &path);
367 if (r < 0)
368 return r;
369 else if (r > 0)
370 continue;
371
372 r = parse_address_key(p, "abstract", &abstract);
373 if (r < 0)
374 return r;
375 else if (r > 0)
376 continue;
377
378 skip_address_key(p);
379 }
380
381 if (!path && !abstract)
382 return -EINVAL;
383
384 if (path && abstract)
385 return -EINVAL;
386
387 if (path) {
388 l = strlen(path);
389 if (l > sizeof(b->sockaddr.un.sun_path))
390 return -E2BIG;
391
392 b->sockaddr.un.sun_family = AF_UNIX;
393 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
394 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
395 } else if (abstract) {
396 l = strlen(abstract);
397 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
398 return -E2BIG;
399
400 b->sockaddr.un.sun_family = AF_UNIX;
401 b->sockaddr.un.sun_path[0] = 0;
402 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
403 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
404 }
405
406 return 0;
407 }
408
409 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
410 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
411 struct addrinfo hints, *result;
412 int r;
413
414 assert(b);
415 assert(p);
416 assert(*p);
417 assert(guid);
418
419 while (**p != 0 && **p != ';') {
420 r = parse_address_key(p, "guid", guid);
421 if (r < 0)
422 return r;
423 else if (r > 0)
424 continue;
425
426 r = parse_address_key(p, "host", &host);
427 if (r < 0)
428 return r;
429 else if (r > 0)
430 continue;
431
432 r = parse_address_key(p, "port", &port);
433 if (r < 0)
434 return r;
435 else if (r > 0)
436 continue;
437
438 r = parse_address_key(p, "family", &family);
439 if (r < 0)
440 return r;
441 else if (r > 0)
442 continue;
443
444 skip_address_key(p);
445 }
446
447 if (!host || !port)
448 return -EINVAL;
449
450 zero(hints);
451 hints.ai_socktype = SOCK_STREAM;
452 hints.ai_flags = AI_ADDRCONFIG;
453
454 if (family) {
455 if (streq(family, "ipv4"))
456 hints.ai_family = AF_INET;
457 else if (streq(family, "ipv6"))
458 hints.ai_family = AF_INET6;
459 else
460 return -EINVAL;
461 }
462
463 r = getaddrinfo(host, port, &hints, &result);
464 if (r == EAI_SYSTEM)
465 return -errno;
466 else if (r != 0)
467 return -EADDRNOTAVAIL;
468
469 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
470 b->sockaddr_size = result->ai_addrlen;
471
472 freeaddrinfo(result);
473
474 return 0;
475 }
476
477 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
478 char *path = NULL;
479 unsigned n_argv = 0, j;
480 char **argv = NULL;
481 int r;
482
483 assert(b);
484 assert(p);
485 assert(*p);
486 assert(guid);
487
488 while (**p != 0 && **p != ';') {
489 r = parse_address_key(p, "guid", guid);
490 if (r < 0)
491 goto fail;
492 else if (r > 0)
493 continue;
494
495 r = parse_address_key(p, "path", &path);
496 if (r < 0)
497 goto fail;
498 else if (r > 0)
499 continue;
500
501 if (startswith(*p, "argv")) {
502 unsigned ul;
503
504 errno = 0;
505 ul = strtoul(*p + 4, (char**) p, 10);
506 if (errno > 0 || **p != '=' || ul > 256) {
507 r = -EINVAL;
508 goto fail;
509 }
510
511 (*p) ++;
512
513 if (ul >= n_argv) {
514 char **x;
515
516 x = realloc(argv, sizeof(char*) * (ul + 2));
517 if (!x) {
518 r = -ENOMEM;
519 goto fail;
520 }
521
522 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
523
524 argv = x;
525 n_argv = ul + 1;
526 }
527
528 r = parse_address_key(p, NULL, argv + ul);
529 if (r < 0)
530 goto fail;
531
532 continue;
533 }
534
535 skip_address_key(p);
536 }
537
538 if (!path) {
539 r = -EINVAL;
540 goto fail;
541 }
542
543 /* Make sure there are no holes in the array, with the
544 * exception of argv[0] */
545 for (j = 1; j < n_argv; j++)
546 if (!argv[j]) {
547 r = -EINVAL;
548 goto fail;
549 }
550
551 if (argv && argv[0] == NULL) {
552 argv[0] = strdup(path);
553 if (!argv[0]) {
554 r = -ENOMEM;
555 goto fail;
556 }
557 }
558
559 b->exec_path = path;
560 b->exec_argv = argv;
561 return 0;
562
563 fail:
564 for (j = 0; j < n_argv; j++)
565 free(argv[j]);
566
567 free(argv);
568 free(path);
569 return r;
570 }
571
572 static void bus_reset_parsed_address(sd_bus *b) {
573 assert(b);
574
575 zero(b->sockaddr);
576 b->sockaddr_size = 0;
577 strv_free(b->exec_argv);
578 free(b->exec_path);
579 b->exec_path = NULL;
580 b->exec_argv = NULL;
581 b->peer = SD_ID128_NULL;
582 }
583
584 static int bus_parse_next_address(sd_bus *b) {
585 _cleanup_free_ char *guid = NULL;
586 const char *a;
587 int r;
588
589 assert(b);
590
591 if (!b->address)
592 return 0;
593 if (b->address[b->address_index] == 0)
594 return 0;
595
596 bus_reset_parsed_address(b);
597
598 a = b->address + b->address_index;
599
600 while (*a != 0) {
601
602 if (*a == ';') {
603 a++;
604 continue;
605 }
606
607 if (startswith(a, "unix:")) {
608 a += 5;
609
610 r = parse_unix_address(b, &a, &guid);
611 if (r < 0)
612 return r;
613 break;
614
615 } else if (startswith(a, "tcp:")) {
616
617 a += 4;
618 r = parse_tcp_address(b, &a, &guid);
619 if (r < 0)
620 return r;
621
622 break;
623
624 } else if (startswith(a, "unixexec:")) {
625
626 a += 9;
627 r = parse_exec_address(b, &a, &guid);
628 if (r < 0)
629 return r;
630
631 break;
632
633 }
634
635 a = strchr(a, ';');
636 if (!a)
637 return 0;
638 }
639
640 if (guid) {
641 r = sd_id128_from_string(guid, &b->peer);
642 if (r < 0)
643 return r;
644 }
645
646 b->address_index = a - b->address;
647 return 1;
648 }
649
650 static int bus_start_address(sd_bus *b) {
651 int r;
652
653 assert(b);
654
655 for (;;) {
656 if (b->fd >= 0) {
657 close_nointr_nofail(b->fd);
658 b->fd = -1;
659 }
660
661 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
662
663 r = bus_socket_connect(b);
664 if (r >= 0)
665 return r;
666
667 b->last_connect_error = -r;
668
669 } else if (b->exec_path) {
670
671 r = bus_socket_exec(b);
672 if (r >= 0)
673 return r;
674
675 b->last_connect_error = -r;
676 }
677
678 r = bus_parse_next_address(b);
679 if (r < 0)
680 return r;
681 if (r == 0)
682 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
683 }
684 }
685
686 int bus_next_address(sd_bus *b) {
687 assert(b);
688
689 bus_reset_parsed_address(b);
690 return bus_start_address(b);
691 }
692
693 static int bus_start_fd(sd_bus *b) {
694 int r;
695
696 assert(b);
697
698 r = fd_nonblock(b->fd, true);
699 if (r < 0)
700 return r;
701
702 r = fd_cloexec(b->fd, true);
703 if (r < 0)
704 return r;
705
706 return bus_socket_take_fd(b);
707 }
708
709 int sd_bus_start(sd_bus *bus) {
710 int r;
711
712 if (!bus)
713 return -EINVAL;
714 if (bus->state != BUS_UNSET)
715 return -EPERM;
716
717 bus->state = BUS_OPENING;
718
719 if (bus->fd >= 0)
720 r = bus_start_fd(bus);
721 else if (bus->address)
722 r = bus_start_address(bus);
723 else
724 return -EINVAL;
725
726 if (r < 0)
727 return r;
728
729 return bus_send_hello(bus);
730 }
731
732 int sd_bus_open_system(sd_bus **ret) {
733 const char *e;
734 sd_bus *b;
735 int r;
736
737 if (!ret)
738 return -EINVAL;
739
740 r = sd_bus_new(&b);
741 if (r < 0)
742 return r;
743
744 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
745 if (e) {
746 r = sd_bus_set_address(b, e);
747 if (r < 0)
748 goto fail;
749 } else {
750 b->sockaddr.un.sun_family = AF_UNIX;
751 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
752 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
753 }
754
755 b->bus_client = true;
756
757 r = sd_bus_start(b);
758 if (r < 0)
759 goto fail;
760
761 *ret = b;
762 return 0;
763
764 fail:
765 bus_free(b);
766 return r;
767 }
768
769 int sd_bus_open_user(sd_bus **ret) {
770 const char *e;
771 sd_bus *b;
772 size_t l;
773 int r;
774
775 if (!ret)
776 return -EINVAL;
777
778 r = sd_bus_new(&b);
779 if (r < 0)
780 return r;
781
782 e = getenv("DBUS_SESSION_BUS_ADDRESS");
783 if (e) {
784 r = sd_bus_set_address(b, e);
785 if (r < 0)
786 goto fail;
787 } else {
788 e = getenv("XDG_RUNTIME_DIR");
789 if (!e) {
790 r = -ENOENT;
791 goto fail;
792 }
793
794 l = strlen(e);
795 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
796 r = -E2BIG;
797 goto fail;
798 }
799
800 b->sockaddr.un.sun_family = AF_UNIX;
801 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
802 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
803 }
804
805 b->bus_client = true;
806
807 r = sd_bus_start(b);
808 if (r < 0)
809 goto fail;
810
811 *ret = b;
812 return 0;
813
814 fail:
815 bus_free(b);
816 return r;
817 }
818
819 void sd_bus_close(sd_bus *bus) {
820 if (!bus)
821 return;
822 if (bus->fd < 0)
823 return;
824
825 close_nointr_nofail(bus->fd);
826 bus->fd = -1;
827 }
828
829 sd_bus *sd_bus_ref(sd_bus *bus) {
830 if (!bus)
831 return NULL;
832
833 assert(bus->n_ref > 0);
834
835 bus->n_ref++;
836 return bus;
837 }
838
839 sd_bus *sd_bus_unref(sd_bus *bus) {
840 if (!bus)
841 return NULL;
842
843 assert(bus->n_ref > 0);
844 bus->n_ref--;
845
846 if (bus->n_ref <= 0)
847 bus_free(bus);
848
849 return NULL;
850 }
851
852 int sd_bus_is_open(sd_bus *bus) {
853 if (!bus)
854 return -EINVAL;
855
856 return bus->state != BUS_UNSET && bus->fd >= 0;
857 }
858
859 int sd_bus_can_send(sd_bus *bus, char type) {
860 int r;
861
862 if (!bus)
863 return -EINVAL;
864 if (bus->fd < 0)
865 return -ENOTCONN;
866
867 if (type == SD_BUS_TYPE_UNIX_FD) {
868 if (!bus->negotiate_fds)
869 return 0;
870
871 r = bus_ensure_running(bus);
872 if (r < 0)
873 return r;
874
875 return bus->can_fds;
876 }
877
878 return bus_type_is_valid(type);
879 }
880
881 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
882 int r;
883
884 if (!bus)
885 return -EINVAL;
886 if (!peer)
887 return -EINVAL;
888
889 r = bus_ensure_running(bus);
890 if (r < 0)
891 return r;
892
893 *peer = bus->peer;
894 return 0;
895 }
896
897 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
898 assert(m);
899
900 if (m->header->version > b->message_version)
901 return -EPERM;
902
903 if (m->sealed)
904 return 0;
905
906 return bus_message_seal(m, ++b->serial);
907 }
908
909 static int dispatch_wqueue(sd_bus *bus) {
910 int r, ret = 0;
911
912 assert(bus);
913 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
914
915 if (bus->fd < 0)
916 return -ENOTCONN;
917
918 while (bus->wqueue_size > 0) {
919
920 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
921 if (r < 0) {
922 sd_bus_close(bus);
923 return r;
924 } else if (r == 0)
925 /* Didn't do anything this time */
926 return ret;
927 else if (bus->windex >= bus->wqueue[0]->size) {
928 /* Fully written. Let's drop the entry from
929 * the queue.
930 *
931 * This isn't particularly optimized, but
932 * well, this is supposed to be our worst-case
933 * buffer only, and the socket buffer is
934 * supposed to be our primary buffer, and if
935 * it got full, then all bets are off
936 * anyway. */
937
938 sd_bus_message_unref(bus->wqueue[0]);
939 bus->wqueue_size --;
940 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
941 bus->windex = 0;
942
943 ret = 1;
944 }
945 }
946
947 return ret;
948 }
949
950 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
951 sd_bus_message *z = NULL;
952 int r, ret = 0;
953
954 assert(bus);
955 assert(m);
956 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
957
958 if (bus->fd < 0)
959 return -ENOTCONN;
960
961 if (bus->rqueue_size > 0) {
962 /* Dispatch a queued message */
963
964 *m = bus->rqueue[0];
965 bus->rqueue_size --;
966 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
967 return 1;
968 }
969
970 /* Try to read a new message */
971 do {
972 r = bus_socket_read_message(bus, &z);
973 if (r < 0) {
974 sd_bus_close(bus);
975 return r;
976 }
977 if (r == 0)
978 return ret;
979
980 r = 1;
981 } while (!z);
982
983 *m = z;
984 return 1;
985 }
986
987 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
988 int r;
989
990 if (!bus)
991 return -EINVAL;
992 if (bus->state == BUS_UNSET)
993 return -ENOTCONN;
994 if (bus->fd < 0)
995 return -ENOTCONN;
996 if (!m)
997 return -EINVAL;
998
999 if (m->n_fds > 0) {
1000 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1001 if (r < 0)
1002 return r;
1003 if (r == 0)
1004 return -ENOTSUP;
1005 }
1006
1007 /* If the serial number isn't kept, then we know that no reply
1008 * is expected */
1009 if (!serial && !m->sealed)
1010 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1011
1012 r = bus_seal_message(bus, m);
1013 if (r < 0)
1014 return r;
1015
1016 /* If this is a reply and no reply was requested, then let's
1017 * suppress this, if we can */
1018 if (m->dont_send && !serial)
1019 return 0;
1020
1021 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1022 size_t idx = 0;
1023
1024 r = bus_socket_write_message(bus, m, &idx);
1025 if (r < 0) {
1026 sd_bus_close(bus);
1027 return r;
1028 } else if (idx < m->size) {
1029 /* Wasn't fully written. So let's remember how
1030 * much was written. Note that the first entry
1031 * of the wqueue array is always allocated so
1032 * that we always can remember how much was
1033 * written. */
1034 bus->wqueue[0] = sd_bus_message_ref(m);
1035 bus->wqueue_size = 1;
1036 bus->windex = idx;
1037 }
1038 } else {
1039 sd_bus_message **q;
1040
1041 /* Just append it to the queue. */
1042
1043 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1044 return -ENOBUFS;
1045
1046 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1047 if (!q)
1048 return -ENOMEM;
1049
1050 bus->wqueue = q;
1051 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1052 }
1053
1054 if (serial)
1055 *serial = BUS_MESSAGE_SERIAL(m);
1056
1057 return 0;
1058 }
1059
1060 static usec_t calc_elapse(uint64_t usec) {
1061 if (usec == (uint64_t) -1)
1062 return 0;
1063
1064 if (usec == 0)
1065 usec = BUS_DEFAULT_TIMEOUT;
1066
1067 return now(CLOCK_MONOTONIC) + usec;
1068 }
1069
1070 static int timeout_compare(const void *a, const void *b) {
1071 const struct reply_callback *x = a, *y = b;
1072
1073 if (x->timeout != 0 && y->timeout == 0)
1074 return -1;
1075
1076 if (x->timeout == 0 && y->timeout != 0)
1077 return 1;
1078
1079 if (x->timeout < y->timeout)
1080 return -1;
1081
1082 if (x->timeout > y->timeout)
1083 return 1;
1084
1085 return 0;
1086 }
1087
1088 int sd_bus_send_with_reply(
1089 sd_bus *bus,
1090 sd_bus_message *m,
1091 sd_message_handler_t callback,
1092 void *userdata,
1093 uint64_t usec,
1094 uint64_t *serial) {
1095
1096 struct reply_callback *c;
1097 int r;
1098
1099 if (!bus)
1100 return -EINVAL;
1101 if (bus->state == BUS_UNSET)
1102 return -ENOTCONN;
1103 if (bus->fd < 0)
1104 return -ENOTCONN;
1105 if (!m)
1106 return -EINVAL;
1107 if (!callback)
1108 return -EINVAL;
1109 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1110 return -EINVAL;
1111 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1112 return -EINVAL;
1113
1114 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1115 if (r < 0)
1116 return r;
1117
1118 if (usec != (uint64_t) -1) {
1119 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1120 if (r < 0)
1121 return r;
1122 }
1123
1124 r = bus_seal_message(bus, m);
1125 if (r < 0)
1126 return r;
1127
1128 c = new(struct reply_callback, 1);
1129 if (!c)
1130 return -ENOMEM;
1131
1132 c->callback = callback;
1133 c->userdata = userdata;
1134 c->serial = BUS_MESSAGE_SERIAL(m);
1135 c->timeout = calc_elapse(usec);
1136
1137 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1138 if (r < 0) {
1139 free(c);
1140 return r;
1141 }
1142
1143 if (c->timeout != 0) {
1144 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1145 if (r < 0) {
1146 c->timeout = 0;
1147 sd_bus_send_with_reply_cancel(bus, c->serial);
1148 return r;
1149 }
1150 }
1151
1152 r = sd_bus_send(bus, m, serial);
1153 if (r < 0) {
1154 sd_bus_send_with_reply_cancel(bus, c->serial);
1155 return r;
1156 }
1157
1158 return r;
1159 }
1160
1161 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1162 struct reply_callback *c;
1163
1164 if (!bus)
1165 return -EINVAL;
1166 if (serial == 0)
1167 return -EINVAL;
1168
1169 c = hashmap_remove(bus->reply_callbacks, &serial);
1170 if (!c)
1171 return 0;
1172
1173 if (c->timeout != 0)
1174 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1175
1176 free(c);
1177 return 1;
1178 }
1179
1180 int bus_ensure_running(sd_bus *bus) {
1181 int r;
1182
1183 assert(bus);
1184
1185 if (bus->fd < 0)
1186 return -ENOTCONN;
1187 if (bus->state == BUS_UNSET)
1188 return -ENOTCONN;
1189
1190 if (bus->state == BUS_RUNNING)
1191 return 1;
1192
1193 for (;;) {
1194 r = sd_bus_process(bus, NULL);
1195 if (r < 0)
1196 return r;
1197 if (bus->state == BUS_RUNNING)
1198 return 1;
1199 if (r > 0)
1200 continue;
1201
1202 r = sd_bus_wait(bus, (uint64_t) -1);
1203 if (r < 0)
1204 return r;
1205 }
1206 }
1207
1208 int sd_bus_send_with_reply_and_block(
1209 sd_bus *bus,
1210 sd_bus_message *m,
1211 uint64_t usec,
1212 sd_bus_error *error,
1213 sd_bus_message **reply) {
1214
1215 int r;
1216 usec_t timeout;
1217 uint64_t serial;
1218 bool room = false;
1219
1220 if (!bus)
1221 return -EINVAL;
1222 if (bus->fd < 0)
1223 return -ENOTCONN;
1224 if (bus->state == BUS_UNSET)
1225 return -ENOTCONN;
1226 if (!m)
1227 return -EINVAL;
1228 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1229 return -EINVAL;
1230 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1231 return -EINVAL;
1232 if (bus_error_is_dirty(error))
1233 return -EINVAL;
1234
1235 r = bus_ensure_running(bus);
1236 if (r < 0)
1237 return r;
1238
1239 r = sd_bus_send(bus, m, &serial);
1240 if (r < 0)
1241 return r;
1242
1243 timeout = calc_elapse(usec);
1244
1245 for (;;) {
1246 usec_t left;
1247 sd_bus_message *incoming = NULL;
1248
1249 if (!room) {
1250 sd_bus_message **q;
1251
1252 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1253 return -ENOBUFS;
1254
1255 /* Make sure there's room for queuing this
1256 * locally, before we read the message */
1257
1258 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1259 if (!q)
1260 return -ENOMEM;
1261
1262 bus->rqueue = q;
1263 room = true;
1264 }
1265
1266 r = bus_socket_read_message(bus, &incoming);
1267 if (r < 0)
1268 return r;
1269 if (incoming) {
1270
1271 if (incoming->reply_serial == serial) {
1272 /* Found a match! */
1273
1274 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1275 *reply = incoming;
1276 return 0;
1277 }
1278
1279 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1280 int k;
1281
1282 r = sd_bus_error_copy(error, &incoming->error);
1283 if (r < 0) {
1284 sd_bus_message_unref(incoming);
1285 return r;
1286 }
1287
1288 k = bus_error_to_errno(&incoming->error);
1289 sd_bus_message_unref(incoming);
1290 return k;
1291 }
1292
1293 sd_bus_message_unref(incoming);
1294 return -EIO;
1295 }
1296
1297 /* There's already guaranteed to be room for
1298 * this, so need to resize things here */
1299 bus->rqueue[bus->rqueue_size ++] = incoming;
1300 room = false;
1301
1302 /* Try to read more, right-away */
1303 continue;
1304 }
1305 if (r != 0)
1306 continue;
1307
1308 if (timeout > 0) {
1309 usec_t n;
1310
1311 n = now(CLOCK_MONOTONIC);
1312 if (n >= timeout)
1313 return -ETIMEDOUT;
1314
1315 left = timeout - n;
1316 } else
1317 left = (uint64_t) -1;
1318
1319 r = bus_poll(bus, true, left);
1320 if (r < 0)
1321 return r;
1322
1323 r = dispatch_wqueue(bus);
1324 if (r < 0)
1325 return r;
1326 }
1327 }
1328
1329 int sd_bus_get_fd(sd_bus *bus) {
1330 if (!bus)
1331 return -EINVAL;
1332
1333 if (bus->fd < 0)
1334 return -ENOTCONN;
1335
1336 return bus->fd;
1337 }
1338
1339 int sd_bus_get_events(sd_bus *bus) {
1340 int flags = 0;
1341
1342 if (!bus)
1343 return -EINVAL;
1344 if (bus->state == BUS_UNSET)
1345 return -ENOTCONN;
1346 if (bus->fd < 0)
1347 return -ENOTCONN;
1348
1349 if (bus->state == BUS_OPENING)
1350 flags |= POLLOUT;
1351 else if (bus->state == BUS_AUTHENTICATING) {
1352
1353 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1354 flags |= POLLOUT;
1355
1356 flags |= POLLIN;
1357
1358 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1359 if (bus->rqueue_size <= 0)
1360 flags |= POLLIN;
1361 if (bus->wqueue_size > 0)
1362 flags |= POLLOUT;
1363 }
1364
1365 return flags;
1366 }
1367
1368 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1369 struct reply_callback *c;
1370
1371 if (!bus)
1372 return -EINVAL;
1373 if (!timeout_usec)
1374 return -EINVAL;
1375 if (bus->state == BUS_UNSET)
1376 return -ENOTCONN;
1377 if (bus->fd < 0)
1378 return -ENOTCONN;
1379
1380 if (bus->state == BUS_AUTHENTICATING) {
1381 *timeout_usec = bus->auth_timeout;
1382 return 1;
1383 }
1384
1385 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1386 return 0;
1387
1388 c = prioq_peek(bus->reply_callbacks_prioq);
1389 if (!c)
1390 return 0;
1391
1392 *timeout_usec = c->timeout;
1393 return 1;
1394 }
1395
1396 static int process_timeout(sd_bus *bus) {
1397 struct reply_callback *c;
1398 usec_t n;
1399 int r;
1400
1401 assert(bus);
1402
1403 c = prioq_peek(bus->reply_callbacks_prioq);
1404 if (!c)
1405 return 0;
1406
1407 n = now(CLOCK_MONOTONIC);
1408 if (c->timeout > n)
1409 return 0;
1410
1411 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1412 hashmap_remove(bus->reply_callbacks, &c->serial);
1413
1414 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1415 free(c);
1416
1417 return r < 0 ? r : 1;
1418 }
1419
1420 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1421 assert(bus);
1422 assert(m);
1423
1424 if (bus->state != BUS_HELLO)
1425 return 0;
1426
1427 /* Let's make sure the first message on the bus is the HELLO
1428 * reply. But note that we don't actually parse the message
1429 * here (we leave that to the usual reply handling), we just
1430 * verify we don't let any earlier msg through. */
1431
1432 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1433 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1434 return -EIO;
1435
1436 if (m->reply_serial != bus->hello_serial)
1437 return -EIO;
1438
1439 return 0;
1440 }
1441
1442 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1443 struct reply_callback *c;
1444 int r;
1445
1446 assert(bus);
1447 assert(m);
1448
1449 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1450 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1451 return 0;
1452
1453 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1454 if (!c)
1455 return 0;
1456
1457 if (c->timeout != 0)
1458 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1459
1460 r = c->callback(bus, 0, m, c->userdata);
1461 free(c);
1462
1463 return r;
1464 }
1465
1466 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1467 struct filter_callback *l;
1468 int r;
1469
1470 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1471 r = l->callback(bus, 0, m, l->userdata);
1472 if (r != 0)
1473 return r;
1474 }
1475
1476 return 0;
1477 }
1478
1479 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1480 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1481 int r;
1482
1483 assert(bus);
1484 assert(m);
1485
1486 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1487 return 0;
1488
1489 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1490 return 0;
1491
1492 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1493 return 1;
1494
1495 if (streq_ptr(m->member, "Ping"))
1496 r = sd_bus_message_new_method_return(bus, m, &reply);
1497 else if (streq_ptr(m->member, "GetMachineId")) {
1498 sd_id128_t id;
1499 char sid[33];
1500
1501 r = sd_id128_get_machine(&id);
1502 if (r < 0)
1503 return r;
1504
1505 r = sd_bus_message_new_method_return(bus, m, &reply);
1506 if (r < 0)
1507 return r;
1508
1509 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1510 } else {
1511 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1512
1513 sd_bus_error_set(&error,
1514 "org.freedesktop.DBus.Error.UnknownMethod",
1515 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1516
1517 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1518 }
1519
1520 if (r < 0)
1521 return r;
1522
1523 r = sd_bus_send(bus, reply, NULL);
1524 if (r < 0)
1525 return r;
1526
1527 return 1;
1528 }
1529
1530 static int process_object(sd_bus *bus, sd_bus_message *m) {
1531 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1532 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1533 struct object_callback *c;
1534 char *p;
1535 int r;
1536 bool found = false;
1537
1538 assert(bus);
1539 assert(m);
1540
1541 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1542 return 0;
1543
1544 if (hashmap_isempty(bus->object_callbacks))
1545 return 0;
1546
1547 c = hashmap_get(bus->object_callbacks, m->path);
1548 if (c) {
1549 r = c->callback(bus, 0, m, c->userdata);
1550 if (r != 0)
1551 return r;
1552
1553 found = true;
1554 }
1555
1556 /* Look for fallback prefixes */
1557 p = strdupa(m->path);
1558 for (;;) {
1559 char *e;
1560
1561 e = strrchr(p, '/');
1562 if (e == p || !e)
1563 break;
1564
1565 *e = 0;
1566
1567 c = hashmap_get(bus->object_callbacks, p);
1568 if (c && c->is_fallback) {
1569 r = c->callback(bus, 0, m, c->userdata);
1570 if (r != 0)
1571 return r;
1572
1573 found = true;
1574 }
1575 }
1576
1577 /* We found some handlers but none wanted to take this, then
1578 * return this -- with one exception, we can handle
1579 * introspection minimally ourselves */
1580 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1581 return 0;
1582
1583 sd_bus_error_set(&error,
1584 "org.freedesktop.DBus.Error.UnknownMethod",
1585 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1586
1587 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1588 if (r < 0)
1589 return r;
1590
1591 r = sd_bus_send(bus, reply, NULL);
1592 if (r < 0)
1593 return r;
1594
1595 return 1;
1596 }
1597
1598 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1599 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1600 _cleanup_free_ char *introspection = NULL;
1601 _cleanup_set_free_free_ Set *s = NULL;
1602 _cleanup_fclose_ FILE *f = NULL;
1603 struct object_callback *c;
1604 Iterator i;
1605 size_t size = 0;
1606 char *node;
1607 int r;
1608
1609 assert(bus);
1610 assert(m);
1611
1612 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1613 return 0;
1614
1615 if (!m->path)
1616 return 0;
1617
1618 s = set_new(string_hash_func, string_compare_func);
1619 if (!s)
1620 return -ENOMEM;
1621
1622 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1623 const char *e;
1624 char *a, *p;
1625
1626 if (streq(c->path, "/"))
1627 continue;
1628
1629 if (streq(m->path, "/"))
1630 e = c->path;
1631 else {
1632 e = startswith(c->path, m->path);
1633 if (!e || *e != '/')
1634 continue;
1635 }
1636
1637 a = strdup(e+1);
1638 if (!a)
1639 return -ENOMEM;
1640
1641 p = strchr(a, '/');
1642 if (p)
1643 *p = 0;
1644
1645 r = set_put(s, a);
1646 if (r < 0) {
1647 free(a);
1648
1649 if (r != -EEXIST)
1650 return r;
1651 }
1652 }
1653
1654 f = open_memstream(&introspection, &size);
1655 if (!f)
1656 return -ENOMEM;
1657
1658 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1659 fputs("<node>\n", f);
1660 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1661 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1662
1663 while ((node = set_steal_first(s))) {
1664 fprintf(f, " <node name=\"%s\"/>\n", node);
1665 free(node);
1666 }
1667
1668 fputs("</node>\n", f);
1669
1670 fflush(f);
1671
1672 if (ferror(f))
1673 return -ENOMEM;
1674
1675 r = sd_bus_message_new_method_return(bus, m, &reply);
1676 if (r < 0)
1677 return r;
1678
1679 r = sd_bus_message_append(reply, "s", introspection);
1680 if (r < 0)
1681 return r;
1682
1683 r = sd_bus_send(bus, reply, NULL);
1684 if (r < 0)
1685 return r;
1686
1687 return 1;
1688 }
1689
1690 static int process_message(sd_bus *bus, sd_bus_message *m) {
1691 int r;
1692
1693 assert(bus);
1694 assert(m);
1695
1696 r = process_hello(bus, m);
1697 if (r != 0)
1698 return r;
1699
1700 r = process_reply(bus, m);
1701 if (r != 0)
1702 return r;
1703
1704 r = process_filter(bus, m);
1705 if (r != 0)
1706 return r;
1707
1708 r = process_builtin(bus, m);
1709 if (r != 0)
1710 return r;
1711
1712 r = process_object(bus, m);
1713 if (r != 0)
1714 return r;
1715
1716 return process_introspect(bus, m);
1717 }
1718
1719 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1720 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1721 int r;
1722
1723 assert(bus);
1724 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1725
1726 r = process_timeout(bus);
1727 if (r != 0)
1728 goto null_message;
1729
1730 r = dispatch_wqueue(bus);
1731 if (r != 0)
1732 goto null_message;
1733
1734 r = dispatch_rqueue(bus, &m);
1735 if (r < 0)
1736 return r;
1737 if (!m)
1738 goto null_message;
1739
1740 r = process_message(bus, m);
1741 if (r != 0)
1742 goto null_message;
1743
1744 if (ret) {
1745 *ret = m;
1746 m = NULL;
1747 return 1;
1748 }
1749
1750 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1751 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1752 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1753
1754 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1755
1756 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1757 if (r < 0)
1758 return r;
1759
1760 r = sd_bus_send(bus, reply, NULL);
1761 if (r < 0)
1762 return r;
1763 }
1764
1765 return 1;
1766
1767 null_message:
1768 if (r >= 0 && ret)
1769 *ret = NULL;
1770
1771 return r;
1772 }
1773
1774 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1775 int r;
1776
1777 /* Returns 0 when we didn't do anything. This should cause the
1778 * caller to invoke sd_bus_wait() before returning the next
1779 * time. Returns > 0 when we did something, which possibly
1780 * means *ret is filled in with an unprocessed message. */
1781
1782 if (!bus)
1783 return -EINVAL;
1784 if (bus->fd < 0)
1785 return -ENOTCONN;
1786
1787 switch (bus->state) {
1788
1789 case BUS_UNSET:
1790 return -ENOTCONN;
1791
1792 case BUS_OPENING:
1793 r = bus_socket_process_opening(bus);
1794 if (r < 0)
1795 return r;
1796 if (ret)
1797 *ret = NULL;
1798 return r;
1799
1800 case BUS_AUTHENTICATING:
1801
1802 r = bus_socket_process_authenticating(bus);
1803 if (r < 0)
1804 return r;
1805 if (ret)
1806 *ret = NULL;
1807 return r;
1808
1809 case BUS_RUNNING:
1810 case BUS_HELLO:
1811
1812 return process_running(bus, ret);
1813 }
1814
1815 assert_not_reached("Unknown state");
1816 }
1817
1818 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1819 struct pollfd p;
1820 int r, e;
1821 struct timespec ts;
1822 usec_t until, m;
1823
1824 assert(bus);
1825
1826 if (bus->fd < 0)
1827 return -ENOTCONN;
1828
1829 e = sd_bus_get_events(bus);
1830 if (e < 0)
1831 return e;
1832
1833 if (need_more)
1834 e |= POLLIN;
1835
1836 r = sd_bus_get_timeout(bus, &until);
1837 if (r < 0)
1838 return r;
1839 if (r == 0)
1840 m = (uint64_t) -1;
1841 else {
1842 usec_t n;
1843 n = now(CLOCK_MONOTONIC);
1844 m = until > n ? until - n : 0;
1845 }
1846
1847 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1848 m = timeout_usec;
1849
1850 zero(p);
1851 p.fd = bus->fd;
1852 p.events = e;
1853
1854 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1855 if (r < 0)
1856 return -errno;
1857
1858 return r > 0 ? 1 : 0;
1859 }
1860
1861 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1862
1863 if (!bus)
1864 return -EINVAL;
1865 if (bus->state == BUS_UNSET)
1866 return -ENOTCONN;
1867 if (bus->fd < 0)
1868 return -ENOTCONN;
1869 if (bus->rqueue_size > 0)
1870 return 0;
1871
1872 return bus_poll(bus, false, timeout_usec);
1873 }
1874
1875 int sd_bus_flush(sd_bus *bus) {
1876 int r;
1877
1878 if (!bus)
1879 return -EINVAL;
1880 if (bus->state == BUS_UNSET)
1881 return -ENOTCONN;
1882 if (bus->fd < 0)
1883 return -ENOTCONN;
1884
1885 r = bus_ensure_running(bus);
1886 if (r < 0)
1887 return r;
1888
1889 if (bus->wqueue_size <= 0)
1890 return 0;
1891
1892 for (;;) {
1893 r = dispatch_wqueue(bus);
1894 if (r < 0)
1895 return r;
1896
1897 if (bus->wqueue_size <= 0)
1898 return 0;
1899
1900 r = bus_poll(bus, false, (uint64_t) -1);
1901 if (r < 0)
1902 return r;
1903 }
1904 }
1905
1906 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1907 struct filter_callback *f;
1908
1909 if (!bus)
1910 return -EINVAL;
1911 if (!callback)
1912 return -EINVAL;
1913
1914 f = new(struct filter_callback, 1);
1915 if (!f)
1916 return -ENOMEM;
1917 f->callback = callback;
1918 f->userdata = userdata;
1919
1920 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1921 return 0;
1922 }
1923
1924 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1925 struct filter_callback *f;
1926
1927 if (!bus)
1928 return -EINVAL;
1929 if (!callback)
1930 return -EINVAL;
1931
1932 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1933 if (f->callback == callback && f->userdata == userdata) {
1934 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1935 free(f);
1936 return 1;
1937 }
1938 }
1939
1940 return 0;
1941 }
1942
1943 static int bus_add_object(
1944 sd_bus *bus,
1945 bool fallback,
1946 const char *path,
1947 sd_message_handler_t callback,
1948 void *userdata) {
1949
1950 struct object_callback *c;
1951 int r;
1952
1953 if (!bus)
1954 return -EINVAL;
1955 if (!path)
1956 return -EINVAL;
1957 if (!callback)
1958 return -EINVAL;
1959
1960 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
1961 if (r < 0)
1962 return r;
1963
1964 c = new(struct object_callback, 1);
1965 if (!c)
1966 return -ENOMEM;
1967
1968 c->path = strdup(path);
1969 if (!c->path) {
1970 free(c);
1971 return -ENOMEM;
1972 }
1973
1974 c->callback = callback;
1975 c->userdata = userdata;
1976 c->is_fallback = fallback;
1977
1978 r = hashmap_put(bus->object_callbacks, c->path, c);
1979 if (r < 0) {
1980 free(c->path);
1981 free(c);
1982 return r;
1983 }
1984
1985 return 0;
1986 }
1987
1988 static int bus_remove_object(
1989 sd_bus *bus,
1990 bool fallback,
1991 const char *path,
1992 sd_message_handler_t callback,
1993 void *userdata) {
1994
1995 struct object_callback *c;
1996
1997 if (!bus)
1998 return -EINVAL;
1999 if (!path)
2000 return -EINVAL;
2001 if (!callback)
2002 return -EINVAL;
2003
2004 c = hashmap_get(bus->object_callbacks, path);
2005 if (!c)
2006 return 0;
2007
2008 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2009 return 0;
2010
2011 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2012
2013 free(c->path);
2014 free(c);
2015
2016 return 1;
2017 }
2018
2019 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2020 return bus_add_object(bus, false, path, callback, userdata);
2021 }
2022
2023 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2024 return bus_remove_object(bus, false, path, callback, userdata);
2025 }
2026
2027 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2028 return bus_add_object(bus, true, prefix, callback, userdata);
2029 }
2030
2031 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2032 return bus_remove_object(bus, true, prefix, callback, userdata);
2033 }