]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/libsystemd-bus/sd-bus.c
bus: properly verify recursion depth of signatures
[thirdparty/systemd.git] / src / libsystemd-bus / sd-bus.c
CommitLineData
de1c301e
LP
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2013 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
22#include <endian.h>
23#include <assert.h>
24#include <stdlib.h>
25#include <unistd.h>
26#include <netdb.h>
27#include <sys/poll.h>
28#include <byteswap.h>
29
30#include "util.h"
31#include "macro.h"
32
33#include "sd-bus.h"
34#include "bus-internal.h"
35#include "bus-message.h"
36#include "bus-type.h"
37
d728d708 38static int ensure_running(sd_bus *bus);
e3017af9
LP
39static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
40
de1c301e
LP
41static void bus_free(sd_bus *b) {
42 struct filter_callback *f;
89ffcd2a 43 unsigned i;
de1c301e
LP
44
45 assert(b);
46
47 if (b->fd >= 0)
48 close_nointr_nofail(b->fd);
49
50 free(b->rbuffer);
89ffcd2a
LP
51 free(b->unique_name);
52 free(b->auth_uid);
53 free(b->address);
54
55 for (i = 0; i < b->rqueue_size; i++)
56 sd_bus_message_unref(b->rqueue[i]);
de1c301e 57 free(b->rqueue);
89ffcd2a
LP
58
59 for (i = 0; i < b->wqueue_size; i++)
60 sd_bus_message_unref(b->wqueue[i]);
de1c301e 61 free(b->wqueue);
de1c301e
LP
62
63 hashmap_free_free(b->reply_callbacks);
e3017af9 64 prioq_free(b->reply_callbacks_prioq);
de1c301e
LP
65
66 while ((f = b->filter_callbacks)) {
67 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
68 free(f);
69 }
70
71 free(b);
72}
73
74static sd_bus* bus_new(void) {
75 sd_bus *r;
76
77 r = new0(sd_bus, 1);
78 if (!r)
79 return NULL;
80
81 r->n_ref = 1;
82 r->fd = -1;
83 r->message_version = 1;
84
85 /* We guarantee that wqueue always has space for at least one
86 * entry */
87 r->wqueue = new(sd_bus_message*, 1);
88 if (!r->wqueue) {
89 free(r);
90 return NULL;
91 }
92
93 return r;
94};
95
e3017af9 96static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
de1c301e
LP
97 const char *s;
98 int r;
99
100 assert(bus);
e3017af9
LP
101
102 if (error != 0)
103 return -error;
104
de1c301e
LP
105 assert(reply);
106
107 bus->state = BUS_RUNNING;
108
109 r = sd_bus_message_read(reply, "s", &s);
110 if (r < 0)
111 return r;
112
113 bus->unique_name = strdup(s);
114 if (!bus->unique_name)
115 return -ENOMEM;
116
117 return 1;
118}
119
120static int bus_send_hello(sd_bus *bus) {
121 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
122 int r;
123
124 assert(bus);
125
126 r = sd_bus_message_new_method_call(
127 bus,
128 "org.freedesktop.DBus",
129 "/",
130 "org.freedesktop.DBus",
131 "Hello",
132 &m);
133 if (r < 0)
134 return r;
135
e3017af9 136 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
de1c301e
LP
137 if (r < 0)
138 return r;
139
89ffcd2a
LP
140 bus->sent_hello = true;
141 return r;
de1c301e
LP
142}
143
144static int bus_start_running(sd_bus *bus) {
de1c301e
LP
145 assert(bus);
146
89ffcd2a 147 if (bus->sent_hello) {
de1c301e 148 bus->state = BUS_HELLO;
e3017af9 149 return 1;
de1c301e
LP
150 }
151
152 bus->state = BUS_RUNNING;
e3017af9 153 return 1;
de1c301e
LP
154}
155
/* Tries to parse "<key>=<value>" at *p, where the value runs up to the
 * next ',' or the end of the string and may contain %XX hex escapes.
 *
 * Returns 0 (and touches nothing) if *p does not start with "<key>=".
 * Returns -EINVAL if *value is already set, -ENOMEM on allocation
 * failure, or the negative result of unhexchar() for a bad escape.
 * On success returns 1, stores the newly allocated, unescaped value in
 * *value and advances *p past the value and a trailing ','. */
static int parse_address_key(const char **p, const char *key, char **value) {
        const char *cursor;
        char *decoded = NULL;
        size_t key_len, used = 0;

        assert(p);
        assert(*p);
        assert(key);
        assert(value);

        key_len = strlen(key);
        if (strncmp(*p, key, key_len) != 0 || (*p)[key_len] != '=')
                return 0;

        if (*value)
                return -EINVAL;

        for (cursor = *p + key_len + 1; *cursor != ',' && *cursor != 0; ) {
                char ch, *grown;

                if (*cursor != '%') {
                        ch = *cursor;
                        cursor++;
                } else {
                        int hi, lo;

                        hi = unhexchar(cursor[1]);
                        if (hi < 0) {
                                free(decoded);
                                return hi;
                        }

                        lo = unhexchar(cursor[2]);
                        if (lo < 0) {
                                free(decoded);
                                return lo;
                        }

                        ch = (char) ((hi << 4) | lo);
                        cursor += 3;
                }

                /* Room for the new character plus the final NUL */
                grown = realloc(decoded, used + 2);
                if (!grown) {
                        free(decoded);
                        return -ENOMEM;
                }

                decoded = grown;
                decoded[used++] = ch;
        }

        if (decoded)
                decoded[used] = 0;
        else {
                /* Empty value: nothing was allocated in the loop */
                decoded = strdup("");
                if (!decoded)
                        return -ENOMEM;
        }

        if (*cursor == ',')
                cursor++;

        *p = cursor;
        *value = decoded;
        return 1;
}
226
/* Advances *p past the current key/value pair: to just behind the next
 * ',' if there is one, otherwise to the terminating NUL. */
static void skip_address_key(const char **p) {
        const char *comma;

        assert(p);
        assert(*p);

        comma = strchr(*p, ',');
        if (comma)
                *p = comma + 1;
        else
                *p += strlen(*p);
}
236
237static int bus_parse_next_address(sd_bus *b) {
238 const char *a, *p;
239 _cleanup_free_ char *guid = NULL;
240 int r;
241
242 assert(b);
243
244 if (!b->address)
245 return 0;
246 if (b->address[b->address_index] == 0)
247 return 0;
248
249 a = b->address + b->address_index;
250
251 zero(b->sockaddr);
252 b->sockaddr_size = 0;
253 b->peer = SD_ID128_NULL;
254
255 if (startswith(a, "unix:")) {
256 _cleanup_free_ char *path = NULL, *abstract = NULL;
257
258 p = a + 5;
89ffcd2a 259 while (*p != 0) {
de1c301e
LP
260 r = parse_address_key(&p, "guid", &guid);
261 if (r < 0)
262 return r;
263 else if (r > 0)
264 continue;
265
266 r = parse_address_key(&p, "path", &path);
267 if (r < 0)
268 return r;
269 else if (r > 0)
270 continue;
271
272 r = parse_address_key(&p, "abstract", &abstract);
273 if (r < 0)
274 return r;
275 else if (r > 0)
276 continue;
277
278 skip_address_key(&p);
279 }
280
281 if (!path && !abstract)
282 return -EINVAL;
283
284 if (path && abstract)
285 return -EINVAL;
286
287 if (path) {
288 size_t l;
289
290 l = strlen(path);
291 if (l > sizeof(b->sockaddr.un.sun_path))
292 return -E2BIG;
293
294 b->sockaddr.un.sun_family = AF_UNIX;
295 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
296 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
297 } else if (abstract) {
298 size_t l;
299
89ffcd2a 300 l = strlen(abstract);
de1c301e
LP
301 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
302 return -E2BIG;
303
304 b->sockaddr.un.sun_family = AF_UNIX;
305 b->sockaddr.un.sun_path[0] = 0;
89ffcd2a 306 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
de1c301e
LP
307 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
308 }
309
310 } else if (startswith(a, "tcp:")) {
311 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
312 struct addrinfo hints, *result;
313
314 p = a + 4;
89ffcd2a 315 while (*p != 0) {
de1c301e
LP
316 r = parse_address_key(&p, "guid", &guid);
317 if (r < 0)
318 return r;
319 else if (r > 0)
320 continue;
321
322 r = parse_address_key(&p, "host", &host);
323 if (r < 0)
324 return r;
325 else if (r > 0)
326 continue;
327
328 r = parse_address_key(&p, "port", &port);
329 if (r < 0)
330 return r;
331 else if (r > 0)
332 continue;
333
334 r = parse_address_key(&p, "family", &family);
335 if (r < 0)
336 return r;
337 else if (r > 0)
338 continue;
339
340 skip_address_key(&p);
341 }
342
343 if (!host || !port)
344 return -EINVAL;
345
346 zero(hints);
347 hints.ai_socktype = SOCK_STREAM;
348 hints.ai_flags = AI_ADDRCONFIG;
349
350 if (family) {
351 if (streq(family, "ipv4"))
352 hints.ai_family = AF_INET;
353 else if (streq(family, "ipv6"))
354 hints.ai_family = AF_INET6;
355 else
356 return -EINVAL;
357 }
358
359 r = getaddrinfo(host, port, &hints, &result);
360 if (r == EAI_SYSTEM)
361 return -errno;
362 else if (r != 0)
363 return -EADDRNOTAVAIL;
364
365 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
366 b->sockaddr_size = result->ai_addrlen;
367
368 freeaddrinfo(result);
369 }
370
371 if (guid) {
372 r = sd_id128_from_string(guid, &b->peer);
373 if (r < 0)
374 return r;
375 }
376
377 b->address_index = p - b->address;
378 return 1;
379}
380
/* Consumes "size" bytes from the iovec array starting at entry *idx.
 * Entries that are used up completely are reset to NULL/0 and *idx is
 * moved past them; a partially consumed entry has its base advanced
 * and its length reduced. */
static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
        struct iovec *cur;

        for (; size > 0; (*idx)++) {
                cur = &iov[*idx];

                if (size < cur->iov_len) {
                        /* Ends inside this entry */
                        cur->iov_base = (uint8_t*) cur->iov_base + size;
                        cur->iov_len -= size;
                        return;
                }

                size -= cur->iov_len;

                cur->iov_base = NULL;
                cur->iov_len = 0;
        }
}
400
401static int bus_write_auth(sd_bus *b) {
402 struct msghdr mh;
403 ssize_t k;
404
405 assert(b);
406 assert(b->state == BUS_AUTHENTICATING);
407
408 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
409 return 0;
410
e3017af9
LP
411 if (b->auth_timeout == 0)
412 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
413
de1c301e
LP
414 zero(mh);
415 mh.msg_iov = b->auth_iovec + b->auth_index;
416 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
417
418 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
419 if (k < 0)
420 return errno == EAGAIN ? 0 : -errno;
421
422 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
423
424 return 1;
425}
426
427static int bus_auth_verify(sd_bus *b) {
428 char *e, *f;
429 sd_id128_t peer;
430 unsigned i;
431 int r;
432
433 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
434 * that's it */
435
436 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
437 if (!e)
438 return 0;
439
89ffcd2a 440 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
de1c301e
LP
441 if (!f)
442 return 0;
443
444 if (e - (char*) b->rbuffer != 3 + 32)
445 return -EPERM;
446
447 if (memcmp(b->rbuffer, "OK ", 3))
448 return -EPERM;
449
450 for (i = 0; i < 32; i += 2) {
451 int x, y;
452
453 x = unhexchar(((char*) b->rbuffer)[3 + i]);
89ffcd2a 454 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
de1c301e
LP
455
456 if (x < 0 || y < 0)
457 return -EINVAL;
458
459 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
460 }
461
462 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
463 !sd_id128_equal(b->peer, peer))
464 return -EPERM;
465
466 b->peer = peer;
467
468 b->can_fds =
469 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
470 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
471
e3017af9
LP
472 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
473 memmove(b->rbuffer, f + 2, b->rbuffer_size);
89ffcd2a 474
de1c301e
LP
475 r = bus_start_running(b);
476 if (r < 0)
477 return r;
478
479 return 1;
480}
481
482static int bus_read_auth(sd_bus *b) {
483 struct msghdr mh;
484 struct iovec iov;
485 size_t n;
486 ssize_t k;
487 int r;
89ffcd2a 488 void *p;
de1c301e
LP
489
490 assert(b);
491
492 r = bus_auth_verify(b);
493 if (r != 0)
494 return r;
495
496 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
25220239
LP
497
498 if (n > BUS_AUTH_SIZE_MAX)
499 n = BUS_AUTH_SIZE_MAX;
500
501 if (b->rbuffer_size >= n)
502 return -ENOBUFS;
503
89ffcd2a
LP
504 p = realloc(b->rbuffer, n);
505 if (!p)
506 return -ENOMEM;
507
508 b->rbuffer = p;
de1c301e
LP
509
510 zero(iov);
511 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
512 iov.iov_len = n - b->rbuffer_size;
513
514 zero(mh);
515 mh.msg_iov = &iov;
516 mh.msg_iovlen = 1;
517
518 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
519 if (k < 0)
520 return errno == EAGAIN ? 0 : -errno;
521
522 b->rbuffer_size += k;
523
524 r = bus_auth_verify(b);
525 if (r != 0)
526 return r;
527
e3017af9 528 return 1;
de1c301e
LP
529}
530
531static int bus_start_auth(sd_bus *b) {
89ffcd2a 532 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
de1c301e
LP
533 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
534
535 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
536 size_t l;
537
538 assert(b);
539
540 b->state = BUS_AUTHENTICATING;
541
542 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
543 char_array_0(text);
544
545 l = strlen(text);
546 b->auth_uid = hexmem(text, l);
547 if (!b->auth_uid)
548 return -ENOMEM;
549
550 b->auth_iovec[0].iov_base = (void*) auth_prefix;
551 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
552 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
553 b->auth_iovec[1].iov_len = l * 2;
554 b->auth_iovec[2].iov_base = (void*) auth_suffix;
555 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
556 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
557
558 return bus_write_auth(b);
559}
560
561static int bus_start_connect(sd_bus *b) {
562 int r;
563
564 assert(b);
565 assert(b->fd < 0);
566
567 for (;;) {
568 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
569 r = bus_parse_next_address(b);
570 if (r < 0)
571 return r;
572 if (r == 0)
e3017af9 573 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
de1c301e
LP
574 }
575
576 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
577 if (b->fd < 0) {
e3017af9 578 b->last_connect_error = errno;
de1c301e
LP
579 zero(b->sockaddr);
580 continue;
581 }
582
583 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
584 if (r < 0) {
585 if (errno == EINPROGRESS)
e3017af9 586 return 1;
de1c301e 587
e3017af9 588 b->last_connect_error = errno;
de1c301e
LP
589 close_nointr_nofail(b->fd);
590 b->fd = -1;
591 zero(b->sockaddr);
592 continue;
593 }
594
595 return bus_start_auth(b);
596 }
597}
598
599int sd_bus_open_system(sd_bus **ret) {
600 const char *e;
601 sd_bus *b;
602 int r;
603
604 if (!ret)
605 return -EINVAL;
606
607 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
608 if (e) {
609 r = sd_bus_open_address(e, &b);
610 if (r < 0)
611 return r;
89ffcd2a
LP
612 } else {
613 b = bus_new();
614 if (!b)
615 return -ENOMEM;
de1c301e 616
89ffcd2a
LP
617 b->sockaddr.un.sun_family = AF_UNIX;
618 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
619 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
de1c301e 620
89ffcd2a
LP
621 r = bus_start_connect(b);
622 if (r < 0) {
623 bus_free(b);
624 return r;
625 }
626 }
de1c301e 627
89ffcd2a 628 r = bus_send_hello(b);
de1c301e 629 if (r < 0) {
89ffcd2a 630 sd_bus_unref(b);
de1c301e
LP
631 return r;
632 }
633
634 *ret = b;
635 return 0;
636}
637
638int sd_bus_open_user(sd_bus **ret) {
639 const char *e;
640 sd_bus *b;
641 size_t l;
642 int r;
643
644 if (!ret)
645 return -EINVAL;
646
647 e = getenv("DBUS_SESSION_BUS_ADDRESS");
648 if (e) {
649 r = sd_bus_open_address(e, &b);
650 if (r < 0)
651 return r;
89ffcd2a
LP
652 } else {
653 e = getenv("XDG_RUNTIME_DIR");
654 if (!e)
655 return -ENOENT;
de1c301e 656
89ffcd2a
LP
657 l = strlen(e);
658 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
659 return -E2BIG;
de1c301e 660
89ffcd2a
LP
661 b = bus_new();
662 if (!b)
663 return -ENOMEM;
de1c301e 664
89ffcd2a
LP
665 b->sockaddr.un.sun_family = AF_UNIX;
666 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
667 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
de1c301e 668
89ffcd2a
LP
669 r = bus_start_connect(b);
670 if (r < 0) {
671 bus_free(b);
672 return r;
673 }
674 }
de1c301e 675
89ffcd2a 676 r = bus_send_hello(b);
de1c301e 677 if (r < 0) {
89ffcd2a 678 sd_bus_unref(b);
de1c301e
LP
679 return r;
680 }
681
682 *ret = b;
683 return 0;
684}
685
686int sd_bus_open_address(const char *address, sd_bus **ret) {
687 sd_bus *b;
688 int r;
689
690 if (!address)
691 return -EINVAL;
692 if (!ret)
693 return -EINVAL;
694
695 b = bus_new();
696 if (!b)
697 return -ENOMEM;
698
699 b->address = strdup(address);
700 if (!b->address) {
701 bus_free(b);
702 return -ENOMEM;
703 }
704
705 r = bus_start_connect(b);
706 if (r < 0) {
707 bus_free(b);
708 return r;
709 }
710
711 *ret = b;
712 return 0;
713}
714
715int sd_bus_open_fd(int fd, sd_bus **ret) {
716 sd_bus *b;
717 int r;
718
719 if (fd < 0)
720 return -EINVAL;
721 if (!ret)
722 return -EINVAL;
723
724 b = bus_new();
725 if (!b)
726 return -ENOMEM;
727
728 b->fd = fd;
729 fd_nonblock(b->fd, true);
730 fd_cloexec(b->fd, true);
731
732 r = bus_start_auth(b);
733 if (r < 0) {
734 bus_free(b);
735 return r;
736 }
737
738 *ret = b;
739 return 0;
740}
741
742void sd_bus_close(sd_bus *bus) {
743 if (!bus)
744 return;
745 if (bus->fd < 0)
746 return;
747
748 close_nointr_nofail(bus->fd);
749 bus->fd = -1;
750}
751
752sd_bus *sd_bus_ref(sd_bus *bus) {
753 if (!bus)
754 return NULL;
755
756 assert(bus->n_ref > 0);
757
758 bus->n_ref++;
759 return bus;
760}
761
762sd_bus *sd_bus_unref(sd_bus *bus) {
763 if (!bus)
764 return NULL;
765
766 assert(bus->n_ref > 0);
767 bus->n_ref--;
768
769 if (bus->n_ref <= 0)
770 bus_free(bus);
771
772 return NULL;
773}
774
e3017af9
LP
775int sd_bus_is_open(sd_bus *bus) {
776 if (!bus)
777 return -EINVAL;
778
779 return bus->fd >= 0;
780}
781
d728d708
LP
782int sd_bus_can_send(sd_bus *bus, char type) {
783 int r;
784
de1c301e
LP
785 if (!bus)
786 return -EINVAL;
787
d728d708
LP
788 if (type == SD_BUS_TYPE_UNIX_FD) {
789 r = ensure_running(bus);
790 if (r < 0)
791 return r;
de1c301e 792
d728d708
LP
793 return bus->can_fds;
794 }
795
796 return bus_type_is_valid(type);
de1c301e
LP
797}
798
d728d708
LP
799int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
800 int r;
de1c301e
LP
801
802 if (!bus)
803 return -EINVAL;
d728d708
LP
804 if (!peer)
805 return -EINVAL;
de1c301e 806
d728d708
LP
807 r = ensure_running(bus);
808 if (r < 0)
809 return r;
de1c301e 810
d728d708
LP
811 *peer = bus->peer;
812 return 0;
de1c301e
LP
813}
814
815static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
816 assert(m);
817
89ffcd2a
LP
818 if (m->header->version > b->message_version)
819 return -EPERM;
820
de1c301e
LP
821 if (m->sealed)
822 return 0;
823
9a17484d 824 return bus_message_seal(m, ++b->serial);
de1c301e
LP
825}
826
827static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
828 struct msghdr mh;
829 struct iovec *iov;
830 ssize_t k;
831 size_t n;
832 unsigned j;
833
834 assert(bus);
835 assert(m);
836 assert(idx);
89ffcd2a 837 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e 838
e3017af9
LP
839 if (*idx >= m->size)
840 return 0;
841
de1c301e
LP
842 n = m->n_iovec * sizeof(struct iovec);
843 iov = alloca(n);
844 memcpy(iov, m->iovec, n);
845
846 j = 0;
847 iovec_advance(iov, &j, *idx);
848
849 zero(mh);
850 mh.msg_iov = iov;
851 mh.msg_iovlen = m->n_iovec;
852
853 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
854 if (k < 0)
e3017af9 855 return errno == EAGAIN ? 0 : -errno;
de1c301e
LP
856
857 *idx += (size_t) k;
e3017af9 858 return 1;
de1c301e
LP
859}
860
861static int message_read_need(sd_bus *bus, size_t *need) {
862 uint32_t a, b;
863 uint8_t e;
25220239 864 uint64_t sum;
de1c301e
LP
865
866 assert(bus);
867 assert(need);
89ffcd2a 868 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e 869
89ffcd2a 870 if (bus->rbuffer_size < sizeof(struct bus_header)) {
e3017af9
LP
871 *need = sizeof(struct bus_header) + 8;
872
873 /* Minimum message size:
874 *
875 * Header +
876 *
877 * Method Call: +2 string headers
878 * Signal: +3 string headers
879 * Method Error: +1 string headers
880 * +1 uint32 headers
881 * Method Reply: +1 uint32 headers
882 *
883 * A string header is at least 9 bytes
884 * A uint32 header is at least 8 bytes
885 *
886 * Hence the minimum message size of a valid message
887 * is header + 8 bytes */
888
de1c301e
LP
889 return 0;
890 }
891
892 a = ((const uint32_t*) bus->rbuffer)[1];
893 b = ((const uint32_t*) bus->rbuffer)[3];
894
895 e = ((const uint8_t*) bus->rbuffer)[0];
896 if (e == SD_BUS_LITTLE_ENDIAN) {
897 a = le32toh(a);
898 b = le32toh(b);
899 } else if (e == SD_BUS_BIG_ENDIAN) {
900 a = be32toh(a);
901 b = be32toh(b);
902 } else
89ffcd2a 903 return -EBADMSG;
de1c301e 904
25220239
LP
905 sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
906 if (sum >= BUS_MESSAGE_SIZE_MAX)
907 return -ENOBUFS;
908
909 *need = (size_t) sum;
de1c301e
LP
910 return 0;
911}
912
913static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
914 sd_bus_message *t;
915 void *b = NULL;
916 int r;
917
918 assert(bus);
919 assert(m);
920 assert(bus->rbuffer_size >= size);
89ffcd2a 921 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e 922
de1c301e
LP
923 if (bus->rbuffer_size > size) {
924 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
925 if (!b) {
926 free(t);
927 return -ENOMEM;
928 }
929 }
930
80a46c73
LP
931 r = bus_message_from_malloc(bus->rbuffer, size, &t);
932 if (r < 0) {
933 free(b);
934 return r;
935 }
de1c301e
LP
936
937 bus->rbuffer = b;
938 bus->rbuffer_size -= size;
939
de1c301e
LP
940 *m = t;
941 return 1;
942}
943
944static int message_read(sd_bus *bus, sd_bus_message **m) {
945 struct msghdr mh;
946 struct iovec iov;
947 ssize_t k;
948 size_t need;
949 int r;
950 void *b;
951
952 assert(bus);
953 assert(m);
89ffcd2a 954 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e
LP
955
956 r = message_read_need(bus, &need);
957 if (r < 0)
958 return r;
959
960 if (bus->rbuffer_size >= need)
961 return message_make(bus, need, m);
962
963 b = realloc(bus->rbuffer, need);
964 if (!b)
965 return -ENOMEM;
966
89ffcd2a
LP
967 bus->rbuffer = b;
968
de1c301e
LP
969 zero(iov);
970 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
971 iov.iov_len = need - bus->rbuffer_size;
972
973 zero(mh);
974 mh.msg_iov = &iov;
975 mh.msg_iovlen = 1;
976
977 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
978 if (k < 0)
979 return errno == EAGAIN ? 0 : -errno;
980
981 bus->rbuffer_size += k;
982
983 r = message_read_need(bus, &need);
984 if (r < 0)
985 return r;
986
987 if (bus->rbuffer_size >= need)
988 return message_make(bus, need, m);
989
e3017af9 990 return 1;
de1c301e
LP
991}
992
993static int dispatch_wqueue(sd_bus *bus) {
e3017af9 994 int r, ret = 0;
de1c301e
LP
995
996 assert(bus);
89ffcd2a 997 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e
LP
998
999 if (bus->fd < 0)
1000 return -ENOTCONN;
1001
1002 while (bus->wqueue_size > 0) {
1003
1004 r = message_write(bus, bus->wqueue[0], &bus->windex);
1005 if (r < 0) {
1006 sd_bus_close(bus);
1007 return r;
1008 } else if (r == 0)
e3017af9
LP
1009 /* Didn't do anything this time */
1010 return ret;
1011 else if (bus->windex >= bus->wqueue[0]->size) {
de1c301e
LP
1012 /* Fully written. Let's drop the entry from
1013 * the queue.
1014 *
1015 * This isn't particularly optimized, but
1016 * well, this is supposed to be our worst-case
1017 * buffer only, and the socket buffer is
1018 * supposed to be our primary buffer, and if
1019 * it got full, then all bets are off
1020 * anyway. */
1021
1022 sd_bus_message_unref(bus->wqueue[0]);
1023 bus->wqueue_size --;
1024 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1025 bus->windex = 0;
1026
e3017af9 1027 ret = 1;
de1c301e
LP
1028 }
1029 }
1030
e3017af9 1031 return ret;
de1c301e
LP
1032}
1033
1034static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
e3017af9
LP
1035 sd_bus_message *z;
1036 int r, ret = 0;
de1c301e
LP
1037
1038 assert(bus);
1039 assert(m);
89ffcd2a 1040 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e
LP
1041
1042 if (bus->fd < 0)
1043 return -ENOTCONN;
1044
1045 if (bus->rqueue_size > 0) {
1046 /* Dispatch a queued message */
1047
1048 *m = bus->rqueue[0];
1049 bus->rqueue_size --;
1050 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1051 return 1;
1052 }
1053
1054 /* Try to read a new message */
e3017af9
LP
1055 do {
1056 r = message_read(bus, &z);
1057 if (r < 0) {
1058 sd_bus_close(bus);
1059 return r;
1060 }
1061 if (r == 0)
1062 return ret;
de1c301e 1063
e3017af9
LP
1064 r = 1;
1065 } while (!z);
1066
1067 *m = z;
1068 return 1;
de1c301e
LP
1069}
1070
1071int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1072 int r;
1073
1074 if (!bus)
1075 return -EINVAL;
1076 if (bus->fd < 0)
1077 return -ENOTCONN;
1078 if (!m)
1079 return -EINVAL;
de1c301e 1080
29f6aadd
LP
1081 /* If the serial number isn't kept, then we know that no reply
1082 * is expected */
1083 if (!serial && !m->sealed)
1084 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1085
de1c301e
LP
1086 r = bus_seal_message(bus, m);
1087 if (r < 0)
1088 return r;
1089
5407f2de
LP
1090 /* If this is a reply and no reply was requested, then let's
1091 * suppress this, if we can */
1092 if (m->dont_send && !serial)
1093 return 0;
1094
89ffcd2a 1095 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
de1c301e
LP
1096 size_t idx = 0;
1097
1098 r = message_write(bus, m, &idx);
1099 if (r < 0) {
1100 sd_bus_close(bus);
1101 return r;
e3017af9 1102 } else if (idx < m->size) {
de1c301e
LP
1103 /* Wasn't fully written. So let's remember how
1104 * much was written. Note that the first entry
1105 * of the wqueue array is always allocated so
1106 * that we always can remember how much was
1107 * written. */
1108 bus->wqueue[0] = sd_bus_message_ref(m);
1109 bus->wqueue_size = 1;
1110 bus->windex = idx;
1111 }
1112 } else {
1113 sd_bus_message **q;
1114
1115 /* Just append it to the queue. */
1116
25220239 1117 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
de1c301e
LP
1118 return -ENOBUFS;
1119
1120 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1121 if (!q)
1122 return -ENOMEM;
1123
1124 bus->wqueue = q;
1125 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1126 }
1127
1128 if (serial)
1129 *serial = BUS_MESSAGE_SERIAL(m);
1130
1131 return 0;
1132}
1133
1134static usec_t calc_elapse(uint64_t usec) {
1135 if (usec == (uint64_t) -1)
1136 return 0;
1137
1138 if (usec == 0)
e3017af9 1139 usec = BUS_DEFAULT_TIMEOUT;
de1c301e
LP
1140
1141 return now(CLOCK_MONOTONIC) + usec;
1142}
1143
e3017af9
LP
1144static int timeout_compare(const void *a, const void *b) {
1145 const struct reply_callback *x = a, *y = b;
1146
1147 if (x->timeout != 0 && y->timeout == 0)
1148 return -1;
1149
1150 if (x->timeout == 0 && y->timeout != 0)
1151 return 1;
1152
1153 if (x->timeout < y->timeout)
1154 return -1;
1155
1156 if (x->timeout > y->timeout)
1157 return 1;
1158
1159 return 0;
1160}
1161
de1c301e
LP
1162int sd_bus_send_with_reply(
1163 sd_bus *bus,
1164 sd_bus_message *m,
1165 sd_message_handler_t callback,
1166 void *userdata,
1167 uint64_t usec,
1168 uint64_t *serial) {
1169
1170 struct reply_callback *c;
1171 int r;
1172
1173 if (!bus)
1174 return -EINVAL;
89ffcd2a 1175 if (bus->fd < 0)
de1c301e
LP
1176 return -ENOTCONN;
1177 if (!m)
1178 return -EINVAL;
1179 if (!callback)
1180 return -EINVAL;
89ffcd2a 1181 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
de1c301e 1182 return -EINVAL;
89ffcd2a
LP
1183 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1184 return -EINVAL;
1185
1186 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1187 if (r < 0)
1188 return r;
de1c301e 1189
e3017af9
LP
1190 if (usec != (uint64_t) -1) {
1191 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1192 if (r < 0)
1193 return r;
1194 }
1195
de1c301e
LP
1196 r = bus_seal_message(bus, m);
1197 if (r < 0)
1198 return r;
1199
1200 c = new(struct reply_callback, 1);
1201 if (!c)
1202 return -ENOMEM;
1203
1204 c->callback = callback;
1205 c->userdata = userdata;
1206 c->serial = BUS_MESSAGE_SERIAL(m);
1207 c->timeout = calc_elapse(usec);
1208
1209 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1210 if (r < 0) {
1211 free(c);
1212 return r;
1213 }
1214
e3017af9
LP
1215 if (c->timeout != 0) {
1216 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1217 if (r < 0) {
1218 c->timeout = 0;
1219 sd_bus_send_with_reply_cancel(bus, c->serial);
1220 return r;
1221 }
1222 }
1223
de1c301e
LP
1224 r = sd_bus_send(bus, m, serial);
1225 if (r < 0) {
e3017af9 1226 sd_bus_send_with_reply_cancel(bus, c->serial);
de1c301e
LP
1227 return r;
1228 }
1229
1230 return r;
1231}
1232
1233int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
e3017af9 1234 struct reply_callback *c;
de1c301e
LP
1235
1236 if (!bus)
1237 return -EINVAL;
1238 if (serial == 0)
1239 return -EINVAL;
1240
1241 c = hashmap_remove(bus->reply_callbacks, &serial);
1242 if (!c)
1243 return 0;
1244
e3017af9
LP
1245 if (c->timeout != 0)
1246 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1247
de1c301e
LP
1248 free(c);
1249 return 1;
1250}
1251
89ffcd2a
LP
1252static int ensure_running(sd_bus *bus) {
1253 int r;
1254
1255 assert(bus);
1256
d728d708
LP
1257 if (bus->state == BUS_RUNNING)
1258 return 1;
89ffcd2a
LP
1259
1260 for (;;) {
1261 r = sd_bus_process(bus, NULL);
1262 if (r < 0)
1263 return r;
d728d708
LP
1264 if (bus->state == BUS_RUNNING)
1265 return 1;
e3017af9
LP
1266 if (r > 0)
1267 continue;
89ffcd2a
LP
1268
1269 r = sd_bus_wait(bus, (uint64_t) -1);
1270 if (r < 0)
1271 return r;
1272 }
1273}
1274
de1c301e
LP
1275int sd_bus_send_with_reply_and_block(
1276 sd_bus *bus,
1277 sd_bus_message *m,
1278 uint64_t usec,
1279 sd_bus_error *error,
1280 sd_bus_message **reply) {
1281
1282 int r;
1283 usec_t timeout;
1284 uint64_t serial;
1285 bool room = false;
1286
1287 if (!bus)
1288 return -EINVAL;
89ffcd2a 1289 if (bus->fd < 0)
de1c301e
LP
1290 return -ENOTCONN;
1291 if (!m)
1292 return -EINVAL;
89ffcd2a 1293 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
de1c301e 1294 return -EINVAL;
89ffcd2a 1295 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
de1c301e 1296 return -EINVAL;
89ffcd2a
LP
1297 if (bus_error_is_dirty(error))
1298 return -EINVAL;
1299
1300 r = ensure_running(bus);
1301 if (r < 0)
1302 return r;
de1c301e
LP
1303
1304 r = sd_bus_send(bus, m, &serial);
1305 if (r < 0)
1306 return r;
1307
1308 timeout = calc_elapse(usec);
1309
1310 for (;;) {
1311 usec_t left;
e3017af9 1312 sd_bus_message *incoming = NULL;
de1c301e
LP
1313
1314 if (!room) {
1315 sd_bus_message **q;
1316
25220239
LP
1317 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1318 return -ENOBUFS;
1319
de1c301e
LP
1320 /* Make sure there's room for queuing this
1321 * locally, before we read the message */
1322
1323 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1324 if (!q)
1325 return -ENOMEM;
1326
1327 bus->rqueue = q;
1328 room = true;
1329 }
1330
1331 r = message_read(bus, &incoming);
1332 if (r < 0)
1333 return r;
e3017af9 1334 if (incoming) {
89ffcd2a 1335
de1c301e
LP
1336 if (incoming->reply_serial == serial) {
1337 /* Found a match! */
1338
1339 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1340 *reply = incoming;
1341 return 0;
1342 }
1343
1344 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1345 int k;
1346
1347 r = sd_bus_error_copy(error, &incoming->error);
1348 if (r < 0) {
1349 sd_bus_message_unref(incoming);
1350 return r;
1351 }
1352
1353 k = bus_error_to_errno(&incoming->error);
1354 sd_bus_message_unref(incoming);
1355 return k;
1356 }
1357
1358 sd_bus_message_unref(incoming);
1359 return -EIO;
1360 }
1361
1362 /* There's already guaranteed to be room for
1363 * this, so need to resize things here */
1364 bus->rqueue[bus->rqueue_size ++] = incoming;
1365 room = false;
1366
1367 /* Try to read more, right-away */
1368 continue;
1369 }
e3017af9
LP
1370 if (r != 0)
1371 continue;
de1c301e
LP
1372
1373 if (timeout > 0) {
1374 usec_t n;
1375
1376 n = now(CLOCK_MONOTONIC);
1377 if (n >= timeout)
1378 return -ETIMEDOUT;
1379
1380 left = timeout - n;
1381 } else
1382 left = (uint64_t) -1;
1383
e3017af9 1384 r = bus_poll(bus, true, left);
de1c301e
LP
1385 if (r < 0)
1386 return r;
1387
1388 r = dispatch_wqueue(bus);
1389 if (r < 0)
1390 return r;
1391 }
1392}
1393
1394int sd_bus_get_fd(sd_bus *bus) {
1395 if (!bus)
1396 return -EINVAL;
1397
1398 if (bus->fd < 0)
89ffcd2a 1399 return -ENOTCONN;
de1c301e
LP
1400
1401 return bus->fd;
1402}
1403
1404int sd_bus_get_events(sd_bus *bus) {
1405 int flags = 0;
1406
1407 if (!bus)
1408 return -EINVAL;
de1c301e 1409 if (bus->fd < 0)
89ffcd2a 1410 return -ENOTCONN;
de1c301e
LP
1411
1412 if (bus->state == BUS_OPENING)
1413 flags |= POLLOUT;
89ffcd2a
LP
1414 else if (bus->state == BUS_AUTHENTICATING) {
1415
1416 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1417 flags |= POLLOUT;
1418
1419 flags |= POLLIN;
1420
1421 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
de1c301e
LP
1422 if (bus->rqueue_size <= 0)
1423 flags |= POLLIN;
1424 if (bus->wqueue_size > 0)
1425 flags |= POLLOUT;
1426 }
1427
1428 return flags;
1429}
1430
e3017af9
LP
1431int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1432 struct reply_callback *c;
1433
1434 if (!bus)
1435 return -EINVAL;
1436 if (!timeout_usec)
1437 return -EINVAL;
1438 if (bus->fd < 0)
1439 return -ENOTCONN;
1440
1441 if (bus->state == BUS_AUTHENTICATING) {
1442 *timeout_usec = bus->auth_timeout;
1443 return 1;
1444 }
1445
1446 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1447 return 0;
1448
1449 c = prioq_peek(bus->reply_callbacks_prioq);
1450 if (!c)
1451 return 0;
1452
1453 *timeout_usec = c->timeout;
1454 return 1;
1455}
1456
1457static int process_timeout(sd_bus *bus) {
1458 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1459 struct reply_callback *c;
1460 usec_t n;
1461 int r;
1462
1463 assert(bus);
1464
1465 c = prioq_peek(bus->reply_callbacks_prioq);
1466 if (!c)
1467 return 0;
1468
1469 n = now(CLOCK_MONOTONIC);
1470 if (c->timeout > n)
1471 return 0;
1472
1473 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1474 hashmap_remove(bus->reply_callbacks, &c->serial);
1475
1476 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1477 free(c);
1478
1479 return r < 0 ? r : 1;
1480}
1481
b9bf7e2b
LP
1482static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1483 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1484 int r;
1485
1486 assert(bus);
1487 assert(m);
1488
1489 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1490 return 0;
1491
1492 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1493 return 0;
1494
1495 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1496 return 1;
1497
1498 if (streq_ptr(m->member, "Ping"))
1499 r = sd_bus_message_new_method_return(bus, m, &reply);
1500 else if (streq_ptr(m->member, "GetMachineId")) {
1501 sd_id128_t id;
1502 char sid[33];
1503
1504 r = sd_id128_get_machine(&id);
1505 if (r < 0)
1506 return r;
1507
1508 r = sd_bus_message_new_method_return(bus, m, &reply);
1509 if (r < 0)
1510 return r;
1511
1512 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1513 } else {
1514 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1515
1516 sd_bus_error_set(&error,
1517 "org.freedesktop.DBus.Error.UnknownMethod",
1518 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1519
1520 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1521 }
1522
1523 if (r < 0)
1524 return r;
1525
1526 r = sd_bus_send(bus, reply, NULL);
1527 if (r < 0)
1528 return r;
1529
1530 return 1;
1531}
1532
e3017af9
LP
1533static int process_message(sd_bus *bus, sd_bus_message *m) {
1534 struct filter_callback *l;
1535 int r;
1536
1537 assert(bus);
1538 assert(m);
1539
b9bf7e2b 1540 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
e3017af9
LP
1541 struct reply_callback *c;
1542
1543 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1544 if (c) {
1545 if (c->timeout != 0)
1546 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1547
1548 r = c->callback(bus, 0, m, c->userdata);
1549 free(c);
1550
1551 if (r != 0)
1552 return r;
1553 }
1554 }
1555
1556 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1557 r = l->callback(bus, 0, m, l->userdata);
1558 if (r != 0)
1559 return r;
1560 }
1561
b9bf7e2b 1562 return process_builtin(bus, m);
e3017af9
LP
1563}
1564
de1c301e 1565int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
de1c301e
LP
1566 int r;
1567
e3017af9
LP
1568 /* Returns 0 when we didn't do anything. This should cause the
1569 * caller to invoke sd_bus_wait() before returning the next
1570 * time. Returns > 0 when we did something, which possibly
1571 * means *ret is filled in with an unprocessed message. */
1572
de1c301e
LP
1573 if (!bus)
1574 return -EINVAL;
1575 if (bus->fd < 0)
1576 return -ENOTCONN;
1577
1578 if (bus->state == BUS_OPENING) {
1579 struct pollfd p;
1580
1581 zero(p);
1582 p.fd = bus->fd;
1583 p.events = POLLOUT;
1584
1585 r = poll(&p, 1, 0);
1586 if (r < 0)
1587 return -errno;
1588
1589 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
89ffcd2a 1590 int error = 0;
de1c301e
LP
1591 socklen_t slen = sizeof(error);
1592
1593 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1594 if (r < 0)
e3017af9
LP
1595 bus->last_connect_error = errno;
1596 else if (error != 0)
1597 bus->last_connect_error = error;
de1c301e 1598 else if (p.revents & (POLLERR|POLLHUP))
e3017af9
LP
1599 bus->last_connect_error = ECONNREFUSED;
1600 else {
1601 r = bus_start_auth(bus);
1602 goto null_message;
1603 }
de1c301e
LP
1604
1605 /* Try next address */
e3017af9
LP
1606 r = bus_start_connect(bus);
1607 goto null_message;
de1c301e
LP
1608 }
1609
e3017af9
LP
1610 r = 0;
1611 goto null_message;
de1c301e
LP
1612
1613 } else if (bus->state == BUS_AUTHENTICATING) {
1614
e3017af9
LP
1615 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1616 return -ETIMEDOUT;
1617
de1c301e 1618 r = bus_write_auth(bus);
e3017af9
LP
1619 if (r != 0)
1620 goto null_message;
de1c301e
LP
1621
1622 r = bus_read_auth(bus);
e3017af9 1623 goto null_message;
de1c301e
LP
1624
1625 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
89ffcd2a 1626 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
e3017af9
LP
1627 int k;
1628
1629 r = process_timeout(bus);
1630 if (r != 0)
1631 goto null_message;
de1c301e
LP
1632
1633 r = dispatch_wqueue(bus);
e3017af9
LP
1634 if (r != 0)
1635 goto null_message;
de1c301e 1636
e3017af9 1637 k = r;
de1c301e 1638 r = dispatch_rqueue(bus, &m);
e3017af9 1639 if (r < 0)
de1c301e 1640 return r;
e3017af9
LP
1641 if (!m) {
1642 if (r == 0)
1643 r = k;
1644 goto null_message;
de1c301e
LP
1645 }
1646
e3017af9
LP
1647 r = process_message(bus, m);
1648 if (r != 0)
1649 goto null_message;
de1c301e
LP
1650
1651 if (ret) {
1652 *ret = m;
89ffcd2a 1653 m = NULL;
de1c301e
LP
1654 return 1;
1655 }
1656
b9bf7e2b 1657 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
89ffcd2a 1658 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
b9bf7e2b
LP
1659 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1660
1661 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
89ffcd2a 1662
b9bf7e2b 1663 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
89ffcd2a
LP
1664 if (r < 0)
1665 return r;
1666
1667 r = sd_bus_send(bus, reply, NULL);
1668 if (r < 0)
1669 return r;
1670 }
1671
e3017af9 1672 return 1;
de1c301e
LP
1673 }
1674
89ffcd2a 1675 assert_not_reached("Unknown state");
e3017af9
LP
1676
1677null_message:
1678 if (r >= 0 && ret)
1679 *ret = NULL;
1680
1681 return r;
de1c301e
LP
1682}
1683
e3017af9 1684static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
de1c301e
LP
1685 struct pollfd p;
1686 int r, e;
1687 struct timespec ts;
e3017af9
LP
1688 usec_t until, m;
1689
1690 assert(bus);
de1c301e 1691
de1c301e 1692 if (bus->fd < 0)
89ffcd2a
LP
1693 return -ENOTCONN;
1694
de1c301e
LP
1695 e = sd_bus_get_events(bus);
1696 if (e < 0)
1697 return e;
1698
e3017af9
LP
1699 if (need_more)
1700 e |= POLLIN;
1701
1702 r = sd_bus_get_timeout(bus, &until);
1703 if (r < 0)
1704 return r;
1705 if (r == 0)
1706 m = (uint64_t) -1;
1707 else {
1708 usec_t n;
1709 n = now(CLOCK_MONOTONIC);
1710 m = until > n ? until - n : 0;
1711 }
1712
1713 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1714 m = timeout_usec;
1715
de1c301e
LP
1716 zero(p);
1717 p.fd = bus->fd;
1718 p.events = e;
1719
e3017af9 1720 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
de1c301e 1721 if (r < 0)
89ffcd2a 1722 return -errno;
de1c301e 1723
e3017af9
LP
1724 return r > 0 ? 1 : 0;
1725}
1726
1727int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1728
1729 if (!bus)
1730 return -EINVAL;
1731 if (bus->fd < 0)
1732 return -ENOTCONN;
1733 if (bus->rqueue_size > 0)
1734 return 0;
1735
1736 return bus_poll(bus, false, timeout_usec);
de1c301e
LP
1737}
1738
1739int sd_bus_flush(sd_bus *bus) {
1740 int r;
1741
1742 if (!bus)
1743 return -EINVAL;
1744 if (bus->fd < 0)
1745 return -ENOTCONN;
1746
89ffcd2a
LP
1747 r = ensure_running(bus);
1748 if (r < 0)
1749 return r;
1750
1751 if (bus->wqueue_size <= 0)
de1c301e
LP
1752 return 0;
1753
1754 for (;;) {
1755 r = dispatch_wqueue(bus);
1756 if (r < 0)
1757 return r;
1758
89ffcd2a 1759 if (bus->wqueue_size <= 0)
de1c301e
LP
1760 return 0;
1761
e3017af9 1762 r = bus_poll(bus, false, (uint64_t) -1);
de1c301e
LP
1763 if (r < 0)
1764 return r;
1765 }
1766}
1767
1768int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1769 struct filter_callback *f;
1770
1771 if (!bus)
1772 return -EINVAL;
1773 if (!callback)
1774 return -EINVAL;
1775
1776 f = new(struct filter_callback, 1);
1777 if (!f)
1778 return -ENOMEM;
1779 f->callback = callback;
1780 f->userdata = userdata;
1781
1782 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1783 return 0;
1784}
1785
1786int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1787 struct filter_callback *f;
1788
1789 if (!bus)
1790 return -EINVAL;
1791 if (!callback)
1792 return -EINVAL;
1793
1794 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1795 if (f->callback == callback && f->userdata == userdata) {
1796 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1797 free(f);
1798 return 1;
1799 }
1800 }
1801
1802 return 0;
1803}